文章目录
- 前言
- 重要成员
- 构造器
- 获取信号量(减小state)
- 非公平实现的tryAcquireShared
- 公平实现的tryAcquireShared
- 释放信号量(增加state)
- 工具方法
- tryAcquire
- reducePermits
- drainPermits
- 总结
前言
Semaphore在多线程协作中常常用来控制公共资源的访问,限制同时访问数量。
打个比方,Semaphore就像是一个装有令牌(permit
)的黑箱子,拿到令牌的人才能去做爱做的事情,谁都可以从里面拿走若干令牌,谁都可以把新的令牌扔到里面去,但Semaphore从来不记载谁拿走的令牌。
重要成员
和CountDownLatch一样,Semaphore依赖的也是AQS的共享锁,核心属性也是AQS的state成员。在Semaphore中,使用内部类继承AQS使用。
构造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
默认构造器使用非公平锁,可以通过参数fair
来得到公平锁。参数permits
最终会赋值给AQS的state成员。
获取信号量(减小state)
Semaphore方法 | 调用的AQS方法 | 是否阻塞 | 是否响应中断 | 是否超时机制 | 返回值及含义 |
---|---|---|---|---|---|
acquire() | sync.acquireSharedInterruptibly(1) | ✓ | ✓ | - | void |
acquire(int permits) | sync.acquireSharedInterruptibly(permits) | ✓ | ✓ | - | void |
acquireUninterruptibly() | sync.acquireShared(1) | ✓ | - | - | void |
acquireUninterruptibly(int permits) | sync.acquireShared(permits) | ✓ | - | - | void |
tryAcquire(long timeout, TimeUnit unit) | sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) | ✓ | ✓ | ✓ | boolean 返回时是否获得了锁 |
tryAcquire(int permits, long timeout, TimeUnit unit) | sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)) | ✓ | ✓ | ✓ | boolean 返回时是否获得了锁 |
tryAcquire() | sync.nonfairTryAcquireShared(1) >= 0 | - | boolean 返回时是否获得了锁 |
- 上面表格中,所有没有参数的Semaphore方法,实质上都是在获得单位1的信号量。
- 调用的AQS方法,其实只有三种,
acquireShared
(共享锁的获取与释放中已经进行了讲解),acquireSharedInterruptibly
和tryAcquireSharedNanos
(CountDownLatch源码解析中已经进行了讲解)。这三者的区别已经在表格列出。- 需要响应中断,方法声明会抛出中断异常。
- 有超时机制,就需要用返回值区别 获得锁返回 和 超时都没获得到锁 两种情况。
- 这三个方法都需要调用到AQS子类实现的
tryAcquireShared
,该方法用来获取共享锁,子类可以将其实现公平锁或是非公平锁。
nonfairTryAcquireShared
其实不大应该放在上表里面,因为它根本不是AQS的方法,只是AQS子类的新加方法。因为它根本没有阻塞等待的过程,只是简单的try一次,成功失败听天由命,所以它根本不会阻塞。
表格中,带permits
参数的方法可以获得单位不为1的信号量,但是方法中对permits
参数的检查要求是>=0
,也就是说,允许线程获得0单位的信号量,虽然我感觉这样没有任何意义。
这里我们再复习一下tryAcquireShared
返回值的含义:
- 如果返回值大于0,说明获取共享锁成功,并且后续获取也可能获取成功。
- 如果返回值等于0,说明获取共享锁成功,但后续获取可能不会成功。
- 如果返回值小于0,说明获取共享锁失败。
非公平实现的tryAcquireShared
//NonfairSync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
//Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 方法使用了自旋,这是合理且必要的。共享锁是共享的,自然可能有多个线程正在同时执行上面的代码,即使失败了也不能退出循环,而是应该失败后再得到当前值,然后再次CAS尝试。
- 如果
remaining
算出来小于0,说明剩余信号量已经不够拿的了,那就直接返回remaining
这个负数(表达获取共享锁失败),不做CAS操作。 - 如果
remaining
算出来大于等于0,说明剩余信号量够拿的,紧接着如果CAS设置成功,就返回remaining
这个大于等于0的数(表达获取共享锁成功)。 - 这个方法想要退出,只有当前线程拿到了想要数量的信号量,或剩余信号量已经不够拿。
公平实现的tryAcquireShared
//FairSync
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
区别就是要先判断同步队列中是否已经有节点了(hasQueuedPredecessors
),如果有那同步队列中的节点属于是排在当前线程之前的,所以只好直接返回-1。
释放信号量(增加state)
释放信号量就不用区别什么公平不公平了。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
- 考虑多线程,使用自旋保证
releases
单位的信号量能够释放到位。 - 只有CAS设置成功,或溢出int型的范围,才能退出这个循环。
工具方法
tryAcquire
上面表格里,一直没有好好讲tryAcquire
,因为它在里面属于一个异类,没有阻塞等待的过程。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
而nonfairTryAcquireShared
的实现也看过了,里面根本没有调用过LockSupport.park
,确实没有阻塞。不要被自旋所迷惑,自旋也不是阻塞,而且这个自旋过程一般情形下很快就会结束。
简而言之,Semaphore#tryAcquire
的作用就是尝试 一次性的、非公平的 获得锁动作。注意这种一次性动作一定要是非公平实现的,不然大部分情况下(同步队列中只要有一个线程在等待),这种一次性动作肯定不能成功。这也是为什么要把非公平实现放到NonfairSync
和FairSync
的父类里的一个公共方法里。
注意,返回值进行了处理,如果获得共享锁成功(nonfairTryAcquireShared
返回值>=0
),返回true;如果获得共享锁失败(nonfairTryAcquireShared
返回值<0
),返回false。
reducePermits
//Semaphore
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//Sync
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
- 这个方法的作用也是获得信号量,只不过这个函数相比
nonfairTryAcquireShared
的实现,它允许改变后的信号量是负数。 - 自旋的过程。想要退出函数,只有CAS操作成功或者向下溢出了。
if (next > current)
分支进入的原因只能是int型变量向下溢出,因为reductions
被保证是一个>=0
的数。- 是
protected
方法,用起来不是那么方便。
//Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
放一下nonfairTryAcquireShared
的实现作对比。
drainPermits
//Semaphore
public int drainPermits() {
return sync.drainPermits();
}
//Sync
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
自旋的过程,直到信号量被清空为0。