Java Condition

提供类似Object的wait/notify功能,搭配显式锁Lock使用

创建Condition是每个锁的基本功能,一个Condition相当于在锁上开辟了一个条件等待队列

Lock.java
1
2
3
4
public interface Lock {
Condition newCondition();
//...
}

Condition依赖Lock,在操作前必须获得锁
唤醒条件

  • 其他线程在Condition上调用了signal或signalAll主动唤醒
  • 其他线程在线程上调用中断
  • 无原因错误唤醒
Condition.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Condition {
//放锁,线程进入等待状态,响应中断
void await() throws InterruptedException;
//也可以不响应中断
void awaitUninterruptibly();
//一段时间后唤醒,因为存在无原因唤醒,因此可能没到时间提前醒过来,可以通过返回值判断
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;

//唤醒一个线程,线程必须要重新获得锁才能从await返回
void signal();
void signalAll();
}

和wait/notify类似,由于存在错误唤醒,需要包围循环检查逻辑

Sample
1
2
3
4
5
6
7
8
9
lock.lock();
try{
while(!condition) {
cond.await();
}
//process
}finally{
lock.unlock();
}

Condition实现


Condition的实现是ConditionObject
它是AQS的内部类,并且是非静态的,因为Condition依附于Lock,因此内部类很合理,并且作为内部类ConditionObject可以操作AQS同步队列

  • 使用Condition需要获得锁,利用AQS中的同步队列
  • 多个线程等待在同一Condition上,因此Condition自身维护等待队列
    用于同步的节点结构和用于条件等待的节点结构是共享的, 使用上有细微区别,条件等待队列是单链表并且维护节点状态,而同步队列是双链表
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
//...

public class ConditionObject implements Condition, java.io.Serializable {
//维护一个队列,记录头尾指针
private transient Node firstWaiter;
private transient Node lastWaiter;

//...
}

创建Condition过程交给AQS实现类,即内部类内使用new获取实例,以重入锁为例

ReentrantLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;

public Condition newCondition() {
return sync.newCondition();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
//...
}

等待过程,大体逻辑是进入条件等待队列等待,等到进入同步队列后重新参与锁竞争

AbstractQueuedSynchronizer$ConditionObject.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程进入等待队列
Node node = addConditionWaiter();
//释放锁,保留状态
int savedState = fullyRelease(node);
int interruptMode = 0;

//节点还在等待队列,没有进入同步队列,线程等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//唤醒后使用之前的状态参与锁竞争,只有拿到锁才能返回
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//处理中断,由于park中断后不会抛异常,异常需要自己抛
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

//用当前线程构建等待节点,加到等待队列尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
//当前指向的最后等待者已经不是等待状态
if (t != null && t.waitStatus != Node.CONDITION) {
//从头开始遍历清理非等待状态已被取消的节点
unlinkCancelledWaiters();
//重新找到最后等待者
t = lastWaiter;
}
//用当前线程创建等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//只维护Node后向指针,作为单链表,新节点插入末尾成为最后等待者
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

唤醒过程,大体逻辑是在条件等待队列中,选取一个还没取消的,转移到同步队列参与锁竞争

AbstractQueuedSynchronizer$ConditionObject.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
    public final void signal() {
//必须已经获得锁才能操作
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//等待队列的首节点是唤醒目标
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

//因为存在取消的节点,因此不断尝试
private void doSignal(Node first) {
do {
//第一等待者交给后面的节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//断开当前节点
first.nextWaiter = null;
//转移没成功需要再次尝试下一个
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

//把节点从条件队列转移到同步队列
final boolean transferForSignal(Node node) {
//检测是否依然处于条件等待状态,0是一个未定义的状态,只是为了尝试更新用
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//等待队列中的节点直接插入同步队列
Node p = enq(node);
int ws = p.waitStatus;
//大于0是CANCEL状态,或者更新成SIGNAL状态失败,直接唤醒线程尝试刷新状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

//节点插入同步队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//同步队列为空时,需要初始化,用一个空白节点作为头部
if (compareAndSetHead(new Node()))
tail = head;
} else {
//同步队列是双链表结构,需要设置前向指针
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}

总结


  • Condition本身没有条件,条件是外围业务逻辑控制,Condition作用是开辟一个条件等待队列
  • 线程本身是参与锁竞争的,由于不满足条件无法继续执行,外围逻辑调用await把线程放入条件等待队列等待,等到满足条件外围调用signal后加入同步队列重新参与锁竞争
  • Object有一个同步队列(加锁队列)和一个等待队列(wait-notify),而Lock可以包含多个Condition,即能维护多个等待队列
  • Condition底层实现基于LockSupport.park