java.util.concurrent.Semaphore类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(165)

本文整理了Java中java.util.concurrent.Semaphore类的一些代码示例,展示了Semaphore类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Semaphore类的具体详情如下:
包路径:java.util.concurrent.Semaphore
类名称:Semaphore

Semaphore介绍

[英]A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each #acquire blocks if necessary until a permit is available, and then takes it. Each #release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource. For example, here is a class that uses a semaphore to control access to a pool of items:

class Pool public void putItem(Object x)  
if (markAsUnused(x)) 
available.release(); 
} 
// Not a particularly efficient data structure; just for demo 
protected Object[] items = ... whatever kinds of items being managed 
protected boolean[] used = new boolean[MAX_AVAILABLE]; 
protected synchronized Object getNextAvailableItem()  
for (int i = 0; i < MAX_AVAILABLE; ++i)  
if (!used[i])  
used[i] = true; 
return items[i]; 
} 
} 
return null; // not reached 
} 
protected synchronized boolean markAsUnused(Object item)  
for (int i = 0; i < MAX_AVAILABLE; ++i)  
if (item == items[i])  
if (used[i])  
used[i] = false; 
return true; 
} else 
return false; 
} 
} 
return false; 
} 
}}

Before obtaining an item each thread must acquire a permit from the semaphore, guaranteeing that an item is available for use. When the thread has finished with the item it is returned back to the pool and a permit is returned to the semaphore, allowing another thread to acquire that item. Note that no synchronization lock is held when #acquire is called as that would prevent an item from being returned to the pool. The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.

A semaphore initialized to one, and which is used such that it only has at most one permit available, can serve as a mutual exclusion lock. This is more commonly known as a binary semaphore, because it only has two states: one permit available, or zero permits available. When used in this way, the binary semaphore has the property (unlike many java.util.concurrent.locks.Lockimplementations), that the "lock" can be released by a thread other than the owner (as semaphores have no notion of ownership). This can be useful in some specialized contexts, such as deadlock recovery.

The constructor for this class optionally accepts a fairness parameter. When set false, this class makes no guarantees about the order in which threads acquire permits. In particular, barging is permitted, that is, a thread invoking #acquire can be allocated a permit ahead of a thread that has been waiting - logically the new thread places itself at the head of the queue of waiting threads. When fairness is set true, the semaphore guarantees that threads invoking any of the #acquire() methods are selected to obtain permits in the order in which their invocation of those methods was processed (first-in-first-out; FIFO). Note that FIFO ordering necessarily applies to specific internal points of execution within these methods. So, it is possible for one thread to invoke acquire before another, but reach the ordering point after the other, and similarly upon return from the method. Also note that the untimed #tryAcquire() methods do not honor the fairness setting, but will take any permits that are available.

Generally, semaphores used to control resource access should be initialized as fair, to ensure that no thread is starved out from accessing a resource. When using semaphores for other kinds of synchronization control, the throughput advantages of non-fair ordering often outweigh fairness considerations.

This class also provides convenience methods to #acquire(int) and #release(int) multiple permits at a time. Beware of the increased risk of indefinite postponement when these methods are used without fairness set true.

Memory consistency effects: Actions in a thread prior to calling a "release" method such as release()happen-before actions following a successful "acquire" method such as acquire()in another thread.
[中]计数信号灯。从概念上讲,信号量维护一组许可。每个#如有必要,获取区块,直到获得许可证,然后获取。每次#释放都会增加一个许可证,可能会释放一个阻止收单机构。但是,没有使用实际的许可对象;信号量只是保持可用数字的计数,并相应地进行操作。
信号量通常被用来限制线程的数量,使其无法访问某些(物理或逻辑)资源。例如,下面是一个类,它使用信号量来控制对项目池的访问:

class Pool public void putItem(Object x)  
if (markAsUnused(x)) 
available.release(); 
} 
// Not a particularly efficient data structure; just for demo 
protected Object[] items = ... whatever kinds of items being managed 
protected boolean[] used = new boolean[MAX_AVAILABLE]; 
protected synchronized Object getNextAvailableItem()  
for (int i = 0; i < MAX_AVAILABLE; ++i)  
if (!used[i])  
used[i] = true; 
return items[i]; 
} 
} 
return null; // not reached 
} 
protected synchronized boolean markAsUnused(Object item)  
for (int i = 0; i < MAX_AVAILABLE; ++i)  
if (item == items[i])  
if (used[i])  
used[i] = false; 
return true; 
} else 
return false; 
} 
} 
return false; 
} 
}}

在获取一个项目之前,每个线程都必须从信号量获取一个许可证,以保证该项目可供使用。当线程处理完该项目后,它将返回到池中,并向信号量返回一个许可证,允许另一个线程获取该项目。请注意,在调用#acquire时不会保持同步锁,因为这会阻止项目返回池。信号量封装了限制对池的访问所需的同步,与维护池本身一致性所需的任何同步分开。
一个初始化为1的信号量,其使用方式是它最多只有一个可用的许可证,可以用作互斥锁。这通常被称为二进制信号量,因为它只有两种状态:一个许可证可用,或者零个许可证可用。以这种方式使用时,二进制信号量具有这样的属性(与许多java.util.concurrent.locks.lock实现不同),即“锁”可以由所有者以外的线程释放(因为信号量没有所有权的概念)。这在某些特定环境中很有用,例如死锁恢复。
这个类的构造函数可以选择接受一个fairity参数。当设置为false时,此类不保证线程获取许可的顺序。特别是,允许bargging,也就是说,调用#acquire的线程可以在等待的线程之前分配一个许可证——从逻辑上讲,新线程将自己置于等待线程队列的最前面。当公平性设置为true时,信号量保证调用任何#acquire()方法的线程被选中,以按照调用这些方法的顺序(先进先出;FIFO)获得许可。请注意,FIFO排序必然适用于这些方法中的特定内部执行点。因此,一个线程可以在另一个线程之前调用acquire,但在另一个线程之后到达排序点,同样地,在从方法返回时也是如此。还要注意的是,untimed#tryAcquire()方法不支持公平性设置,但将接受任何可用的许可。
通常,用于控制资源访问的信号量应该被初始化为公平的,以确保没有线程因访问资源而耗尽。在将信号量用于其他类型的同步控制时,非公平排序的吞吐量优势往往超过公平性考虑。
该类还提供了方便的方法,可以一次#获取(int)和#发布(int)多个许可证。当使用这些方法时,如果没有将公平设置为真,那么要小心无限期推迟的风险增加。
内存一致性影响:在调用“release”方法(如release(){happen-before)之前的线程中的操作在另一个线程中成功的“acquire”方法(如acquire()之后的操作)。

代码示例

代码示例来源:origin: netty/netty

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
  if (unit == null) {
    throw new NullPointerException("unit");
  }
  if (inEventLoop()) {
    throw new IllegalStateException("cannot await termination of the current thread");
  }
  if (threadLock.tryAcquire(timeout, unit)) {
    threadLock.release();
  }
  return isTerminated();
}

代码示例来源:origin: google/guava

@Override
 public Semaphore get() {
  return new Semaphore(permits, false);
 }
});

代码示例来源:origin: redisson/redisson

public void addCall(final FSTRunnable toRun) throws InterruptedException {
  gateway.acquire();
  if (jobs[curIdx] == null) {
    jobs[curIdx] = toRun;
  } else {
    jobs[curIdx].sem.acquire();
    jobs[curIdx].sem.release();
    jobs[curIdx] = toRun;
  }
  toRun.sem = sems[curIdx];
  toRun.sem.acquire();
  OrderedRunnable ord = orderedRunnableCache[curIdx];
  ord.toRun = toRun;
  curIdx = (curIdx + 1) % threads;
  orderedPool.execute(ord);
  pool.execute(toRun);
}

代码示例来源:origin: redisson/redisson

public void waitForFinish() throws InterruptedException {
  final Semaphore sem = new Semaphore(0);
  orderedPool.execute(new Runnable() {
    @Override
    public void run() {
      sem.release();
    }
  });
  sem.acquire();
}

代码示例来源:origin: pentaho/pentaho-kettle

@Override public synchronized void resume() {
 if ( paused.getAndSet( false ) ) {
  assert acceptingRowsSemaphore.availablePermits() == 0;
  acceptingRowsSemaphore.release();
 }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onNext(Notification<T> args) {
  boolean wasNotAvailable = value.getAndSet(args) == null;
  if (wasNotAvailable) {
    notify.release();
  }
}

代码示例来源:origin: Alluxio/alluxio

@Override
public void lockInterruptibly() throws InterruptedException {
 mAvailable.acquire(mPermits);
}

代码示例来源:origin: Alluxio/alluxio

@Override
public boolean tryLock() {
 return mAvailable.tryAcquire(mPermits);
}

代码示例来源:origin: neo4j/neo4j

@AfterEach
private void refillSemaphore()
{
  // This ensures that no threads end up stuck
  semaphore.drainPermits();
  semaphore.release( Integer.MAX_VALUE );
}

代码示例来源:origin: pentaho/pentaho-kettle

@Test
@SuppressWarnings ( "unchecked" )
public void errorLoggedIfInterruptedInPause() throws InterruptedException {
 streamSource.acceptingRowsSemaphore = semaphore;
 when( semaphore.availablePermits() ).thenReturn( 1 );
 streamSource.logChannel = logChannel;
 doThrow( new InterruptedException( "interrupt" ) )
  .when( semaphore ).acquire();
 streamSource.pause();
 verify( logChannel ).logError( any() );
}

代码示例来源:origin: resilience4j/resilience4j

/**
 * {@inheritDoc}
 */
@Override
public int getAvailablePermissions() {
  return semaphore.availablePermits();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
public void onNext(Notification<T> args) {
  boolean wasNotAvailable = value.getAndSet(args) == null;
  if (wasNotAvailable) {
    notify.release();
  }
}

代码示例来源:origin: robolectric/robolectric

public void lock() {
 try {
  mLock.acquire();
 } catch (InterruptedException e) {
  throw new RuntimeException(e);
 }
}

代码示例来源:origin: apache/storm

public void feed(Object tuples) {
  Semaphore sem = new Semaphore(0);
  ((List) RegisteredGlobalState.getState(_semaphoreId)).add(sem);
  ((List) RegisteredGlobalState.getState(_id)).add(tuples);
  try {
    sem.acquire();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
}

代码示例来源:origin: Alluxio/alluxio

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
 return mAvailable.tryAcquire(mPermits, time, unit);
}

代码示例来源:origin: resilience4j/resilience4j

void refreshLimit() {
  int permissionsToRelease = this.rateLimiterConfig.get().getLimitForPeriod() - semaphore.availablePermits();
  semaphore.release(permissionsToRelease);
}

代码示例来源:origin: resilience4j/resilience4j

@Override
  public int getAvailableConcurrentCalls() {
    return semaphore.availablePermits();
  }
}

代码示例来源:origin: redisson/redisson

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
  if (unit == null) {
    throw new NullPointerException("unit");
  }
  if (inEventLoop()) {
    throw new IllegalStateException("cannot await termination of the current thread");
  }
  if (threadLock.tryAcquire(timeout, unit)) {
    threadLock.release();
  }
  return isTerminated();
}

代码示例来源:origin: redisson/redisson

@Override
  public void run() {
    sem.release();
  }
});

代码示例来源:origin: prestodb/presto

@Override
 public Semaphore get() {
  return new Semaphore(permits, false);
 }
});

相关文章