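This article collects code examples for the Java class io.github.resilience4j.bulkhead.Bulkhead and shows how the class is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Bulkhead class follow: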
Package path: io.github.resilience4j.bulkhead.Bulkhead
Class name: Bulkhead
A Bulkhead instance is thread-safe and can be used to decorate multiple requests. A Bulkhead represents an entity that limits the number of parallel operations. It neither assumes nor mandates any particular concurrency and/or I/O model; these details are left for the client to manage. Depending on the underlying concurrency/I/O model, this bulkhead can be used to shed load and, where it makes sense, to limit resource use (e.g. limit the number of threads/actors involved in a particular flow). To execute an operation protected by this bulkhead, permission must be obtained by calling Bulkhead#isCallPermitted(). If the bulkhead is full, no additional operations are permitted to execute until space is available. Once the operation is complete, regardless of the result, the client needs to call Bulkhead#onComplete() to maintain the integrity of the internal bulkhead state.
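The permit-then-complete protocol described above can be illustrated without the library. The following is a minimal sketch that assumes a plain java.util.concurrent.Semaphore as the permit counter; the class SemaphoreBulkheadSketch and its methods tryEnter, leave, availableCalls, and execute are hypothetical names chosen for illustration and are not part of resilience4j.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration only; not the resilience4j implementation.
final class SemaphoreBulkheadSketch {
    private final Semaphore permits;

    SemaphoreBulkheadSketch(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    // Plays the role of asking for permission: never blocks, returns false when full.
    boolean tryEnter() {
        return permits.tryAcquire();
    }

    // Plays the role of signalling completion: hands the permit back.
    void leave() {
        permits.release();
    }

    int availableCalls() {
        return permits.availablePermits();
    }

    // Runs the call only if a permit is available and always releases it afterwards.
    <T> T execute(Supplier<T> call) {
        if (!tryEnter()) {
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            leave();
        }
    }

    public static void main(String[] args) {
        SemaphoreBulkheadSketch bulkhead = new SemaphoreBulkheadSketch(1);
        System.out.println(bulkhead.execute(() -> "hello")); // prints "hello"
        System.out.println(bulkhead.availableCalls());       // prints 1
    }
}
```

The try/finally in execute mirrors the requirement that completion be reported regardless of the result; skipping it on the failure path would leak a permit.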
Code example source: resilience4j/resilience4j
private Object handleOther(MethodInvocation invocation, io.github.resilience4j.bulkhead.Bulkhead bulkhead, RecoveryFunction<?> recoveryFunction) throws Throwable {
    // Ask for a permit; hand the rejection to the recovery function if the bulkhead is full
    boolean permission = bulkhead.isCallPermitted();
    if (!permission) {
        Throwable t = new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
        return recoveryFunction.apply(t);
    }
    try {
        if (Thread.interrupted()) {
            throw new IllegalStateException("Thread was interrupted during permission wait");
        }
        return invocation.proceed();
    } catch (Exception e) {
        return recoveryFunction.apply(e);
    } finally {
        // Always release the permit, whatever the outcome
        bulkhead.onComplete();
    }
}
Code example source: resilience4j/resilience4j
private BulkheadMetrics(String prefix, Iterable<Bulkhead> bulkheads) {
    requireNonNull(prefix);
    requireNonNull(bulkheads);
    bulkheads.forEach(bulkhead -> {
        String name = bulkhead.getName();
        // number of available concurrent calls as an integer
        metricRegistry.register(name(prefix, name, AVAILABLE_CONCURRENT_CALLS),
            (Gauge<Integer>) () -> bulkhead.getMetrics().getAvailableConcurrentCalls());
    });
}
Code example source: resilience4j/resilience4j
@Test
public void testCreateWithDefaults() {
    // when
    Bulkhead bulkhead = Bulkhead.ofDefaults("test");
    // then
    assertThat(bulkhead).isNotNull();
    assertThat(bulkhead.getBulkheadConfig()).isNotNull();
}
Code example source: resilience4j/resilience4j
public static void isCallPermitted(Bulkhead bulkhead) {
    if (!bulkhead.isCallPermitted()) {
        throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
    }
}
Code example source: resilience4j/resilience4j
@Test
public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() throws Exception {
    // Given
    Disposable disposable = mock(Disposable.class);
    MaybeObserver childObserver = mock(MaybeObserver.class);
    MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver);
    bulkhead.isCallPermitted();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
    decoratedObserver.onSubscribe(disposable);
    // When
    ((Disposable) decoratedObserver).dispose();
    // Then
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
Code example source: resilience4j/resilience4j
@Test
public void testBulkhead() throws InterruptedException {
    bulkhead.isCallPermitted();
    bulkhead.isCallPermitted();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
    bulkhead.isCallPermitted();
    bulkhead.onComplete();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
    bulkhead.onComplete();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
    bulkhead.isCallPermitted();
    testSubscriber.assertValueCount(6)
        .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED);
}
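The event sequence asserted in the test above can be reproduced with a small self-contained sketch. It assumes a limit of two concurrent calls (the test's bulkhead configuration is not shown in the snippet) and uses hypothetical names throughout: EventRecordingBulkheadSketch, its EventType enum, and the methods tryEnter, leave, and events are illustrative and do not belong to resilience4j.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: a permit counter that records one event per permit decision or release.
final class EventRecordingBulkheadSketch {
    enum EventType { CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED }

    private final Semaphore permits;
    private final List<EventType> events = new ArrayList<>();

    EventRecordingBulkheadSketch(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    // Records CALL_PERMITTED when a permit was free, CALL_REJECTED otherwise.
    synchronized boolean tryEnter() {
        boolean permitted = permits.tryAcquire();
        events.add(permitted ? EventType.CALL_PERMITTED : EventType.CALL_REJECTED);
        return permitted;
    }

    // Returns a permit and records CALL_FINISHED.
    synchronized void leave() {
        permits.release();
        events.add(EventType.CALL_FINISHED);
    }

    // Returns a copy so callers cannot modify the recorded history.
    synchronized List<EventType> events() {
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        EventRecordingBulkheadSketch bulkhead = new EventRecordingBulkheadSketch(2);
        bulkhead.tryEnter();
        bulkhead.tryEnter();
        bulkhead.tryEnter(); // rejected: both permits are taken
        bulkhead.leave();
        bulkhead.leave();
        bulkhead.tryEnter();
        System.out.println(bulkhead.events());
    }
}
```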
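Code example source: resilience4j/resilience4j
private void releaseBulkhead() {
    // Release only once: the compare-and-set succeeds for a single ACQUIRED -> RELEASED transition
    if (permitted.compareAndSet(Permit.ACQUIRED, Permit.RELEASED)) {
        bulkhead.onComplete();
    }
}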
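The compare-and-set guard in releaseBulkhead makes the release idempotent. A minimal sketch of that idea follows, assuming an AtomicReference holding a three-state enum; the PENDING state, the class OneShotReleaseSketch, and the releaseCount counter (which stands in for the call to bulkhead.onComplete()) are assumptions made for illustration, not the library's actual fields.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a release that takes effect at most once.
final class OneShotReleaseSketch {
    enum Permit { PENDING, ACQUIRED, RELEASED }

    private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
    private final AtomicInteger releaseCount = new AtomicInteger();

    // Moves PENDING -> ACQUIRED; returns false if the permit was already acquired or released.
    boolean acquire() {
        return permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED);
    }

    // Only the first caller that sees ACQUIRED performs the release.
    void release() {
        if (permitted.compareAndSet(Permit.ACQUIRED, Permit.RELEASED)) {
            releaseCount.incrementAndGet(); // stands in for bulkhead.onComplete()
        }
    }

    int releaseCount() {
        return releaseCount.get();
    }

    public static void main(String[] args) {
        OneShotReleaseSketch sketch = new OneShotReleaseSketch();
        sketch.release();                           // ignored: nothing acquired yet
        sketch.acquire();
        sketch.release();
        sketch.release();                           // ignored: already released
        System.out.println(sketch.releaseCount());  // prints 1
    }
}
```

This guard matters when completion, error, and disposal callbacks can race: whichever arrives first releases the permit, and later ones become no-ops.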
Code example source: resilience4j/resilience4j
@Override
protected Throwable getThrowable() {
    return new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
}
Code example source: resilience4j/resilience4j
@Test
public void shouldReturnFailureWithRuntimeException() {
    // Given
    BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
    Bulkhead bulkhead = Bulkhead.of("test", config);
    bulkhead.isCallPermitted();
    // When
    CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> { throw new RuntimeException("BAM!"); });
    Try result = Try.run(checkedRunnable);
    // Then
    assertThat(result.isFailure()).isTrue();
    assertThat(result.failed().get()).isInstanceOf(RuntimeException.class);
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldReturnTheCorrectName() {
    Bulkhead bulkhead = registry.bulkhead("test");
    assertThat(bulkhead).isNotNull();
    assertThat(bulkhead.getName()).isEqualTo("test");
    assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(25);
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(25);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldConsumeOnCallRejectedEvent() {
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    // When
    bulkhead.getEventPublisher()
        .onCallRejected(event ->
            logger.info(event.getEventType().toString()));
    bulkhead.isCallPermitted();
    Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead, helloWorldService::returnHelloWorld));
    // Then
    then(logger).should(times(1)).info("CALL_REJECTED");
}
Code example source: resilience4j/resilience4j
@Test
public void shouldInvokeAsyncApply() throws ExecutionException, InterruptedException {
    // tag::shouldInvokeAsyncApply[]
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    // When
    Supplier<String> decoratedSupplier = Bulkhead.decorateSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
    CompletableFuture<String> future = CompletableFuture.supplyAsync(decoratedSupplier)
        .thenApply(value -> value + " world'");
    String result = future.get();
    // Then
    assertThat(result).isEqualTo("This can be any method which returns: 'Hello world'");
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
    // end::shouldInvokeAsyncApply[]
}
Code example source: resilience4j/resilience4j
@Test
public void shouldDecorateCompletionStageAndReturnWithExceptionAtAsyncStage() throws ExecutionException, InterruptedException {
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    BDDMockito.given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM! At async stage"));
    // When
    Supplier<CompletionStage<String>> completionStageSupplier =
        () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);
    Supplier<CompletionStage<String>> decoratedCompletionStageSupplier =
        Bulkhead.decorateCompletionStage(bulkhead, completionStageSupplier);
    CompletionStage<String> decoratedCompletionStage = decoratedCompletionStageSupplier.get();
    // Then the helloWorldService should be invoked 1 time
    assertThatThrownBy(decoratedCompletionStage.toCompletableFuture()::get)
        .isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM! At async stage"));
    BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
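The test above checks that the permit is returned even when the asynchronous stage fails. One way such a decorator could be structured is sketched below, assuming a Semaphore as the permit counter and whenComplete as the release hook; AsyncBulkheadSketch and its decorate method are hypothetical and are not claimed to match how Bulkhead.decorateCompletionStage is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: hold a permit for the lifetime of an asynchronous stage.
final class AsyncBulkheadSketch {
    static <T> Supplier<CompletionStage<T>> decorate(Semaphore permits,
                                                     Supplier<CompletionStage<T>> supplier) {
        return () -> {
            CompletableFuture<T> promise = new CompletableFuture<>();
            if (!permits.tryAcquire()) {
                // Full: fail the returned stage instead of running the supplier.
                promise.completeExceptionally(new IllegalStateException("bulkhead is full"));
                return promise;
            }
            try {
                supplier.get().whenComplete((result, error) -> {
                    permits.release(); // release on success and on failure alike
                    if (error != null) {
                        promise.completeExceptionally(error);
                    } else {
                        promise.complete(result);
                    }
                });
            } catch (Throwable t) {
                // The supplier itself threw before producing a stage.
                permits.release();
                promise.completeExceptionally(t);
            }
            return promise;
        };
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        Supplier<CompletionStage<String>> decorated =
            AsyncBulkheadSketch.<String>decorate(permits, () -> CompletableFuture.completedFuture("Hello"));
        System.out.println(decorated.get().toCompletableFuture().join()); // prints "Hello"
        System.out.println(permits.availablePermits());                    // prints 1
    }
}
```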
Code example source: resilience4j/resilience4j
// ... (this snippet is truncated in the source; elided portions are marked with "...")
);
chain1.get("stream/events", ctx -> {
    Seq<Flux<BulkheadEvent>> eventStreams = bulkheadRegistry.getAllBulkheads().map(bulkhead -> ReactorAdapter.toFlux(bulkhead.getEventPublisher()));
    Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
    ServerSentEvents events = ServerSentEvents.serverSentEvents(Flux.merge(eventStreams), e -> e.id(BulkheadEvent::getBulkheadName).event(c -> c.getEventType().name()).data(data));
    // ...
chain1.get("stream/events/:name", ctx -> {
    String bulkheadName = ctx.getPathTokens().get("name");
    Bulkhead bulkhead = bulkheadRegistry.getAllBulkheads().find(b -> b.getName().equals(bulkheadName))
        .getOrElseThrow(() -> new IllegalArgumentException(String.format("bulkhead with name %s not found", bulkheadName)));
    Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
    ServerSentEvents events = ServerSentEvents.serverSentEvents(ReactorAdapter.toFlux(bulkhead.getEventPublisher()), e -> e.id(BulkheadEvent::getBulkheadName).event(c -> c.getEventType().name()).data(data));
    ctx.render(events);
});
    // ...
    String bulkheadName = ctx.getPathTokens().get("name");
    String eventType = ctx.getPathTokens().get("type");
    Bulkhead bulkhead = bulkheadRegistry.getAllBulkheads().find(b -> b.getName().equals(bulkheadName))
        .getOrElseThrow(() -> new IllegalArgumentException(String.format("bulkhead with name %s not found", bulkheadName)));
    Flux<BulkheadEvent> eventStream = ReactorAdapter.toFlux(bulkhead.getEventPublisher())
        .filter(event -> event.getEventType() == BulkheadEvent.Type.valueOf(eventType.toUpperCase()));
    Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
    // ...
Code example source: resilience4j/resilience4j
@Test
public void shouldEmitEvent() {
    StepVerifier.create(
        Mono.just("Event")
            .transform(BulkheadOperator.of(bulkhead)))
        .expectNext("Event")
        .verifyComplete();
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}
Code example source: resilience4j/resilience4j
@Test
public void shouldDecorateRunnableAndReturnWithSuccess() throws Throwable {
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    // When
    Bulkhead.decorateRunnable(bulkhead, helloWorldService::sayHelloWorld)
        .run();
    // Then
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
    BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorld();
}
Code example source: resilience4j/resilience4j
@Override
protected boolean isCallPermitted() {
    return bulkhead.isCallPermitted();
}
Code example source: resilience4j/resilience4j
@Test
public void shouldConsumeOnCallFinishedEventOnComplete() throws Exception {
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    // When
    bulkhead.getEventPublisher()
        .onCallFinished(event ->
            logger.info(event.getEventType().toString()));
    bulkhead.onComplete();
    // Then
    then(logger).should(times(1)).info("CALL_FINISHED");
}
Code example source: resilience4j/resilience4j
@Test
public void shouldDecorateConsumerAndReturnWithSuccess() throws Throwable {
    // Given
    Bulkhead bulkhead = Bulkhead.of("test", config);
    // When
    Bulkhead.decorateConsumer(bulkhead, helloWorldService::sayHelloWorldWithName)
        .accept("Tom");
    // Then
    assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
    BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorldWithName("Tom");
}
Code example source: resilience4j/resilience4j
@Test(expected = NullPointerException.class)
public void testConstructorWithNullName() {
    BulkheadExports.ofSupplier(null, () -> singleton(Bulkhead.ofDefaults("foo")));
}