- 浏览: 1583741 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
jsrgzhangzhiyong:
关于null值的转换还是感觉不太友好,就像 mapstruct ...
我也造了个轮子:BeanMapping(属性拷贝) -
he037:
a417930422 写道引用使用EPHEMERAL会引出一个 ...
基于zookeeper的分布式lock实现 -
seancheer:
qianshangding 写道首先节点启动后,尝试读取本地的 ...
zookeeper学习记录三(session,watcher,persit机制) -
雪夜归人:
您好,我想咨询一下,开源的canal都能支持mysql的哪些版 ...
Canal BinlogChange(mysql5.6) -
zhoudengyun:
copy 一份做记录,后续学习,请知悉
阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费
以前虽然看过一次AQS的源码实现,但在过一段时间后与同学交流时,发觉自己理解并不够深,印像太浅。需要做一个记录整理,帮助自己消化。
AQS中Node的设计:
几个点:
1. Node实现作者: "CLH" (Craig, Landin, and * Hagersten) ,有名的CLH queue
2. 是一个FIFO的链表的实现,对于队列的控制经常要做double-check。
3. Node节点通过一个int waiteStatus代表一些不同意义的状态。
- SIGNAL=-1,代表是需要当前Node节点需要唤起后一个Node节点。在Node节点enqueue时,会设置前一个节点的状态。这样链式的唤醒,完成这样的一个交接棒。
- CONDITION = -2 ,
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 位置1 pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 位置3 Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { // 位置4 tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { // 位置2 t.next = node; return t; } } } }
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
final Node p = node.predecessor(); // 位置1 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; }
private void unparkSuccessor(Node node) { /* * Try to clear status in anticipation of signalling. It is * OK if this fails or if status is changed by waiting thread. */ compareAndSetWaitStatus(node, Node.SIGNAL, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { //位置1 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
acquire , release , cancel三个动作设计
按照原先作者的设计:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
预留了5个protected方法,用于client自己实现相关的处理,进行业务行为控制,因为cocurrent很多Lock,Future的实现都是基于此扩展,定义了自己的处理。
具体的一些方法使用,后续再补。
acquire动作:
独占锁:
- public final void acquire(int arg)
- public final void acquireInterruptibly(int arg) throws InterruptedException
- public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 位置1
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 位置2
parkAndCheckInterrupt()) // 位置3
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node); // 位置4
throw ex;
}
}
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); // 位置1 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } if (shouldParkAfterFailedAcquire(p, node) && // 位置2 parkAndCheckInterrupt()) break; // 位置3 } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); // 位置4 }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); // 位置1 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } if (nanosTimeout <= 0) { // 位置2 cancelAcquire(node); return false; } if (nanosTimeout > spinForTimeoutThreshold && // 位置3 shouldParkAfterFailedAcquire(p, node)) // 位置4 LockSupport.parkNanos(this, nanosTimeout); // 位置5 long now = System.nanoTime(); // 位置6 nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) // 位置7 break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); // 位置8 }
public final void acquireShared(int arg)
和独占锁处理方式基本类似,来看一下核心代码:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 位置1 p.next = null; // help GC if (interrupted) selfInterrupt(); return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
这里设计上有点小技巧,原先思考一个共享锁的典型场景:读写锁。 一旦写锁释放,应该是唤起所有的读锁。而原先在看setHeadAndPropagate,并没有一个循环释放锁的过程。后来思考了下,采用的是一个链式释放的过程,前一个shared的锁对象释放下一个,在释放的时候继续进行tryAccquireShared控制。
一点感悟:在写并发程序时,一些传统编程的思路要有所改变。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException
这两个实现上和独占锁类似,也就是setHeadAndPropagate处理上的不同点而已。
release动作:
public final boolean release(int arg)
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;
没啥特别好将的,一看基本也就明白了,出队列的时候,同时唤醒下一个Node。
cancel动作:
private void cancelAcquire(Node node)
代码就不贴了,几个处理:
1. 从链表上删除cancel节点
2. 如果cancel节点是head,则尝试唤醒cancel节点的下一个节点。
ConditionObject的理解
几个主要方法:
- public final void await() throws InterruptedException
- public final void awaitUninterruptibly()
- public final long awaitNanos(long nanosTimeout) throws InterruptedException
- public final boolean awaitUntil(Date deadline) throws InterruptedException
- public final boolean await(long time, TimeUnit unit) throws InterruptedException
- public final void signal()
- public final void signalAll()
Array queue; Object empty = new Object(); Object full = new Object(); // 生产者 if(queue 是否满了) full.wait() //阻塞等待 else put(queue , data) //放入数据 empty.single(); // 已经放了一个,通知一下 // 消费者 if(queue 是否空了) empty.wait() // 阻塞等待 else data = get(queue); full.single() // 已经消费了,通知一下
整体概念
在整个AQS存在两种链表。 一个链表就是整个Sync Node链表,横向链表。另一种链表就是Condition的wait Node链表,相对于Sync node,它属于node节点的一个纵向链表。当纵向列表被single通知后,会进入对应的Sync Node进行排队处理。
通过这样的纵横队列,实现了ConditionObject共享lock锁数据。
评论
注释也提到了这一点 /* We also use "next" links to implement blocking mechanics. * The thread id for each node is kept in its own node, so a * predecessor signals the next node to wake up by traversing * next link to determine which thread it is. Determination of * successor must avoid races with newly queued nodes to set * the "next" fields of their predecessors. This is solved * when necessary by checking backwards from the atomically * updated "tail" when a node's successor appears to be null. * (Or, said differently, the next-links are an optimization * so that we don't usually need a backward scan.) */
先前也没特别留意,仔细看了下代码。
按照作者自己给的注释,提到的应该是cancel引起的。
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
unparkSuccessor方法中的代码:
Node s = node.next; if (s == null || s.waitStatus > 0) { //位置1 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
注意下位置1,s.waitStatus代表的就是cancel的状态判断。
再看一下,cancelAcquire代码行:693行,node.next = node; // help GC
处理cancel节点时,会修改next节点,导致unparkSuccessor时next节点就不再是一个闭环。所以这时需要从tail一直往前查找。
还有一个原因, addWaiter方法先设置node.prev, CASTail之后再设置pred.next。CASTail是原子的,但是其他线程仍然可能看到pred.next为null的不一致的状态,这种情况下从后向前能够完成查找
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
你看它的javadoc说明阿
a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
有3种case的结果要标示,所以用了int
在失败时返回负值;如果在共享模式下获取成功,但后续共享模式下的获取无法成功,则返回 0;如果在共享模式下获取成功,并且后续共享模式下的获取也能成功(在这种情况下,后续等待线程必须检查可用性),则返回正值。
不太明白说明中=0和>0两种场景的含义,麻烦讲解一下。
举个case:读写锁.
1. 第1次read操作,调用tryAcquireShared,返回的是0
2. 第2次read操作,调用tryAcquireShared,返回的是1,表示还是能获取成功.
3. 第3次write操作,调用tryAcquireShared会-1,直到read操作释放锁,然后返回0
4. 第4次write操作,调用tryAcquireShared会返回-1,直到write操作释放锁之后,然后返回0
ReentrantReadWriteLock的tryAcquireShared返回值只有1和-1,没有0的返回值
有点明白了:
tryAcquireShared大部分场景只需要判断<0和>=0两种,Semaphore的tryAcquireShared就存在对于=0的情况,应该只是对调用者一个暗示,可能在某些情况下会区分=0和>0。唯一不好的地方就是这几个API变得有那么一点点不一致了。
你看它的javadoc说明阿
a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
有3种case的结果要标示,所以用了int
在失败时返回负值;如果在共享模式下获取成功,但后续共享模式下的获取无法成功,则返回 0;如果在共享模式下获取成功,并且后续共享模式下的获取也能成功(在这种情况下,后续等待线程必须检查可用性),则返回正值。
不太明白说明中=0和>0两种场景的含义,麻烦讲解一下。
举个case:读写锁.
1. 第1次read操作,调用tryAcquireShared,返回的是0
2. 第2次read操作,调用tryAcquireShared,返回的是1,表示还是能获取成功.
3. 第3次write操作,调用tryAcquireShared会-1,直到read操作释放锁,然后返回0
4. 第4次write操作,调用tryAcquireShared会返回-1,直到write操作释放锁之后,然后返回0
ReentrantReadWriteLock的tryAcquireShared返回值只有1和-1,没有0的返回值
你看它的javadoc说明阿
a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
有3种case的结果要标示,所以用了int
在失败时返回负值;如果在共享模式下获取成功,但后续共享模式下的获取无法成功,则返回 0;如果在共享模式下获取成功,并且后续共享模式下的获取也能成功(在这种情况下,后续等待线程必须检查可用性),则返回正值。
不太明白说明中=0和>0两种场景的含义,麻烦讲解一下。
举个case:读写锁.
1. 第1次read操作,调用tryAcquireShared,返回的是0
2. 第2次read操作,调用tryAcquireShared,返回的是1,表示还是能获取成功.
3. 第3次write操作,调用tryAcquireShared会-1,直到read操作释放锁,然后返回0
4. 第4次write操作,调用tryAcquireShared会返回-1,直到write操作释放锁之后,然后返回0
你看它的javadoc说明阿
a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
有3种case的结果要标示,所以用了int
在失败时返回负值;如果在共享模式下获取成功,但后续共享模式下的获取无法成功,则返回 0;如果在共享模式下获取成功,并且后续共享模式下的获取也能成功(在这种情况下,后续等待线程必须检查可用性),则返回正值。
不太明白说明中=0和>0两种场景的含义,麻烦讲解一下。
你看它的javadoc说明阿
a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
有3种case的结果要标示,所以用了int
先前也没特别留意,仔细看了下代码。
按照作者自己给的注释,提到的应该是cancel引起的。
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
unparkSuccessor方法中的代码:
Node s = node.next; if (s == null || s.waitStatus > 0) { //位置1 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
注意下位置1,s.waitStatus代表的就是cancel的状态判断。
再看一下,cancelAcquire代码行:693行,node.next = node; // help GC
处理cancel节点时,会修改next节点,导致unparkSuccessor时next节点就不再是一个闭环。所以这时需要从tail一直往前查找。
这段:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
我自己理解的是如果前一个判断shouldParkAfterFailedAcquire返回true,则说明node线程可以放心的去park,因为它的前一个节点的waitStatus已经为Signal了,所以进入了下一个判断parkAndCheckInterrupt,看了下:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
其中LockSupport.park(this);是把当前线程阻塞住,这个是不是意味着当先线程执行被阻塞,也就暂时停止了在acquireQueued方法中的for(;;)的自旋循环呢?
如果轮到改node出queue时,它被唤醒,自旋循环又开始了呢?
不清楚理解是不是有误,望楼主赐教啊~
1. 自旋锁的理解有偏差。 一般自旋锁是以cpu换效率。如果thread.wait阻塞100ns超时,线程切换(阻塞/唤醒)的代价比较高,肯能超过了100ns。这时就可以选择while(true){}的死循环来处理。
Exchanger类中有一个自旋锁的使用:
private static Object spinWait(Node node, Slot slot) { int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) --spins; else tryCancel(node, slot); } }
2. 其中LockSupport.park(this);是把当前线程阻塞住。这意味着当前的线程处于WAITING状态,Node队列的上一个节点会唤醒该节点。重新回到for(;;)进行检查,这里不是一个自旋锁。可以理解为一个乐观锁的实现。
比如我们在数据库上常用的乐观锁: update xxxx set A = A+1 where A = xxx。 每次都先check一下当前的current value,然后再进行相应的操作。如果更新失败,继续for(;;),直到更新成功。
这段:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
我自己理解的是如果前一个判断shouldParkAfterFailedAcquire返回true,则说明node线程可以放心的去park,因为它的前一个节点的waitStatus已经为Signal了,所以进入了下一个判断parkAndCheckInterrupt,看了下:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
其中LockSupport.park(this);是把当前线程阻塞住,这个是不是意味着当先线程执行被阻塞,也就暂时停止了在acquireQueued方法中的for(;;)的自旋循环呢?
如果轮到改node出queue时,它被唤醒,自旋循环又开始了呢?
不清楚理解是不是有误,望楼主赐教啊~
发表评论
-
yugong QuickStart
2016-03-05 01:52 0几点说明 a. 数据迁移的方案可参见设计文档,oracl ... -
阿里巴巴开源项目: 阿里巴巴去Oracle数据迁移同步工具
2016-03-05 18:29 6344背景 08年左右,阿里巴巴开始尝试MySQL的相关 ... -
愚公performance
2016-03-02 17:29 0性能测试 全量测试 场景1 (单主键, ... -
yugong AdminGuide
2016-03-02 16:40 0环境要求 操作系统 数据库 迁移方案 部署 ... -
Tddl_hint
2014-01-27 13:52 0背景 工作原理 Hint格式 direct模 ... -
tddl5分库规则
2014-01-26 14:41 0背景 工作原理 构建语法树 元数据 基于 ... -
tddl5优化器
2014-01-22 15:12 0背景 工作原理 构建语法树 元数据 抽象语 ... -
Canal BinlogChange(mariadb5/10)
2014-01-20 17:25 4450背景 先前开源了一个 ... -
asynload quickstart
2013-10-08 22:49 0几点说明: 1. asyncload是做为一个j ... -
网友文档贡献
2013-09-18 15:50 01. Otter源代码解析系列 链接:http://e ... -
Manager配置介绍
2013-09-16 13:00 0通道配置说明 多种同步方式配置 a. 单向同步 ... -
canal&otter FAQ
2013-09-05 17:30 0常见问题 1. canal和 ... -
阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房)
2013-08-22 16:48 40211项目背景 阿里巴巴B2B公司,因为业务的特性 ... -
Otter AdminGuide
2013-08-19 11:06 0几点说明 otter系统自带了manager,所以简化了一 ... -
Otter高可用性
2013-08-17 23:41 0基本需求 网络不可靠,异地机房尤为明显. man ... -
Otter数据一致性
2013-08-17 23:39 0技术选型分析 需要处理一致性的业务场景: 多地修改 ( ... -
Otter扩展性
2013-08-17 22:20 0扩展性定义 按照实现不同,可分为两类: 数据处理自定 ... -
Otter双向回环控制
2013-08-17 21:37 0基本需求 支持mysql/oracle的异构数据库的双 ... -
Otter调度模型
2013-08-17 20:13 0背景 在介绍调度模型之前,首先了解一下otter系统要解 ... -
Otter Manager介绍
2013-08-16 11:16 0背景 otter4.0发布至 ...
相关推荐
JDK1.8中文文档 JDK1.8中文 jkd8中文文档 JDK中文版 标准的API规范文档,谷歌中文翻译 全翻译
JDK中文手册(JAVA),JDK API手册
下载后直接去本机jdk目录里替换jdk中的src.zip 再打开idea就能看到中文版的源码注释 示例 https://blog.csdn.net/a7459/article/details/106495622
jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助文档jdk8帮助...
jdk8中文说明文档_CHM.zip jdk1.8文档 jdk1.8说明文档 官方直接翻译
dubbo-admin在jdk1.8环境下运行,dubbo-admin在jdk1.8环境下运行dubbo-admin在jdk1.8环境下运行dubbo-admin在jdk1.8环境下运行dubbo-admin在jdk1.8环境下运行dubbo-admin在jdk1.8环境下运行dubbo-admin在jdk1.8环境下...
JDK 1.8中文API文档
jdk中文文档,积分便宜你的不二选择, jdk中文文档,积分便宜你的不二选择, jdk中文文档,积分便宜你的不二选择
JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11安装包,JDK11安装包JDK11...
jdk8中文手册
JAVA设计模式在JDK中的应用JAVA设计模式在JDK中的应用 各种设计模式在JDK当中使用的地方 方便分析源码
JDK1.8 API 中文谷歌翻译版 java帮助文档 JDK API java 帮助文档 谷歌翻译 JDK1.8 API 中文 谷歌翻译版 java帮助文档 Java最新帮助文档 本帮助文档是使用谷歌翻译,非人工翻译。准确性不能保证,请与英文版配合使用 ...
JDK1.8API中文文档,谷歌翻译版
jdk api 1.8_中文文档 jdk api 1.8_中文文档 jdk api 1.8_中文文档
mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk1.8安装包!mac系统jdk...
主要是帮助基于jdk1.8版本开发的项目的中文帮助文档。
JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 JDK API 1.8 中文 ...JDK API 1.8 中 JDK API 1.8 中文 JDK API 1.8 中文
jdk7 jdk8 jdk9 jdk10 jdk11 jdk12 jdk13 jdk14 (win-64位) 资源共享
JDK8 中文帮助文档(jdk api 1.8 google.CHM)
JDK8 API 中文 文档.CHM