final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
phaser.register(); // Equivalent to countUp
// do some work asynchronously, invoking
// phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
public class CountLatch {
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int arg) {
return count.get() == releaseValue ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
return true;
}
}
private final Sync sync;
private final AtomicLong count;
private volatile long releaseValue;
public CountLatch(final long initial, final long releaseValue) {
this.releaseValue = releaseValue;
this.count = new AtomicLong(initial);
this.sync = new Sync();
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public long countUp() {
final long current = count.incrementAndGet();
if (current == releaseValue) {
sync.releaseShared(0);
}
return current;
}
public long countDown() {
final long current = count.decrementAndGet();
if (current == releaseValue) {
sync.releaseShared(0);
}
return current;
}
public long getCount() {
return count.get();
}
}
public class CountingLatch
{
/**
* Synchronization control for CountingLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer
{
private Sync()
{
}
private Sync(final int initialState)
{
setState(initialState);
}
int getCount()
{
return getState();
}
protected int tryAcquireShared(final int acquires)
{
return getState()==0 ? 1 : -1;
}
protected boolean tryReleaseShared(final int delta)
{
// Decrement count; signal when transition to zero
for(; ; ){
final int c=getState();
final int nextc=c+delta;
if(nextc<0){
return false;
}
if(compareAndSetState(c,nextc)){
return nextc==0;
}
}
}
}
private final Sync sync;
public CountingLatch()
{
sync=new Sync();
}
public CountingLatch(final int initialCount)
{
sync=new Sync(initialCount);
}
public void increment()
{
sync.releaseShared(1);
}
public int getCount()
{
return sync.getCount();
}
public void decrement()
{
sync.releaseShared(-1);
}
public void await() throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public boolean await(final long timeout) throws InterruptedException
{
return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
}
}
# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
软件过程中:
# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )
6条答案
按热度按时间ou6hu8tu1#
您还可以使用Phaser (java.util.concurrent.Phaser)
uqxowvwt2#
java.util.concurrent.Semaphore似乎符合要求。
(*)好吧,没有内置的方法来等待信号量变为 * 不可用 *。我想你应该为
acquire
编写自己的 Package 器,先执行tryAcquire
,如果失败,则触发你的“忙碌事件”(并继续使用普通的acquire
)。每个人都需要调用你的 Package 器。也许子类Semaphore?ohfgkhjo3#
您可以使用下面这样的简单实现,而不是从AQS开始,它有点幼稚(它是同步的,而不是AQS无锁算法),但除非您希望在满足的场景中使用它,否则它可能已经足够好了。
注意:我没有深入测试它,但它似乎表现如预期:
输出:
pw9qyyiw4#
对于那些需要基于AQS的解决方案的用户,以下是适合我的解决方案:
用初始值和目标值初始化同步器,一旦达到目标值(通过向上计数和/或向下计数),等待的线程将被释放。
i7uq4tfw5#
我需要一个,并使用与CountDownLatch相同的策略构建了它,CountDownLatch使用AQS(非阻塞),这个类也非常类似(如果不完全相同)为Apache Camel创建的类,我认为它也比JDK Phaser更轻,这将像JDK中的CountDownLact一样,它不会让你倒数到零以下,而是允许你倒数和递增:
导入java.util.并发时间单位;导入java.util.并发.锁.抽象队列同步器;
kcwpcxri6#
这是
CounterLatch
的变体,可从Apache站点获得。由于众所周知的原因,它们的版本在变量(
AtomicInteger
)为给定值时阻塞调用者线程。但是,调整这段代码非常容易,您可以选择Apache版本所做的事情,或者...说"在这里等待直到**计数器达到某个值"。可以说,后者将具有更多的适用性。在我的特殊情况下,我匆匆完成了这个,因为我想检查所有的"块"是否都已在
SwingWorker.process()
中发布...但后来我发现了它的其他用途。这里是用Jython编写的,Jython是世界上最好的语言(TM),我将在适当的时候推出Java版本。
注意,Apache版本使用关键字
volatile
表示signal
和released
,在Jython中我不认为这样存在,但是使用AtomicInteger
和AtomicBoolean
应该确保在任何线程中没有值"过期"。示例用法:
在SwingWorker构造函数中:
在软件中发布:
软件过程中:
在等待块处理停止的线程中: