本文整理了Java中reactor.core.publisher.Hooks
类的一些代码示例,展示了Hooks
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks
类的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
[英]A set of overridable lifecycle hooks that can be used for cross-cutting added behavior on Flux/ Mono operators.
[中]一组可重写的生命周期挂钩,可用于在Flux/Mono操作符上添加横切行为。
代码示例来源:origin: reactor/reactor-core
@Test
public void onLastOperatorReset() {
Hooks.onLastOperator("some", p -> p);
assertThat(Hooks.onLastOperatorHook).isNotNull();
assertThat(Hooks.getOnLastOperatorHooks()).hasSize(1);
Hooks.resetOnLastOperator();
assertThat(Hooks.onLastOperatorHook).isNull();
assertThat(Hooks.getOnLastOperatorHooks()).isEmpty();
}
代码示例来源:origin: reactor/reactor-core
private void plugHooks() {
Hooks.onErrorDropped(droppedErrors::offer);
Hooks.onNextDropped(droppedElements::offer);
Hooks.onOperatorError((t, d) -> {
operatorErrors.offer(Tuples.of(Optional.ofNullable(t), Optional.ofNullable(d)));
return t;
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void resumeDropErrorHookFails() {
AtomicReference<Object> value = new AtomicReference<>();
UnsupportedOperationException failure = new UnsupportedOperationException("error hook");
Hooks.onNextDropped(value::set);
Hooks.onErrorDropped(v -> { throw failure; });
try {
OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
String data = "foo";
Throwable exception = new NullPointerException("foo");
Throwable t = strategy.process(exception, data, Context.empty());
assertThat(t)
.hasMessage("error hook")
.hasSuppressedException(exception);
assertThat(value.get()).isEqualTo("foo");
}
finally {
Hooks.resetOnErrorDropped();
Hooks.resetOnNextDropped();
}
}
代码示例来源:origin: reactor/reactor-core
public void unplugHooks() {
Hooks.resetOnNextDropped();
Hooks.resetOnErrorDropped();
Hooks.resetOnOperatorError();
}
代码示例来源:origin: reactor/reactor-core
final void resetHooks() {
Hooks.resetOnErrorDropped();
Hooks.resetOnNextDropped();
Hooks.resetOnEachOperator();
Hooks.resetOnOperatorError();
Hooks.resetOnLastOperator();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void accumulatingHooks() throws Exception {
AtomicReference<String> ref = new AtomicReference<>();
Hooks.onNextDropped(d -> {
ref.set(d.toString());
});
Hooks.onNextDropped(d -> {
ref.set(ref.get()+"bar");
});
Hooks.onErrorDropped(d -> {
ref.set(d.getMessage());
});
Hooks.onErrorDropped(d -> {
ref.set(ref.get()+"bar");
});
Hooks.resetOnErrorDropped();
Hooks.onOperatorError((error, d) -> {
ref.set(d.toString());
return new Exception("bar");
});
Hooks.onOperatorError((error, d) -> {
ref.set(ref.get()+error.getMessage());
return error;
Hooks.resetOnOperatorError();
Hooks.onEachOperator(h -> {
代码示例来源:origin: reactor/reactor-core
@Test
public void onOperatorError() {
AtomicReference<Object> errorValue = new AtomicReference<Object>();
Hooks.onOperatorError((error, d) -> {
errorValue.set(d);
return error;
});
Flux<Integer> f1 = Mono.just(1).flatMapMany(i -> Flux.error(new Exception("test")));
StepVerifier.create(f1).verifyErrorMessage("test");
assertThat(errorValue.get()).isEqualTo(1);
Flux<Integer> f2 = Mono.just(2).flatMapMany(i -> {
throw new RuntimeException("test");
});
StepVerifier.create(f2).verifyErrorMessage("test");
assertThat(errorValue.get()).isEqualTo(2);
Flux<Integer> f3 = Flux.just(3, 6, 9).flatMap(i -> Flux.error(new Exception("test")));
StepVerifier.create(f3).verifyErrorMessage("test");
assertThat(errorValue.get()).isEqualTo(3);
Flux<Integer> f4 = Flux.just(4, 8, 12).flatMap(i -> {
throw new RuntimeException("test");
});
StepVerifier.create(f4).verifyErrorMessage("test");
assertThat(errorValue.get()).isEqualTo(4);
Hooks.resetOnOperatorError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void completeHookErrorDropped() {
Hooks.onErrorDropped(e -> assertTrue(e.getMessage().equals("complete")));
try {
Mono.just("foo")
.subscribe(v -> {},
e -> {},
() -> { throw new IllegalStateException("complete");});
}
finally {
Hooks.resetOnErrorDropped();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testOnLastPublisher() throws Exception {
List<Publisher> l = new ArrayList<>();
Hooks.onLastOperator(p -> {
System.out.println(Scannable.from(p).parents().count());
System.out.println(Scannable.from(p).stepName());
l.add(p);
return p;
});
StepVerifier.create(Flux.just(1, 2, 3)
.map(m -> m)
.takeUntilOther(Mono.never())
.flatMap(d -> Mono.just(d).hide()))
.expectNext(1, 2, 3)
.verifyComplete();
Hooks.resetOnLastOperator();
assertThat(l).hasSize(5);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testTrace() throws Exception {
Hooks.onOperatorDebug();
try {
Mono.fromCallable(() -> {
throw new RuntimeException();
})
.map(d -> d)
.block();
}
catch(Exception e){
e.printStackTrace();
Assert.assertTrue(e.getSuppressed()[0].getMessage().contains("MonoCallable"));
return;
}
finally {
Hooks.resetOnOperatorDebug();
}
throw new IllegalStateException();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void onOperatorErrorReset() {
Hooks.onOperatorError("some", (t, v) -> t);
assertThat(Hooks.onOperatorErrorHook).isNotNull();
assertThat(Hooks.getOnOperatorErrorHooks()).hasSize(1);
Hooks.resetOnOperatorError();
assertThat(Hooks.onOperatorErrorHook).isNull();
assertThat(Hooks.getOnOperatorErrorHooks()).isEmpty();
}
代码示例来源:origin: reactor/reactor-core
/**
* Add a {@link Publisher} operator interceptor for the last operator created
* in every flow ({@link Flux} or {@link Mono}). The passed function is applied
* to the original operator {@link Publisher} and can return a different {@link Publisher},
* on the condition that it generically maintains the same data type as the original.
* <p>
* Note that sub-hooks are cumulative, but invoking this method twice with the same
* instance (or any instance that has the same `toString`) will result in only a single
* instance being applied. See {@link #onLastOperator(String, Function)} for a variant
* that allows you to name the sub-hooks (and thus replace them or remove them individually
* later on). Can be fully reset via {@link #resetOnLastOperator()}.
* <p>
* This pointcut function cannot make use of {@link Flux}, {@link Mono} or
* {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
* operator calls would effectively invoke onEachOperator from onEachOperator.
*
* @param onLastOperator the sub-hook: a function to intercept last operation call
* (e.g. {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
*
* @see #onLastOperator(String, Function)
* @see #resetOnLastOperator(String)
* @see #resetOnLastOperator()
* @see #onEachOperator(Function)
*/
public static void onLastOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onLastOperator) {
onLastOperator(onLastOperator.toString(), onLastOperator);
}
代码示例来源:origin: reactor/reactor-core
@SuppressWarnings("unchecked")
void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
.findFirst()
.get();
assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
Hooks.onNextDropped(v -> {
});
s.onNext(0);
Hooks.resetOnNextDropped();
}
代码示例来源:origin: reactor/reactor-core
/**
* Add a custom error mapping, overriding the default one. Custom mapping can be an
* accumulation of several sub-hooks each subsequently added via this method.
* <p>
* Note that sub-hooks are cumulative, but invoking this method twice with the same
* instance (or any instance that has the same `toString`) will result in only a single
* instance being applied. See {@link #onOperatorError(String, BiFunction)} for a variant
* that allows you to name the sub-hooks (and thus replace them or remove them individually
* later on). Can be fully reset via {@link #resetOnOperatorError()}.
* <p>
* For reference, the default mapping is to unwrap the exception and, if the second
* parameter is another exception, to add it to the first as suppressed.
*
* @param onOperatorError an operator error {@link BiFunction} mapper, returning an arbitrary exception
* given the failure and optionally some original context (data or error).
*
* @see #onOperatorError(String, BiFunction)
* @see #resetOnOperatorError(String)
* @see #resetOnOperatorError()
*/
public static void onOperatorError(BiFunction<? super Throwable, Object, ? extends Throwable> onOperatorError) {
onOperatorError(onOperatorError.toString(), onOperatorError);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verboseExtension() {
Queue<String> q = new LinkedTransferQueue<>();
Hooks.onEachOperator(p -> {
q.offer(p.toString());
return p;
});
Hooks.onOperatorDebug();
Hooks.resetOnEachOperator();
Hooks.onEachOperator(p -> {
q.offer(p.toString());
return p;
Hooks.resetOnEachOperator();
代码示例来源:origin: reactor/reactor-core
@Before
public void populateDebug() {
if (testName.getMethodName().equals("debuggingCommonStacktrace")) {
toDebug = scatterAndGather(urls());
}
else if (testName.getMethodName().startsWith("debuggingActivated")) {
Hooks.onOperatorDebug();
toDebug = scatterAndGather(urls());
}
}
代码示例来源:origin: reactor/reactor-core
};
Hooks.onOperatorError("1", hook1);
Hooks.onOperatorError("2", hook2);
Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom"), "foo");
assertThat(Hooks.getOnOperatorErrorHooks())
.containsOnlyKeys("1", "2");
assertThat(Hooks.getOnOperatorErrorHooks().values())
.containsExactly(hook1, hook2);
assertThat(applied).containsExactly("h1", "h2");
Hooks.onOperatorError("1", hook3);
Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom2"), "bar");
assertThat(Hooks.getOnOperatorErrorHooks())
.containsOnlyKeys("1", "2");
assertThat(Hooks.getOnOperatorErrorHooks().values())
.containsExactly(hook3, hook2);
assertThat(applied).containsExactly("h3", "h2");
代码示例来源:origin: reactor/reactor-core
};
Hooks.onLastOperator("1", hook1);
Hooks.onLastOperator("2", hook2);
Hooks.onLastOperatorHook.apply(s -> {});
assertThat(Hooks.getOnLastOperatorHooks())
.containsOnlyKeys("1", "2");
assertThat(Hooks.getOnLastOperatorHooks().values())
.containsExactly(hook1, hook2);
assertThat(applied).containsExactly("h1", "h2");
Hooks.onLastOperator("1", hook3);
Hooks.onLastOperatorHook.apply(s -> {});
assertThat(Hooks.getOnLastOperatorHooks())
.containsOnlyKeys("1", "2");
assertThat(Hooks.getOnLastOperatorHooks().values())
.containsExactly(hook3, hook2);
assertThat(applied).containsExactly("h3", "h2");
代码示例来源:origin: reactor/reactor-core
@Test
public void parallelModeFused() {
Hooks.onOperatorDebug();
Hooks.onEachOperator(p -> {
System.out.println(Scannable.from(p).stepName());
return p;
代码示例来源:origin: reactor/reactor-core
@Test
public void onEachOperatorReset() {
Hooks.onEachOperator("some", p -> p);
assertThat(Hooks.onEachOperatorHook).isNotNull();
assertThat(Hooks.getOnEachOperatorHooks()).hasSize(1);
Hooks.resetOnEachOperator();
assertThat(Hooks.onEachOperatorHook).isNull();
assertThat(Hooks.getOnEachOperatorHooks()).isEmpty();
}
内容来源于网络,如有侵权,请联系作者删除!