public class Blocking {
private static final ConcurrentHashMap<String, Semaphore> internalMap = new ConcurrentHashMap<>();
public static <O> O byKey(String keys, Supplier<O> action) {
var semaphores = new Semaphore[1];
try {
semaphores[0] = internalMap.computeIfAbsent(keys, k -> new Semaphore(1));
semaphores[0].acquire();
return action.get();
} finally {
internalMap.remove(keys);
semaphores[0].release();
}
}
}
public class LongThreadSafe implements Runnable{
ConcurrentHashMap<Long,Long> map;
public LongThreadSafe(ConcurrentHashMap map) {
this.map = map;
}
@Override
public void run() {
List<Long> list = Arrays.asList(2L, 3L, 4L, 5L, 23L, 43L);
for (Long requestId:list) {
//we don't have any problem if multiple threads are updating value
Long previousValue = map.putIfAbsent(requestId, requestId);
if (previousValue == null){
//run your task
//update database record using (requestId)
map.remove(requestId);
}else {
System.out.println("Current requestId: "+requestId+" is being processed by another thread");
}
}
}
}
class client{
public static void main(String[] args) {
ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>();
Thread t1 = new Thread(new LongThreadSafe(map));
Thread t2 = new Thread(new LongThreadSafe(map));
t1.start();
t2.start();
}
}
// e.g. part of a constructor
locks = new Object[1024];
for(int k=0;k<locks.length;k++){
locks[k]=new Object();
}
// part of the processing function
int hash = requestId.hashCode();
int lockIndex = hashToIndex(hash, locks.length);
synchronized(locks[lockIndex)){
... do your stuff
}
public static int hashToIndex(int hash, int length) {
if(length <= 0) {
throw new IllegalArgumentException();
}
if (hash == Integer.MIN_VALUE) {
return 0;
}
return abs(hash) % length;
}
4条答案
按热度按时间sbdsn5lh1#
注意:我没有试过显示的代码。关闭责任不在RequestProcessor中。你可以在这个类中添加一个shutdown方法,这个方法可以委托给被 Package 的执行器。
字符串
3qpi33ja2#
编辑:经过一些讨论,这是不安全的使用!:)
我已经写了一些类似的东西,但它的明确未经测试的生产。我做了一些测试,但很难测试这样的东西。
这个想法是有一个内部静态并发散列表,它为每个键存储“信号量”。每个线程将尝试在这个Map中寻找信号量的外观,如果它不存在,则创建它。
字符串
用途:
型
mcdcgff03#
您需要执行2个操作
上述2个操作需要是原子的,可以通过使用锁(使用同步的隐式)或外部锁来实现。无论哪种方式,都将导致争用,因为每个线程都需要在执行任何操作之前获取锁
这里使用ConcurrentHashMap很有用。因为putIfAbsent是原子的,并且它在内部使用桶级锁,这可以减少每个requestId的争用。您可以在下面引用其中一个实现的代码片段
字符串
fnx2tebb4#
一个简单的方法来控制线程之间对某个对象的访问,并拥有无限的此类对象的供应,就是使用条带锁。
带区锁是一个锁数组(或者只是对象)。
因此,根据对象的散列,您可以确定这个锁数组中的索引,然后获取它。
举例说明该方法:
字符串
条带锁的最大优点是您不需要处理锁的销毁,因为它们可以在程序运行期间保留。所以你不会得到任何垃圾,你有很好的和简单的代码。
这是一个非常基本的解决方案;如果锁被保持很长一段时间,那么这可能导致争用(甚至是错误争用,因为不同的requestIdMap到相同的锁)。
在这种情况下,重组该计划可能是一个更好的解决方案。例如,你可以有一个单线程执行器的数组,并根据上面的hashToIndex函数对requestId的哈希值将requestId抛入执行器。这样就保证了相同的requestId由相同的线程处理,因此不需要锁定。