java 可递增的锁存器

vlurs2pr  于 2022-12-25  发布在  Java
关注(0)|答案(6)|浏览(125)

是否有人知道是否有任何锁存器实现可以实现以下功能:

  • 具有递减锁存器值的方法,或者在值为零时等待
  • 具有用于等待锁存器值为零的方法
  • 有一个方法,用于将数字添加到锁存器的值
ou6hu8tu

ou6hu8tu1#

您还可以使用Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
uqxowvwt

uqxowvwt2#

java.util.concurrent.Semaphore似乎符合要求。

  • 获取()或获得(n)
  • 也获得()(不确定我是否理解这里的区别)(*)
  • 释放()或释放(n)

(*)好吧,没有内置的方法来等待信号量变为 * 不可用 *。我想你应该为acquire编写自己的 Package 器,先执行tryAcquire,如果失败,则触发你的“忙碌事件”(并继续使用普通的acquire)。每个人都需要调用你的 Package 器。也许子类Semaphore?

ohfgkhjo

ohfgkhjo3#

您可以使用下面这样的简单实现,而不是从AQS开始,它有点幼稚(它是同步的,而不是AQS无锁算法),但除非您希望在满足的场景中使用它,否则它可能已经足够好了。

public class CountUpAndDownLatch {
    private CountDownLatch latch;
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { //should probably check for Integer.MAX_VALUE
        synchronized(lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized(lock) {
            return (int) latch.getCount();
        }
    }
}

注意:我没有深入测试它,但它似乎表现如预期:

public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN UP " + latch.getCount());
                latch.countUp();
                System.out.println("UP " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}

输出:

IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0
pw9qyyiw

pw9qyyiw4#

对于那些需要基于AQS的解决方案的用户,以下是适合我的解决方案:

public class CountLatch {

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}

用初始值和目标值初始化同步器,一旦达到目标值(通过向上计数和/或向下计数),等待的线程将被释放。

i7uq4tfw

i7uq4tfw5#

我需要一个,并使用与CountDownLatch相同的策略构建了它,CountDownLatch使用AQS(非阻塞),这个类也非常类似(如果不完全相同)为Apache Camel创建的类,我认为它也比JDK Phaser更轻,这将像JDK中的CountDownLact一样,它不会让你倒数到零以下,而是允许你倒数和递增:
导入java.util.并发时间单位;导入java.util.并发.锁.抽象队列同步器;

public class CountingLatch
{
  /**
   * Synchronization control for CountingLatch.
   * Uses AQS state to represent count.
   */
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    private Sync()
    {
    }

    private Sync(final int initialState)
    {
      setState(initialState);
    }

    int getCount()
    {
      return getState();
    }

    protected int tryAcquireShared(final int acquires)
    {
      return getState()==0 ? 1 : -1;
    }

    protected boolean tryReleaseShared(final int delta)
    {
      // Decrement count; signal when transition to zero
      for(; ; ){
        final int c=getState();
        final int nextc=c+delta;
        if(nextc<0){
          return false;
        }
        if(compareAndSetState(c,nextc)){
          return nextc==0;
        }
      }
    }
  }

  private final Sync sync;

  public CountingLatch()
  {
    sync=new Sync();
  }

  public CountingLatch(final int initialCount)
  {
    sync=new Sync(initialCount);
  }

  public void increment()
  {
    sync.releaseShared(1);
  }

  public int getCount()
  {
    return sync.getCount();
  }

  public void decrement()
  {
    sync.releaseShared(-1);
  }

  public void await() throws InterruptedException
  {
    sync.acquireSharedInterruptibly(1);
  }

  public boolean await(final long timeout) throws InterruptedException
  {
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
  }
}
kcwpcxri

kcwpcxri6#

这是CounterLatch的变体,可从Apache站点获得。
由于众所周知的原因,它们的版本在变量(AtomicInteger)为给定值时阻塞调用者线程。
但是,调整这段代码非常容易,您可以选择Apache版本所做的事情,或者...说"在这里等待
直到**计数器达到某个值"。可以说,后者将具有更多的适用性。在我的特殊情况下,我匆匆完成了这个,因为我想检查所有的"块"是否都已在SwingWorker.process()中发布...但后来我发现了它的其他用途。
这里是用Jython编写的,Jython是世界上最好的语言(TM),我将在适当的时候推出Java版本。

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

注意,Apache版本使用关键字volatile表示signalreleased,在Jython中我不认为这样存在,但是使用AtomicIntegerAtomicBoolean应该确保在任何线程中没有值"过期"。
示例用法:
在SwingWorker构造函数中:

self.publication_counter_latch = CounterLatch()

在软件中发布:

# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

软件过程中:

# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )

在等待块处理停止的线程中:

worker.publication_counter_latch.await()

相关问题