我需要限制同时处理同一资源的客户端数量
所以我尝试将模拟信号
lock.lock();
try {
do work
} finally {
lock.unlock();
}
但是以非阻塞的方式和Reactor库连接。我得到了这样的东西。
但我有个问题:
有没有更好的办法
或者可能有人了解已实施的解决方案
或者也许这不是在被动的世界里应该做的,有另一种方法来解决这样的问题?
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class NonblockingLock {
private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);
private String currentOwner;
private final AtomicInteger lockCounter = new AtomicInteger();
private final FluxSink<Boolean> notifierSink;
private final Flux<Boolean> notifier;
private final String resourceId;
public NonblockingLock(String resourceId) {
this.resourceId = resourceId;
EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
notifier = processor.startWith(true);
}
/**
* Nonblocking version of
* <pre><code>
* lock.lock();
* try {
* do work
* } finally {
* lock.unlock();
* }
* </code></pre>
* */
public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
Objects.requireNonNull(owner, "owner");
return notifier.filter(it -> tryAcquire(owner))
.next()
.transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
.doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
.doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
.flatMapMany(it -> work)
.doFinally(s -> {
if (tryRelease(owner)) {
LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
notifierSink.next(true);
}
});
}
private boolean tryAcquire(String owner) {
boolean acquired;
synchronized (this) {
if (currentOwner == null) {
currentOwner = owner;
}
acquired = currentOwner.equals(owner);
if (acquired) {
lockCounter.incrementAndGet();
}
}
return acquired;
}
private boolean tryRelease(String owner) {
boolean released = false;
synchronized (this) {
if (currentOwner.equals(owner)) {
int count = lockCounter.decrementAndGet();
if (count == 0) {
currentOwner = null;
released = true;
}
}
}
return released;
}
}
我想应该是这样的
@Test
public void processWithLock() throws Exception {
NonblockingLock lock = new NonblockingLock("work");
String client1 = "client1";
String client2 = "client2";
Flux<String> requests = getWork(client1, lock)
//emulate async request for resource by another client
.mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
//emulate async request for resource by the same client
.mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
StepVerifier.create(requests)
.expectSubscription()
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client2)
.expectNext(client2)
.expectNext(client2)
.expectComplete()
.verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
return lock.processWithLock(client, null,
Flux.interval(Duration.ofMillis(300))
.take(3)
.map(i -> client)
.log(client)
);
}
3条答案
按热度按时间bfnvny8b1#
既然Reactor已经引入了
Sinks
,那么实现这样的锁就比较容易了,我已经编写了a library,用它可以编写如下代码:它在内部使用
Sinks.Empty
队列来跟踪锁请求,每次解锁时,它只从队列中轮询并向Mono
发出ON_COMPLETE
信号,这可能比使用Sinks.many().multicast()
向所有请求者广播稍好一些,它利用了Sinks.Empty
不能多次发出的特性。因此取消锁定(对于想要设置超时或处理复杂情况的人)将阻止ON_COMPLETE
的发射,反之亦然。通过将
Flux.using
Package 在锁上,可以确保锁在所有类似try-finally
的情况下都能正确解锁。如果你感兴趣的话,这里是实现的一部分。最初的答案是
synchronized
,可能在竞争条件下阻塞,下面的代码用CAS操作重写,以便锁是非阻塞的。(在库中,现在所有的锁都用CAS操作实现。)x一个一个一个一个x一个一个二个x
此外,如果您希望将客户机数量限制为N而不是一个,则库提供了
ReactiveSemaphore
,它对应于java.util.concurrent.Semaphore
。6ojccjat2#
我有一个解决方案,可以用相同的参数来独占调用远程服务。也许对你的情况会有帮助。
它基于即时
tryLock
(如果资源忙碌,则出错)和Mono.retryWhen
(“等待”释放)。所以我用
LockData
类来存储锁的元数据LockCommand
接口是LockData上阻塞操作的抽象UnlockEventsRegistry
接口是解锁事件侦听器收集器的抽象。Lock
类可以使用锁 Package 源Mono,使用解锁 Package CacheMono编写器。现在如果我必须用CacheMono Package 我的Mono并锁定,我可以这样做:
您可以在GitHub上找到带有示例的代码和测试
bq9c1y663#
我知道这已经有了一些合理的答案,但是我认为有一个(主观上)更简单的解决方案,它利用
flatMap
(在类似信号量的用例中)或concatMap
(在lock
/synchronized
用例中)来控制并行化。我也不喜欢所提供的解决方案使用并发集合或原子引用,它们在内部使用锁。此解决方案仅使用Sink和Reactor操作符来支持锁。未订阅的发布者也不会使用锁。
用法:
示例测试-在本测试中,我们创建10个Mono,它们在1秒后发出一个值,并尝试并行运行所有Mono,但我们将它们 Package 在池大小为2的ReactiveSemaphore中,以便并行运行的Mono不超过2个: