0%

Java AQS原理以及应用

Java中最常用的锁ReentrantLock就是基于AQS来实现的。网上已经有很多的资料来讲解AQS的原理。因此这里只是写一下个人比较难以理解的点,用以学习。

这里贴一篇非常好的AQS的文章,讲的非常详细,看完了还是可以学到很多东西。

从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队 (meituan.com)

这里就记录下ReentrantLock的非公平锁。

首先是acquire(int)函数,当tryAcquire函数获取锁失败后,将会把线程加入等待队列。

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

关键就在于这个acquireQueued函数,addWaiter()函数就是新建一个节点,然后将节点放入等待队列的末尾,上面放的那篇文章已经非常详细,这里就不写了。

接下来看看acquireQueued()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这里可以看到进入函数之后,会进入自旋,首先找到前驱节点,如果前驱节点已经是head虚节点了,即当前线程可以获得锁,则调用tryAcquire()函数尝试获取锁。获取成功则将当前节点设置为虚节点,然后退出自旋,返回线程是否被中断。

如果前驱节点不是head,那么将会进入shouldParkAfterFailedAcquire()函数,该函数用于判断线程是否应该被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

进入后先检测前驱节点的状态ws。

下面是waitStatus的常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
  1. 当前驱节点已经是唤醒状态,也就是Node.SIGNAL的情况下,当前线程就可以直接阻塞,返回true。

  2. 如果ws大于0,即前驱节点为取消状态,那么就进入do-while循环,不断寻找前驱节点,直到找到一个不是取消状态的节点。然后将不是取消状态的节点的next直接指向当前节点(也就是直接跳过中间被取消的节点)。

  3. 否则代表其他状态,则通过cas尝试将状态设置为SIGNAL。

当返回true以后,将会调用parkAndCheckInterrupt()函数,这里进入后会调用unsafe的park方法,将线程阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

// LockSupport.java
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

这里将会设置阻塞对象parkBlocker,这是一个Thread类的私有成员。

1
2
3
4
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

设置完成后就调用UNSAFE.park()阻塞线程。

这里的parkBlocker是用来记录线程是被哪个对象阻塞的,用于线程监控和分析,通过LockSupport.getBlocker()函数就可以获取parkBlocker。