Java 并发锁

Java.util.concurrent 中有很多 Java 并发相关工具,例如 锁 ReentrantLock、线程 Callable FutureTask、信号量 Semaphore、共享锁 CountDownLatch

此外 Java 原生的 volatilesynchronized 关键字也是并发编程的关键。

简单的 CAS 实现:AtomicInteger.compareAndSet(1, 2)

synchronized 关键字

属性:可重入锁、独占锁、非公平锁

每个对象都有一个 monitor 对象于之关联,synchronized 通过对象监视器 monitor 和操作系统的 Mutex Lock 实现加锁和解锁,被锁住的区域是临界区:临界区内被锁保护,线程间只能串行访问的代码;

作用域

  • 修饰实例方法,锁当前对象实例。

    • 修饰的方法在编译成字节码的时候,在 flag 上标记 ACC_SYNCHRONIZED
  • 修饰静态方法,锁当前类,会作用于所有实例。

  • 修饰同步语句块,程序指定 Object / .class。

    • 通过 monitorenter 和 monitorexit 指令实现

锁粗化和锁消除

  • 锁粗化:将多次锁请求合并成一个请求,以降低短时间内大量锁请求、同步、释放带来的性能损耗
  • 锁消除:Java 虚拟机在 JIT 编译时,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过锁消除,可以节省毫无意义的请求锁时间

.java -> .class 字节码:Javac

字节码 -> 机器码:JIT(Just-in-time)动态(即时)编译;AOT(Ahead Of Time)运行前编译

锁升级策略

  • 偏向锁:只有一个线程持有锁,无竞争情况时,线程可以直接获得锁,没有额外的开销。CAS 修改 Markword 头信息,该操作一般不会撤销,以避免 CAS 开销。

  • 轻量级锁:多个线程都是在不同的时间段来请求同一把锁时,JVM 会使用轻量级锁:通过 CAS 尝试获取锁,不阻塞线程。如果竞争失败直接会升级成重量级锁,升级过程会自旋尝试获取锁。

  • 重量级锁:加锁的过程中,采用自适应自旋,避免直接阻塞线程。自旋失败才会阻塞线程并入队,阻塞操作LockSupport.park由操作系统来实现,性能消耗高。唤醒线程的策略见下图:(waitset 环形双向链表;cxq 栈;entrylist 双向链表)

下图是对象头中的 MarkWord:MarkWord 结构之所以搞得这么复杂,是因为需要节省内存,让同一个内存区域在不同阶段有不同的用处(图有误,hashcode 占 32 位)

  • 指向线程栈中锁记录的指针 = 持有锁线程的 lockRecord 的指针
  • 指向重量级锁的指针 = 指向 monitor 的指针

ReentrantLock

属性:可重入锁、独占锁、公平锁 or 非公平锁

特点:可限时等待、可响应中断、可实现选择性通知

使用场景:阻塞队列 ArrayBlockingQueue LinkedBlockingDeque 通过 Condition 实现队列满插入和队列空取出的阻塞;作为读写锁 ReentrantReadWriteLock 的父类

源码结构:

public class ReentrantLock implements Lock {
// 同步控制器(指向公平锁或非公平锁)
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer{
final boolean nonfairTryAcquire(int acquires) {...}
protected final boolean tryRelease(int releases) {...}
}
// 不公平锁
static final class NonfairSync extends Sync{
final void lock() {...}
protected final boolean tryAcquire(int acquires) {...}
}
// 公平锁
static final class FairSync extends Sync {
final void lock() {...}
protected final boolean tryAcquire(int acquires) {...}
}
// 实现 Lock 接口
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public void unlock() {
sync.release(1);
}
}

ReentrantLock 独占锁运行过程(可重入锁的 state 表示重入的次数,会大于1)

AQS 独占模式获取锁

加锁失败时阻塞

ReentrantLock在 CAS 加锁失败之后会将 Thread 封装成一个Node类型的对象加入CLH队列中

然后调用LockSupport.park(this)进行阻塞(LockSupport是一个 native 方法实现的工具类,在 hotspot 源码中通过mutex来实现的)

与 synchronized 对比

比较 ReentrantLock synchronized
如何实现任务的等待-唤醒? Condition 类 await/signal,可以有选择性的进行线程通知 Object 的 wait/notify,被通知的线程是由 JVM 选择的
如何释放锁? 需要手动释放锁 出代码块后自动释放
底层如何实现? API 实现 JVM 内部锁升级策略
是否可实现可中断锁、公平锁? 可以 不可以
占用资源大不大? 通过自旋CASUnsafe.park/unpark挂起唤醒线程 锁升级策略:轻量级锁CAS不会阻塞挂起;重量级锁才会和ReentrantLock一样park/unpark

可中断锁:获取锁的过程中可以被中断,不需要一直等到获取锁之后才能进行其他逻辑处理。

不可中断锁:一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。

AbstractQueuedSynchronizer 抽象类

ReentrantLock、倒计时器、信号量的 Sync 和线程池的 worker 都是基于 AQS 实现的。AQS 封装了线程的入队与出队、状态更新以及阻塞与唤醒等底层细节,通过重写tryAcquire(int)tryRelease(int)等方法,实现类可以实现具体的资源控制逻辑。

AQS 核心思想:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,通过 CLH 锁实现线程阻塞等待以及被唤醒时锁分配

CLH 锁:自旋锁的一种改进,利用 FIFO 双端队列Node 节点 维护等待获取资源的线程队列,state 成员变量表示同步状态,每个节点中包括了线程的引用、 当前节点在队列中的状态、前驱节点和后继节点。

源码结构:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// 表示锁的持有数量
private volatile int state;
// 双向队列(链表实现)
private transient volatile Node head;
private transient volatile Node tail;
// 本地方法 eg:unsafe.compareAndSwapInt
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 封装线程的节点
static final class Node {
// 标识当前节点对应的等待状态(不非负数时,仅作为普通标识,下文有详细介绍)
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
// 共享锁:Semaphore、CountDownLatch
static final Node SHARED = new Node();
// 独占锁:ReentrantLock
static final Node EXCLUSIVE = null;
// 条件队列中,表示下一个节点
// 同步队列中,表示当前节点想要获取的是排他锁还是共享锁
Node nextWaiter;
}
// 条件队列,保存有触发条件的 Node,实现选择性通知
// 作用类似于 Object 内置的监视器方法:wait notify notifyAll
public class ConditionObject implements Condition{
private transient Node firstWaiter;
private transient Node lastWaiter;
}
// 超时中断
static final long spinForTimeoutThreshold = 1000L;
}
  • waitStatus 状态枚举值(初始值为 0,表示初始化状态)【使用情况】
    • CANCELLED 表示线程已经被取消【响应中断】
    • SIGNAL 表示后继节点的线程需要被唤醒【同步队列】
    • CONDITION 表示线程在条件队列中等待某个条件满足【条件队列】
    • PROPAGATE 表示共享模式下的资源传播【共享锁】

实现类可以是独占锁或者共享锁

  • 独占锁:tryAcquire-tryRelease
  • 共享锁:tryAcquireShared-tryReleaseShared

线程调度逻辑:当线程尝试获取资源失败时,会创建一个Node节点并将当前线程包装进去,然后利用CAS算法将其安全地加入到等待队列的尾部,并阻塞。Link

image-20240415164535374

在释放资源时,AQS会根据资源管理策略从队列中选择合适的节点并唤醒对应线程。

CLH 锁

Craig, Landin, and Hagersten locks 实现线程阻塞等待以及被唤醒时锁分配的机制

CLH 锁是对自旋锁的一种改良,是一种隐式的链表队列

采用模板方法设计模式

模板方法:抽象出步骤的执行顺序作为抽象方法,具体的实现方法交给子类实现。

自定义 Synchronizer 时需要重写几个 AQS 提供的钩子方法(通过钩子方法控制 state 为何值时代表加锁成功/失败、解锁成功/失败):tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared、isHeldExclusively

  • 解释一下信号量、倒计时器、可重入锁的 state 的含义

LockSupport 类

操作 Node 中的 thread,实现线程的阻塞 park 和解除阻塞 unparkLink

具体来说,操作”许可“(多次 unpark 也只能获取一次许可,不可叠加),底层通过mutex(互斥量)和condition(条件变量)实现。【mutex存在用户态和内核态的切换】

Java 线程中断方法 方法意义
Thread.currentThread().interrupt() 将线程的状态置为"中断"
Thread.interrupted() 获取当前线程的中断状态,并且会清除线程的状态标记,静态方法
Thread.isInterrupted() 获取调用该方法的对象所表示的线程,不会清除线程的状态标记,实例方法阻塞和中断的区别?

阻塞和中断的区别?

Lock 接口

方法 描述
lock(): void 加锁(ReentrantLock 默认不可响应中断)
lockInterruptibly(): void 加锁,阻塞等待锁的过程中,可以相应中断
tryLock(): boolean 非阻塞获取锁
tryLock(long, TimeUnit): boolean 时间段内获取锁
unlock(): void 解锁
newCondition(): Condition 获取条件等待队列

公平锁 & 非公平锁

AbstractQueuedSynchronizer.hasQueuedPredecessors() 线程在获取锁前,先判断一下自己在不在队列的首位,只有队首线程能被运行

ReentrantReadWriteLock

继承自 ReentrantLock

读锁是共享锁,写锁是独占锁;读读不互斥、读写互斥、写写互斥;写锁可以升级成读锁

  • 共享锁:一把锁可以被多个线程同时获得。
  • 独占锁:一把锁只能被一个线程获得。

StampedLock

不是直接实现 LockReadWriteLock接口,而是基于 CLH 锁 实现的

Semaphore

信号量,可选择公平 / 非公平锁。

控制同时访问特定资源的线程数量,通常用于那些资源有明确访问数量限制的场景,比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)

CountDownLatch

倒计时器:await()阻塞当前线程,当 count为零(即 state)时,唤醒所有被阻塞的线程。

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用

CyclicBarrier

循环栅栏:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程都会继续干活。