锁定并占有资源

同步块


synchronized是最基本的内置同步方式

  • 同步普通方法
    锁在实例上,同一实例对象,即使是不同方法,也不能被不同线程同时访问,因为实例对象被独占
Sample
1
2
3
4
5
6
7
8
class Test{
public synchronized void method1(){
//logic
}
public synchronized void method2(){
//logic
}
}
  • 同步静态方法
    锁在类上,同一时刻只有其中一个方法能被静态访问
Sample
1
2
3
4
5
6
7
8
class Test{
public synchronized static void method1(){
//logic
}
public synchronized static void method2(){
//logic
}
}
  • 同步代码块
    锁在传入对象上,传入this即锁在当前实例对象上,也可锁在其他自定义对象
    锁对象不同,相互间就不会互斥
Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
Object lockObj = new Object();
public void method1(){
synchronized(this){
//logic
}
}
public void method2(){
synchronized(lockObj){
//logic
}
}
}

Synchronized重量级锁实现原理


synchronized同步是可重入的,已拿到锁的线程可以再次自由进入,可以处理锁内递归调用
本质上每个对象都含有monitor,synchronized会找到所对应的对象monitor来判断是否可以加锁

  • synchronized代码块经过编译,会在同步块的前后分别形成monitorentermonitorexit这个两个字节码指令
  • synchronized方法经过编译,方法表中将该方法的access_flags字段中的synchronized标志位置1,调用时进行锁操作
    空闲时有线程申请进入,就成为了它的主人,进入锁定状态。进入时计数加1,退出时减1,同一线程可以再次进入。执行结束后计数重新变成0退出。静态时对象可以是类,表示获取类字节码的monitor

Thread类提供方法,可以检测线程是否持有对象上的锁

Thread.java
1
public static native boolean holdsLock(Object obj);

内部实现

  • 等待的线程进入WaitSet
  • 竞争锁的线程进入ContentionList
  • 为了降低同时访问ContentionList的压力,把一部分线程移动到EntryList
  • 锁持有者Owner在解锁时从EntryList中指定一个线程OnDeck参与竞争,并非直接传递
  • 进入ContentionList前同样可以参与竞争,因此是非公平的

原生锁内部性能优化


锁膨胀:偏向 -> 轻量级 -> 重量级

  • 偏向锁(biased lock):无锁
    同一线程可能连续请求锁,尤其是竞争很少的时候。对象内部有ThreadId字段,获得锁时尝试CAS把线程id写入字段,成功写入相当于给特定线程开了特权无需锁操作
    其他线程要锁时会触发撤销这个偏向锁,撤销方式是在虚拟机安全点(不执行字节码时)暂停持有偏向锁的线程,如果线程已经死了或者不持有锁,恢复成无锁状态,如果还活着并且持有锁,清空线程ID字段并升级成轻量锁,恢复线程继续执行

批量再偏向:虚拟机会监测撤销次数,如果发现一个类的所有锁对象总被撤销,那么判定这个类不适合偏向模式,会批量把所有该类的没在用的偏向锁无释放掉
要找到一个类的所有对象需要遍历堆开销很大,而找到被锁定的类对象只需要遍历线程栈即可,因此采用特殊策略。在对象Mark Word中维护epoch字段,每个类元数据也维护epoch字段,两者相等视为偏向有效。如果进行批量释放,会把类的epoch自增,并且遍历所有持有中的锁对象也epoch自增,保持有效状态,此时未持锁的该类对象由于epoch不一致偏向失效,可以直接被其他线程获取,批量效果是在一个安全点大量释放偏向锁,避免每个单独寻找安全点释放

启动偏向锁-XX:+UseBiasedLocking 默认启用。 偏向锁解锁有开销,大量竞争下性能反而负作用,适合无竞争场景下

  • 轻量锁:自旋
    栈帧中创建lock record存储对象头Mark word副本, CAS假定无锁状态尝试将对象头中的锁记录指针替换成这个新引用,如果失败,说明已经被占用。
    这时判断当前指向的是否是当前线程自己的锁记录,如果不是说明被其他线程锁住,线程获取轻量级锁失败后会进行自旋,但是自旋开销大,在一定情况下进行锁升级,变成重量级锁,进入阻塞状态。 解锁时CAS假定轻量级锁状态,尝试把之前保存在锁记录中的副本替换回来。如果替换成功,说明持有锁时无竞争无需特殊操作,如果失败说明已经是重量级锁有其他线程在阻塞排队,执行唤醒操作。
    轻量级锁适合同步块快速执行场景,避免了上下文切换,自旋追求响应时间

  • 重量锁:阻塞
    操作系统互斥机制,Linux的pthread库的互斥锁Mutex Lock,阻塞唤醒机制,需要用户态内核态转换,性能差
    禁用偏向锁和轻量锁-XX:+UseHeavyMonitors。重量级锁适合同步块时间长执行场景,阻塞后空余CPU,追求吞吐量

优化

  • 减少锁粒度,只在必要时加锁

  • 锁粗化(Lock Coarsening)
    临近的代码块用一个锁合并
    启用锁消除-XX:+EliminateLocks

  • 锁消除(lock elision)
    逃逸分析,分析变量的作用范围。如果发现没有共享的可能,消除锁操作,避免无意义锁
    启用逃逸分析和锁消除-XX:+DoEscapeAnalysis,-XX:+EliminateLocks

  • 锁分离
    分解成多个锁,比如LinkedBlockingQueue插入取出在两端,两个锁控制

原生锁使用性能优化


创建一个对象用来加锁,byte[0]Object更好,基本类型byte空数组,相比对象而言,减少字节码指令

Lock


显式锁提供了同步块不能提供的功能,是同步块的补充

Lock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Lock {
void lock();
//可响应中断
void lockInterruptibly() throws InterruptedException;
//非阻塞加锁
boolean tryLock();
//超时时间
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();
//创建条件等待队列
Condition newCondition();
}

通过java.util.concurrent.locks.Lock的接口可以看出基本功能,对比原生锁可以看出功能增强

  • 同步块都是自动加锁放锁,而显式锁必须自己维护
    原生synchronized锁遇到异常会自动释放,而显式锁不能
    所以解锁务必放在finally中,并且加锁应该在try外面,因为加锁可能抛异常失败,此时finally对未加锁对象解锁会异常
Sample
1
2
3
4
5
6
7
8
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
  • 同步块拿不到锁会阻塞等待,而显式锁可以tryLock非阻塞,还可以加等待时间,超时拿不到就先返回
  • 同步块不响应中断,阻塞后不能退出,而显式锁遇到中断可以抛异常放弃加锁
  • 同步块只有一路等待控制,而显式锁可以多个condition多路控制
  • 多个锁间操作,同步块比较难实现
  • 同步块底层基于对象锁,而显式锁基于线程park机制

ReentrantLock


同步块对同一线程是可以重入的,相对应存在ReentrantLock
ReentrantLock的操作都是基于内部类的Sync,Sync继承自队列同步抽象类AbstractQueuedSynchronizer,也就是AQS
AQS不直接作为锁的基类,而是作为内部代理类的基类

ReentrantLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ReentrantLock implements Lock, java.io.Serializable {
//AQS的一种实现
private final Sync sync;

//使用AQS的state来表示锁的持有次数
abstract static class Sync extends AbstractQueuedSynchronizer {
//子类实现加锁策略
abstract void lock();

//tryLock时都使用非公平模式,因此放到公共实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//锁没人用立刻加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
//锁有人用,而且持有锁的刚好是当前线程,累计重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

//AQS子类定制实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//释放操作的线程并非持有锁线程,非法操作
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果所有重入都已释放,移除独占
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//...
}
}

Sync有两个内部子类提供两种模式

  • 公平:锁空闲时,谁先请求就给谁
  • 非公平:锁空闲时,虽然早有等待中的线程,但新来的线程还是有机会直接拿到锁
ReentrantLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static final class NonfairSync extends Sync {
//非公平版本,能拿到就拿,拿不到就正常排队
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

//AQS子类定制实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}


static final class FairSync extends Sync {
//公平版本,直接去排队
final void lock() {
acquire(1);
}

//AQS子类定制实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//增加了没人排队的条件
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
//当前线程已经持有锁
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//状态设为累计次数
setState(nextc);
return true;
}
return false;
}
}

完全公平每次需要将等待的线程唤醒,而非公平可能无需等待直接拿锁
即使保证了锁的公平,也不能保证线程调度的公平,大多数情况下意义不大
对于重入锁来说,公平非公平只在锁空闲时有效,已持锁线程重入时,总能拿到锁无需排队
非公平锁效率一般更高,默认是非公平的,提供构造参数可配置

ReentrantLock.java
1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

tryLock方法会无视公平性,直接用非公平方式尝试拿锁

ReentrantLock.java
1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

ReentrantLock作为显示锁,还提供一些信息

  • isFair: 锁是否公平
  • isLock: 锁是否被任意线程占用
  • hasQueuedThreads: 是否有线程等待加锁
  • getQueueLength: 预估排队长度
  • hasWaiters: 条件上是否等待
  • isHeldByCurrentThread: 当前线程是否持有锁
  • getHoldCount: 当前线程加锁次数

AQS队列同步器


基础队列同步器,内置FIFO队列,靠state表示锁状态,子类可以定制如何控制state实现不同行为
概括上就是维护了当前状态,持有线程,排队线程队列

基础框架,本身不支持Lock接口,AQS可以作为内部类适配
底层维护状态变量和一个等待队列,全部是volatile能及时感知变化
队列是transient不参与序列化,也没有自定义序列化方法,因此传输后等待队列会丢失

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
//队列结构
private transient volatile Node head;
private transient volatile Node tail;
//状态
private volatile int state;
}

从AbstractOwnableSynchronizer还继承了线程变量,表示当前占有锁的线程

AbstractOwnableSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }

private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

既然维护了队列,就可以获取当前队列中的线程
从后向前遍历,提取Node中的线程作为列表返回

AbstractOwnableSynchronizer.java
1
2
3
4
5
6
7
8
9
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}

AQS暴露给子类一套状态操作方法,不可覆写

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

模板方法设计模式,需要子类提供具体方法实现,即定义什么状态表示加锁或解锁
支持两套模式,独占和共享,供子类实现,默认都是抛出不支持异常,子类可以两套都使用比如ReadWriteLock

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//独占模式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

//共享模式
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

//判断线程独占,用于内部Condition
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

独占加锁逻辑,先尝试加锁,不行就加入队列,队列中尝试加锁,都不行就设置中断
这里的传参没有使用,子类可以用于自由表示含义

AbstractQueuedSynchronizer.java
1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

没有成功加锁的线程和相关状态作为Node结构的等待节点,使用CAS逻辑加入队列尾部,之所以使用CAS是因为多个线程可以操作队列尾部

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//死循环不断尝试,直到加入尾部为止
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

进入队列后开始自旋,看有没有机会拿到锁,拿不到可以根据情况进入等待状态
队列头部的线程是已经获得锁的,因此排到第二位就是有机会获得锁
因为只有一个线程能成功获取到锁,因此这个线程可以操作队列,把自己变成头部,无需CAS操作

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋循环
for (;;) {
//找到前一节点
final Node p = node.predecessor();
//如果排队到第二名,尝试加锁
if (p == head && tryAcquire(arg)) {
//已拿到锁,设为头节点,断开前向指针
setHead(node);
//之前头节点断开后向指针
p.next = null; // help GC
//成功返回
failed = false;
return interrupted;
}
//决定没拿到锁是否进入等待状态
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放锁时并不会操作队列,而是唤醒下一个任务,队列剔除由之后要拿锁的线程负责

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

共享加锁模式

AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final void acquireShared(int arg) {
//小于0表示没有拿到共享锁,0表示拿到共享锁并且是最后一个
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//创建共享模式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//拿到锁后,除了设置头节点以外,因为是非排他,如果后续也是申请共享锁,唤醒后面的
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

等待时基于park机制

AbstractQueuedSynchronizer.java
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

ReadWriteLock


一般的锁比如重入锁是线程独占的,但是读写锁允许多个只读线程同时拿到读锁,可以提升读多写少场景的性能
读写锁的接口非常简单,就是获取读锁和获取写锁

ReadWriteLock.java
1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

读写锁的实现类为ReentrantReadWriteLock
读写锁作为一个锁容器实现,本身没有实现Lock接口,而是交给内部的读锁和写锁实现
表面上读锁写锁是两个锁,但其实是共用实现,即读锁写锁实际上是一个队列

ReentrantReadWriteLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
//共用的锁实现
final Sync sync;

//默认是非公平实现
public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
//构造时都传入自身,读锁写锁实际上共用同一底层实现
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//...
}

AQS只维护一个状态,现在要同时维护读写两种状态,需要进行位划分,高位是读低位是写

ReentrantReadWriteLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract static class Sync extends AbstractQueuedSynchronizer {
//以16位为单位划分
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//0x0000FFFF
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//获取读锁状态
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//获取写锁状态
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//...
}

获取写锁过程
没有读锁时才允许加写锁,这是为了保证可见性。如果写锁同时存在读锁,那么写操作不能保障可见,这也是为什么不支持锁升级,因为升级一个读锁并不能保证没有其他读锁存在

ReentrantReadWriteLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
//总状态不等于0但是写锁等于0,说明有读锁时,不能加写锁
//线程并非持有排他写锁线程,说明有写锁时,不能给其他线程再加写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大数量限制
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//写锁重入,更新状态,因为写状态在末尾维护,直接加即可,
setState(c + acquires);
return true;
}
//如果有阻塞的策略,或者尝试状态变更失败(状态被其他线程改变过)
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
//总状态为0,没有任何锁,设置独占
setExclusiveOwnerThread(current);
return true;
}

获取读锁过程
持有写锁的线程可以继续加读锁,这种行为可以进行锁降级
先持有写锁进行数据更新,加读锁,放写锁,继续进行数据处理。好处是虽然释放了写锁,但是继续持有读锁防止其他线程加写锁,保障了后续处理数据不被外界改变。
锁降级之所以提前释放写锁而不是最后释放,目的是允许其他线程读

ReentrantReadWriteLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//有写锁并且持锁线程不是当前线程
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//判断是否应当加读锁并尝试
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//继续尝试
return fullTryAcquireShared(current);
}

饥饿问题

  • 公平模式下,没有饥饿问题,读写都排队,依次执行
  • 非公平模式下,读可能插队,如果持续不断读请求,那么写可能一直拿不到,需要采取策略
    • 写请求总是尝试插队,能拿就直接拿,拿不到就排队
    • 如果一个读请求发现队列头部排了一个写请求,那么应放弃插队。这是一个概率性改进,因为写请求不一定排在头部,写请求在中间时,读可以插队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 公平锁下,只要有排队就去排,读锁不能超越写锁
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

// 非公平模式下,读请求应适当让路
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

StampedLock

读写锁开销还是很大,因为读每次也要加锁
StampedLock基于读操作大概率不和写操作冲突,因此提供不加锁的优化读模式
总体思路是先直接读,读完后判断期间如果写锁,放弃结果

  • 申请一个优化读的stamp
  • 直接进行读操作
  • 验证stamp,是否后来有写锁
  • 如果没写锁,说明没竞争,直接返回读结果
  • 如果有,说明前面读无效,正常申请读锁再次读
    由于有很多特有的接口方法,StampedLock没有继承Lock接口,自成一派

CountDownLatch


倒计时计数锁,一次性使用,可以实现批量join的效果
基础操作countDown/await

main线程等待5个线程结束后再继续执行,因为主程满足条件后不一定能立即执行,因此前面至少有5个线程完成

Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(5);
Runnable r = () -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("Finish:" + Thread.currentThread().getName());
cdl.countDown();
}
};
for (int i = 0; i < 10; i++) {
new Thread(r, String.valueOf(i)).start();
}
cdl.await();
System.out.println("Main");
}
// Finish:0
// Finish:1
// Finish:2
// Finish:3
// Finish:9
// Finish:8
// Main
// Finish:7
// Finish:6
// Finish:5
// Finish:4

CyclicBarrier


循环栅栏,每集齐一定数量,放开一次,支持reset重置可重复使用
基础操作await

设定容量5的栅栏,开9个线程,必然有5个执行完,剩下4个没凑齐无法继续执行
等待中的线程,如果有成员出异常,这批等待线程也会报异常,栅栏被打破

Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws InterruptedException {
//关联一个barrier action, 达到数量时执行一段逻辑
CyclicBarrier cb = new CyclicBarrier(5, () -> {
System.out.println("Gather");
});

Runnable r = () -> {
try {
Thread.sleep(200);
cb.await(1, TimeUnit.SECONDS);
System.out.println("Finish:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
System.out.println("Broken:" + Thread.currentThread().getName());
} catch (TimeoutException e) {
System.out.println("Timeout:" + Thread.currentThread().getName());
}
};
for (int i = 0; i < 9; i++) {
new Thread(r, String.valueOf(i)).start();
}
}
// Gather
// Finish:6
// Finish:2
// Finish:3
// Finish:0
// Finish:1
// Timeout:5
// Broken:7
// Broken:4
// Broken:8

Semaphore


多容量的锁,如果容量是1就是互斥锁,资源有限时进行数量控制,比如对象池、资源池
基础操作acquire/release

设定容量2,开启5个线程竞争

Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) throws InterruptedException {
Semaphore s = new Semaphore(2);
Runnable r = () -> {
try {
System.out.println("Want Lock:" + Thread.currentThread().getName());
s.acquire();
System.out.println("Get Lock:" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("Finish:" + Thread.currentThread().getName());
s.release();
}
};
for (int i = 0; i < 5; i++) {
new Thread(r, String.valueOf(i)).start();
}
}
// Want Lock:2
// Get Lock:2
// Want Lock:0
// Get Lock:0
// Want Lock:1
// Want Lock:3
// Want Lock:4
// Finish:2
// Finish:0
// Get Lock:1
// Get Lock:3
// Finish:3
// Finish:1
// Get Lock:4
// Finish:4

Exchanger


两个线程在一个同步点进行数据交换,先到达同步点的线程会等待
Exchanger相当于两个线程的一个共享变量
基础操作exchange

Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
Thread.sleep(1000);
exchanger.exchange("A");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
Thread.sleep(2000);
String s = exchanger.exchange("B");
System.out.println("B gets " + s); //B gets A
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}