java 带电抗器的无阻塞重入式船闸

lyr7nygr  于 2023-01-16  发布在  Java
关注(0)|答案(3)|浏览(177)

我需要限制同时处理同一资源的客户端数量
所以我尝试将模拟信号

lock.lock();
try {
     do work
} finally {
    lock.unlock();
}

但是以非阻塞的方式和Reactor库连接。我得到了这样的东西。
但我有个问题:
有没有更好的办法
或者可能有人了解已实施的解决方案
或者也许这不是在被动的世界里应该做的,有另一种方法来解决这样的问题?

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingLock {
    private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);

    private String currentOwner;
    private final AtomicInteger lockCounter = new AtomicInteger();
    private final FluxSink<Boolean> notifierSink;
    private final Flux<Boolean> notifier;
    private final String resourceId;

    public NonblockingLock(String resourceId) {
        this.resourceId = resourceId;
        EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
        notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
        notifier = processor.startWith(true);
    }

    /**
     * Nonblocking version of
     * <pre><code>
     *     lock.lock();
     *     try {
     *         do work
     *     } finally {
     *         lock.unlock();
     *     }
     * </code></pre>
     * */
    public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
        Objects.requireNonNull(owner, "owner");
        return notifier.filter(it -> tryAcquire(owner))
                .next()
                .transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
                .doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
                .doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
                .flatMapMany(it -> work)
                .doFinally(s -> {
                    if (tryRelease(owner)) {
                        LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
                        notifierSink.next(true);
                    }
                });
    }

    private boolean tryAcquire(String owner) {
        boolean acquired;
        synchronized (this) {
            if (currentOwner == null) {
                currentOwner = owner;
            }
            acquired = currentOwner.equals(owner);
            if (acquired) {
                lockCounter.incrementAndGet();
            }
        }
        return acquired;
    }

    private boolean tryRelease(String owner) {
        boolean released = false;
        synchronized (this) {
            if (currentOwner.equals(owner)) {
                int count = lockCounter.decrementAndGet();
                if (count == 0) {
                    currentOwner = null;
                    released = true;
                }
            }
        }
        return released;
    }
}

我想应该是这样的

@Test
public void processWithLock() throws Exception {
    NonblockingLock lock = new NonblockingLock("work");
    String client1 = "client1";
    String client2 = "client2";
    Flux<String> requests = getWork(client1, lock)
            //emulate async request for resource by another client
            .mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
            //emulate async request for resource by the same client
            .mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
    StepVerifier.create(requests)
            .expectSubscription()
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client2)
            .expectNext(client2)
            .expectNext(client2)
            .expectComplete()
            .verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
    return lock.processWithLock(client, null,
            Flux.interval(Duration.ofMillis(300))
                    .take(3)
                    .map(i -> client)
                    .log(client)
    );
}
bfnvny8b

bfnvny8b1#

既然Reactor已经引入了Sinks,那么实现这样的锁就比较容易了,我已经编写了a library,用它可以编写如下代码:

import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;

Flux<String> getWork(String client, Duration delay, Lock lock) {
    return Mono.delay(delay)
              .flatMapMany(l -> lock.withLock(() ->
                  Flux.interval(Duration.ofMillis(300))
                      .take(3)
                      .map(i -> client)
                      .log(client)));
}

它在内部使用Sinks.Empty队列来跟踪锁请求,每次解锁时,它只从队列中轮询并向Mono发出ON_COMPLETE信号,这可能比使用Sinks.many().multicast()向所有请求者广播稍好一些,它利用了Sinks.Empty不能多次发出的特性。因此取消锁定(对于想要设置超时或处理复杂情况的人)将阻止ON_COMPLETE的发射,反之亦然。
通过将Flux.using Package 在锁上,可以确保锁在所有类似try-finally的情况下都能正确解锁。
如果你感兴趣的话,这里是实现的一部分。最初的答案是synchronized,可能在竞争条件下阻塞,下面的代码用CAS操作重写,以便锁是非阻塞的。(在库中,现在所有的锁都用CAS操作实现。)
x一个一个一个一个x一个一个二个x
此外,如果您希望将客户机数量限制为N而不是一个,则库提供了ReactiveSemaphore,它对应于java.util.concurrent.Semaphore

6ojccjat

6ojccjat2#

我有一个解决方案,可以用相同的参数来独占调用远程服务。也许对你的情况会有帮助。
它基于即时tryLock(如果资源忙碌,则出错)和Mono.retryWhen(“等待”释放)。
所以我用LockData类来存储锁的元数据

public final class LockData {
    // Lock key to identify same operation (same cache key, for example).
    private final String key;
    // Unique identifier for equals and hashCode.
    private final String uuid;
    // Date and time of the acquiring for lock duration limiting.
    private final OffsetDateTime acquiredDateTime;
    ...
}

LockCommand接口是LockData上阻塞操作的抽象

public interface LockCommand {

    Tuple2<Boolean, LockData> tryLock(LockData lockData);

    void unlock(LockData lockData);
    ...
}

UnlockEventsRegistry接口是解锁事件侦听器收集器的抽象。

public interface UnlockEventsRegistry {
    // initialize event listeners collection when acquire lock
    Mono<Void> add(LockData lockData);

    // notify event listeners and remove collection when release lock
    Mono<Void> remove(LockData lockData);

    // register event listener for given lockData
    Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
}

Lock类可以使用锁 Package 源Mono,使用解锁 Package CacheMono编写器。

public final class Lock {
    private final LockCommand lockCommand;
    private final LockData lockData;
    private final UnlockEventsRegistry unlockEventsRegistry;
    private final EmitterProcessor<Integer> unlockEvents;
    private final FluxSink<Integer> unlockEventSink;

    public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
        this.lockCommand = lockCommand;
        this.lockData = LockData.builder()
                .key(key)
                .uuid(UUID.randomUUID().toString())
                .build();
        this.unlockEventsRegistry = unlockEventsRegistry;
        this.unlockEvents = EmitterProcessor.create(false);
        this.unlockEventSink = unlockEvents.sink();
    }

    ...

    public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
        return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
                .subscribeOn(scheduler)
                .flatMap(isLocked -> {
                    if (isLocked.getT1()) {
                        return unlockEventsRegistry.add(lockData)
                                .then(source
                                        .switchIfEmpty(unlock().then(Mono.empty()))
                                        .onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
                    } else {
                        return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
                    }
                });
    }

    public Mono<Void> unlock(Scheduler scheduler) {
        return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
                .then(unlockEventsRegistry.remove(lockData))
                .subscribeOn(scheduler);
    }

    public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
            BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
        Objects.requireNonNull(cacheWriter);
        return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
                .onErrorResume(throwable -> unlock()));
    }

    public final <T> UnaryOperator<Mono<T>> retryTransformer() {
        return mono -> mono
                .doOnError(LockIsNotAvailableException.class,
                        error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
                                .doOnNext(registered -> {
                                    if (!registered) unlockEventSink.next(0);
                                })
                                .then(Mono.just(2).map(unlockEventSink::next)
                                        .delaySubscription(lockCommand.getMaxLockDuration()))
                                .subscribe())
                .doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
                        ignored -> unlockEventSink.next(0))
                .retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
                    if (error instanceof LockIsNotAvailableException) return integer;
                    else throw Exceptions.propagate(error);
                }));
    }
}

现在如果我必须用CacheMono Package 我的Mono并锁定,我可以这样做:

private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
    Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);

    return CacheMono.lookup(CACHE_READER, cacheKey)
            // Lock and double check
            .onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
            .andWriteWith(lock.unlockAfterCacheWriter(CACHE_WRITER))
            // Retry if lock is not available
            .transform(lock.retryTransformer());
}

您可以在GitHub上找到带有示例的代码和测试

bq9c1y66

bq9c1y663#

我知道这已经有了一些合理的答案,但是我认为有一个(主观上)更简单的解决方案,它利用flatMap(在类似信号量的用例中)或concatMap(在lock/synchronized用例中)来控制并行化。我也不喜欢所提供的解决方案使用并发集合或原子引用,它们在内部使用锁。
此解决方案仅使用Sink和Reactor操作符来支持锁。未订阅的发布者也不会使用锁。

public class ReactiveSemaphore {

    /**
     * This can be thought of as a queue of lock handles. The first argument of the tuple is a signaler that accepts a value
     * value when a lock is available. The second argument is a Mono that completes when the lock is released.
     */
    private final Sinks.Many<Tuple2<Sinks.One<Boolean>, Mono<Boolean>>> taskQueue;
    private final Sinks.One<Boolean> close = Sinks.one();

    /**
     * Creates a ReactiveSemaphore that only allows one Publisher to be subscribed at a time. Executed by order
     * of subscription.
     */
    public ReactiveSemaphore() {
        this(1);
    }

    /**
     * Creates a ReactiveSemaphore that allows up to poolSize Publishers to be subscribed in parallel.
     * @param poolSize The number of allowed subscriptions to run in parallel.
     */
    public ReactiveSemaphore(int poolSize) {
        taskQueue = Sinks.many().unicast().onBackpressureBuffer();

        Flux<Boolean> tasks;
        if (poolSize <= 1)
            // We could use flatMap with parallelism of 1, but that seems weird
            tasks = taskQueue
                    .asFlux()
                    .concatMap(ReactiveSemaphore::dispatchTask);
        else {
            tasks = taskQueue
                    .asFlux()
                    .flatMap(ReactiveSemaphore::dispatchTask, poolSize);
        }

        tasks
                .takeUntilOther(close.asMono())
                .subscribe();
    }

    private static Mono<Boolean> dispatchTask(Tuple2<Sinks.One<Boolean>, Mono<Boolean>> task) {
        task.getT1().tryEmitValue(true); // signal that lock is available and consume lock
        return task.getT2(); // return Mono that completes when lock is released
    }

    @PreDestroy
    private void cleanup() {
        close.tryEmitValue(true);
    }

    public <T> Publisher<T> lock(Publisher<T> publisher) {
        return Flux.defer(() -> this.waitForNext(publisher));
    }

    public <T> Mono<T> lock(Mono<T> publisher) {
        return Mono.defer(() -> this.waitForNext(publisher).next());
    }

    public <T> Flux<T> lock(Flux<T> publisher) {
        return Flux.defer(() -> this.waitForNext(publisher));
    }

    /**
     * Waits for an available lock in the taskQueue. When ReactiveSemaphore is ready, a lock will be allocated for the task
     * and will not be released until the provided task errors or completes. For this reason this operation should
     * only be performed on a hot publisher (a publisher that has been subscribed to). Therefore, this method should
     * always be wrapped inside a call to {@link Flux#defer(Supplier)} or {@link Mono#defer(Supplier)}.
     * @param task The task to execute once the ReactiveSemaphore has an available lock.
     * @return The task wrapped in a Flux
     * @param <T> The type of value returned by the task
     */
    private <T> Flux<T> waitForNext(Publisher<T> task) {
        var ready = Sinks.<Boolean>one();
        var release = Sinks.<Boolean>one();
        taskQueue.tryEmitNext(Tuples.of(ready, release.asMono()));
        return ready.asMono()
                .flatMapMany(ignored -> Flux.from(task))
                .doOnComplete(() -> release.tryEmitValue(true))
                .doOnError(err -> release.tryEmitValue(true));
    }
}

用法:

ReactiveSemaphore semaphore = new ReactiveSemaphore();
semaphore.lock(someFluxMonoOrPublisher);

示例测试-在本测试中,我们创建10个Mono,它们在1秒后发出一个值,并尝试并行运行所有Mono,但我们将它们 Package 在池大小为2的ReactiveSemaphore中,以便并行运行的Mono不超过2个:

@Test
public void testParallelExecution() {
    var semaphore = new ReactiveSemaphore(2);
    var monos = IntStream.range(0, 10)
            .mapToObj(i -> Mono.fromSupplier(() -> {
                        log.info("Executing Mono {}", i);
                        return i;
                    })
                    .delayElement(Duration.ofMillis(1000)))
            .map(mono -> semaphore.lock(mono));

    var allMonos = Flux.fromStream(monos).flatMap(m -> m).doOnNext(v -> log.info("Got value {}", v));

    StepVerifier.create(allMonos)
            .expectNextCount(10)
            .verifyComplete();
}

/* OUTPUT:
12:52:40.752 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 0
12:52:40.755 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 1
12:52:41.762 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Got value 0
12:52:41.765 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Executing Mono 2
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Got value 1
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Executing Mono 3
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 2
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 4
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 3
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 5
12:52:43.790 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 6
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 4
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 5
12:52:43.791 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 7
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 6
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 7
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 8
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 8

相关问题