本文整理了Java中reactor.core.publisher.Operators.onNextError()
方法的一些代码示例,展示了Operators.onNextError()
的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Operators.onNextError()
方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:onNextError
[英]Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it. For poll(), prefer #onNextPollError(Object,Throwable,Context) as it returns a RuntimeException.
Cancels the Subscription and returns a Throwable if errors are fatal for the error mode, in which case the operator should call onError with the returned error. On the contrary, if the error mode allows the sequence to continue, does not cancel the Subscription and returns null.
Typical usage pattern differs depending on the calling method:
代码示例来源:origin: reactor/reactor-core
/**
 * Applies {@code mapper} to the incoming element and forwards the mapped value to
 * {@code actual}.
 * <p>
 * An element received once {@code done} is set is passed to
 * {@link Operators#onNextDropped} and otherwise ignored. Any failure while mapping
 * (including a {@code null} result) is handed to {@link Operators#onNextError}:
 * a non-null return is routed to {@code onError}, while a {@code null} return means
 * the sequence continues and one more element is requested from {@code s}.
 *
 * @param t the element to map
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    R mapped;
    try {
        mapped = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, actual.currentContext(), s);
        if (fatal == null) {
            // The error mode lets the sequence continue: ask for one more element.
            s.request(1);
        }
        else {
            onError(fatal);
        }
        return;
    }

    actual.onNext(mapped);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Maps {@code t} through {@code mapper} and emits the result on {@code actual}.
 * <p>
 * If {@code done} is already set, the element is reported through
 * {@link Operators#onNextDropped} and nothing else happens. If the mapper throws or
 * returns {@code null}, {@link Operators#onNextError} decides the outcome: a
 * returned error is passed to {@code onError}; a {@code null} return leads to
 * {@code s.request(1)} instead.
 *
 * @param t the element received from upstream
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    R result;
    try {
        result = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    catch (Throwable mappingError) {
        Throwable toSignal =
                Operators.onNextError(t, mappingError, actual.currentContext(), s);
        if (toSignal != null) {
            onError(toSignal);
            return;
        }
        // No error to propagate: request one more element from s.
        s.request(1);
        return;
    }

    actual.onNext(result);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Forwards {@code t} to {@code actual} only when {@code predicate} accepts it.
 * <p>
 * Elements arriving after {@code done} is set go to {@link Operators#onNextDropped}.
 * When the predicate throws, {@link Operators#onNextError} is consulted: a non-null
 * result is sent to {@code onError}, otherwise one more element is requested from
 * {@code s}; in both cases the element is then passed to
 * {@link Operators#onDiscard}. A rejected element is discarded and one more element
 * is requested.
 *
 * @param t the element to test
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return;
    }

    boolean accepted;
    try {
        accepted = predicate.test(t);
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, this.ctx, s);
        if (fatal == null) {
            s.request(1);
        }
        else {
            onError(fatal);
        }
        // The element is discarded whichever way the error was handled.
        Operators.onDiscard(t, this.ctx);
        return;
    }

    if (accepted) {
        actual.onNext(t);
        return;
    }

    // Rejected by the predicate: discard first, then request one more element.
    Operators.onDiscard(t, this.ctx);
    s.request(1);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Tests {@code t} against {@code predicate} and emits it on {@code actual} when the
 * test passes.
 * <p>
 * If {@code done} is set the element is handed to {@link Operators#onNextDropped}.
 * A throwing predicate is resolved through {@link Operators#onNextError} (error to
 * {@code onError}, or {@code s.request(1)} when it returns {@code null}), after
 * which the element is discarded. When the predicate rejects the element, one more
 * element is requested from {@code s} and the element is then discarded.
 *
 * @param t the element to test
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return;
    }

    boolean keep;
    try {
        keep = predicate.test(t);
    }
    catch (Throwable predicateError) {
        Throwable toSignal = Operators.onNextError(t, predicateError, this.ctx, s);
        if (toSignal == null) {
            s.request(1);
        }
        else {
            onError(toSignal);
        }
        Operators.onDiscard(t, this.ctx);
        return;
    }

    if (!keep) {
        // Request before discarding, in that order.
        s.request(1);
        Operators.onDiscard(t, this.ctx);
        return;
    }

    actual.onNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Maps {@code t} and offers the mapped value to {@code actual.tryOnNext}.
 * <p>
 * Returns {@code true} after dropping an element received while {@code done} is
 * set. Both the mapping and the downstream {@code tryOnNext} call run inside the
 * same try block, so a failure from either is handed to
 * {@link Operators#onNextError}: a non-null result is passed to {@code onError} and
 * {@code true} is returned; a {@code null} result yields {@code false}.
 *
 * @param t the element to map
 * @return the downstream {@code tryOnNext} result on success; otherwise as
 * described above
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return true;
    }

    try {
        R mapped = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
        return actual.tryOnNext(mapped);
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, actual.currentContext(), s);
        if (fatal == null) {
            // The error mode lets the sequence continue; report false to the caller.
            return false;
        }
        onError(fatal);
        return true;
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Maps {@code t} and offers the mapped value to {@code actual.tryOnNext}.
 * <p>
 * An element received while {@code done} is set is dropped and {@code true} is
 * returned. A failure inside the try block (mapping or the downstream call) is
 * resolved by {@link Operators#onNextError}: when it returns an error, {@code done}
 * is set, the error is signalled directly on {@code actual} and {@code true} is
 * returned; when it returns {@code null}, {@code false} is returned.
 *
 * @param t the element to map
 * @return the downstream {@code tryOnNext} result on success; otherwise as
 * described above
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return true;
    }

    try {
        R result = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
        return actual.tryOnNext(result);
    }
    catch (Throwable mappingError) {
        Throwable toSignal =
                Operators.onNextError(t, mappingError, actual.currentContext(), s);
        if (toSignal == null) {
            return false;
        }
        // Mark this subscriber done before signalling the error downstream.
        done = true;
        actual.onError(toSignal);
        return true;
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Offers {@code t} to {@code actual.tryOnNext} when {@code predicate} accepts it.
 * <p>
 * Returns {@code false} after dropping an element received while {@code done} is
 * set. If the predicate throws, {@link Operators#onNextError} is consulted and any
 * returned error is passed to {@code onError}; the element is then discarded and
 * {@code false} is returned. A rejected element is discarded and {@code false} is
 * returned.
 *
 * @param t the element to test
 * @return the downstream {@code tryOnNext} result when the predicate accepts
 * {@code t}, {@code false} otherwise
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return false;
    }

    boolean accepted;
    try {
        accepted = predicate.test(t);
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, this.ctx, s);
        if (fatal != null) {
            onError(fatal);
        }
        // Discard the element whether or not an error was signalled.
        Operators.onDiscard(t, this.ctx);
        return false;
    }

    if (!accepted) {
        Operators.onDiscard(t, this.ctx);
        return false;
    }
    return actual.tryOnNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Emits {@code t} on {@code actual} when {@code predicate} accepts it and reports
 * whether it was emitted.
 * <p>
 * An element received while {@code done} is set is dropped and {@code false} is
 * returned. If the predicate throws, {@link Operators#onNextError} is consulted and
 * any returned error is passed to {@code onError}; the element is then discarded
 * and {@code false} is returned.
 *
 * @param t the element to test
 * @return {@code true} if {@code t} was passed to {@code actual.onNext},
 * {@code false} otherwise
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return false;
    }

    boolean accepted;
    try {
        accepted = predicate.test(t);
    }
    catch (Throwable predicateError) {
        Throwable toSignal = Operators.onNextError(t, predicateError, this.ctx, s);
        if (toSignal != null) {
            onError(toSignal);
        }
        Operators.onDiscard(t, this.ctx);
        return false;
    }

    if (accepted) {
        actual.onNext(t);
    }
    else {
        Operators.onDiscard(t, this.ctx);
    }
    return accepted;
}
代码示例来源:origin: reactor/reactor-core
/**
 * Tests {@code t} with {@code predicate}; accepted elements are emitted on
 * {@code actual}, rejected ones are passed to {@link Operators#onDiscard}.
 * <p>
 * An element received while {@code done} is set is dropped and {@code false} is
 * returned. A throwing predicate is resolved through
 * {@link Operators#onNextError}; a returned error goes to {@code onError}, the
 * element is discarded, and {@code false} is returned.
 *
 * @param t the element to test
 * @return the predicate's verdict when it completes normally, {@code false}
 * otherwise
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return false;
    }

    boolean keep;
    try {
        keep = predicate.test(t);
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, this.ctx, s);
        if (fatal != null) {
            onError(fatal);
        }
        Operators.onDiscard(t, this.ctx);
        return false;
    }

    if (!keep) {
        Operators.onDiscard(t, this.ctx);
        return false;
    }
    actual.onNext(t);
    return true;
}
代码示例来源:origin: reactor/reactor-core
/**
 * Delegates {@code t} to {@code actual.tryOnNext} if {@code predicate} accepts it.
 * <p>
 * While {@code done} is set, incoming elements are dropped via
 * {@link Operators#onNextDropped} and {@code false} is returned. If the predicate
 * throws, the error returned by {@link Operators#onNextError} (if any) is passed to
 * {@code onError}, the element is discarded, and {@code false} is returned.
 * Rejected elements are discarded and produce {@code false}.
 *
 * @param t the element to test
 * @return the downstream {@code tryOnNext} result for accepted elements,
 * {@code false} otherwise
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, this.ctx);
        return false;
    }

    boolean matches;
    try {
        matches = predicate.test(t);
    }
    catch (Throwable predicateError) {
        Throwable toSignal = Operators.onNextError(t, predicateError, this.ctx, s);
        if (toSignal != null) {
            onError(toSignal);
        }
        Operators.onDiscard(t, this.ctx);
        return false;
    }

    if (matches) {
        return actual.tryOnNext(t);
    }

    Operators.onDiscard(t, this.ctx);
    return false;
}
代码示例来源:origin: reactor/reactor-core
/**
 * Runs the optional hook obtained from {@code parent.onNextCall()} on {@code t},
 * then offers {@code t} to {@code actual.tryOnNext}.
 * <p>
 * An element received while {@code done} is set is dropped and {@code false} is
 * returned. If the hook throws, {@link Operators#onNextError} is consulted: a
 * {@code null} result returns {@code false}; otherwise the error is passed to
 * {@code onError} and {@code true} is returned.
 *
 * @param t the element to process
 * @return the downstream {@code tryOnNext} result when the hook is absent or
 * succeeds; otherwise as described above
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return false;
    }

    final Consumer<? super T> hook = parent.onNextCall();
    if (hook == null) {
        // No hook configured: pass the element straight through.
        return actual.tryOnNext(t);
    }

    try {
        hook.accept(t);
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, actual.currentContext(), s);
        if (fatal != null) {
            onError(fatal);
            return true;
        }
        return false;
    }

    return actual.tryOnNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Maps the incoming element and forwards the result to {@code actual}, unless
 * {@code sourceMode} is {@code ASYNC}, in which case {@code null} is forwarded to
 * {@code actual.onNext} without mapping.
 * <p>
 * Outside {@code ASYNC} mode: an element received while {@code done} is set is
 * dropped; a mapper failure (including a {@code null} result) is resolved through
 * {@link Operators#onNextError}, sending a returned error to {@code onError} or
 * requesting one more element from {@code s} when it returns {@code null}.
 *
 * @param t the element to map
 */
@Override
public void onNext(T t) {
    if (sourceMode == ASYNC) {
        actual.onNext(null);
        return;
    }

    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    R mapped;
    try {
        mapped = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    catch (Throwable failure) {
        Throwable fatal = Operators.onNextError(t, failure, actual.currentContext(), s);
        if (fatal == null) {
            s.request(1);
        }
        else {
            onError(fatal);
        }
        return;
    }

    actual.onNext(mapped);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Handles an upstream element according to {@code sourceMode}.
 * <p>
 * In {@code ASYNC} mode, {@code actual.onNext(null)} is called and the element is
 * not mapped. Otherwise the element is dropped if {@code done} is set, or mapped
 * through {@code mapper} and emitted on {@code actual}. A mapping failure is
 * resolved by {@link Operators#onNextError}: a returned error goes to
 * {@code onError}; a {@code null} return triggers {@code s.request(1)}.
 *
 * @param t the element received from upstream
 */
@Override
public void onNext(T t) {
    if (sourceMode == ASYNC) {
        actual.onNext(null);
        return;
    }

    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    R result;
    try {
        result = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null value.");
    }
    catch (Throwable mappingError) {
        Throwable toSignal =
                Operators.onNextError(t, mappingError, actual.currentContext(), s);
        if (toSignal != null) {
            onError(toSignal);
            return;
        }
        // Nothing to propagate: request one more element from s.
        s.request(1);
        return;
    }

    actual.onNext(result);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Invokes the hook from {@code parent.onNextCall()}, when one is present, before
 * offering {@code t} to {@code actual.tryOnNext}.
 * <p>
 * While {@code done} is set, elements are dropped and {@code false} is returned.
 * A throwing hook is resolved by {@link Operators#onNextError}: {@code false} is
 * returned when it yields {@code null}; otherwise the error is passed to
 * {@code onError} and {@code true} is returned.
 *
 * @param t the element to process
 * @return the downstream {@code tryOnNext} result when the hook is absent or
 * succeeds; otherwise as described above
 */
@Override
public boolean tryOnNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return false;
    }

    final Consumer<? super T> onNextHook = parent.onNextCall();
    if (onNextHook != null) {
        try {
            onNextHook.accept(t);
        }
        catch (Throwable hookError) {
            Throwable toSignal =
                    Operators.onNextError(t, hookError, actual.currentContext(), s);
            if (toSignal != null) {
                onError(toSignal);
                return true;
            }
            return false;
        }
    }

    return actual.tryOnNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Runs the optional hook from {@code parent.onNextCall()} on {@code t} and then
 * emits {@code t} on {@code actual}.
 * <p>
 * An element received while {@code done} is set is dropped. If the hook throws,
 * {@link Operators#onNextError} is consulted: a {@code null} result leads to
 * {@code request(1)} on this instance, a non-null result is passed to
 * {@code onError}; in both cases the element is not emitted.
 *
 * @param t the element to process
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    final Consumer<? super T> hook = parent.onNextCall();
    if (hook != null) {
        try {
            hook.accept(t);
        }
        catch (Throwable failure) {
            Throwable fatal = Operators.onNextError(t, failure, actual.currentContext(), s);
            if (fatal != null) {
                onError(fatal);
            }
            else {
                request(1);
            }
            // Either way the element is not forwarded downstream.
            return;
        }
    }

    actual.onNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Emits {@code t} on {@code actual} after running the hook returned by
 * {@code parent.onNextCall()}, if there is one.
 * <p>
 * While {@code done} is set, elements are dropped via
 * {@link Operators#onNextDropped}. A throwing hook is resolved by
 * {@link Operators#onNextError}: when it returns {@code null}, {@code request(1)}
 * is called on this instance; otherwise the returned error is passed to
 * {@code onError}. The element is not emitted in either case.
 *
 * @param t the element to process
 */
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }

    final Consumer<? super T> onNextHook = parent.onNextCall();
    if (onNextHook == null) {
        actual.onNext(t);
        return;
    }

    try {
        onNextHook.accept(t);
    }
    catch (Throwable hookError) {
        Throwable toSignal =
                Operators.onNextError(t, hookError, actual.currentContext(), s);
        if (toSignal == null) {
            request(1);
            return;
        }
        onError(toSignal);
        return;
    }

    actual.onNext(t);
}
代码示例来源:origin: reactor/reactor-core
/**
 * With {@code STOP} installed through {@code Hooks.onNextError} and
 * {@code RESUME_DROP} stored in the {@link Context} under
 * {@code KEY_ON_NEXT_ERROR_STRATEGY}, checks that {@link Operators#onNextError}
 * returns {@code null}, records the value and the error in the dropped hooks, and
 * leaves the subscription uncancelled.
 */
@Test
public void onNextErrorModeLocalStrategy() {
    List<Object> droppedValues = new ArrayList<>();
    List<Object> droppedErrors = new ArrayList<>();
    Hooks.onNextDropped(droppedValues::add);
    Hooks.onErrorDropped(droppedErrors::add);
    Hooks.onNextError(OnNextFailureStrategy.STOP);

    Exception boom = new IllegalStateException("boom");
    DeferredSubscription subscription = new Operators.DeferredSubscription();
    Context localContext = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
            OnNextFailureStrategy.RESUME_DROP);

    try {
        assertThat(subscription.isCancelled()).as("s initially cancelled").isFalse();

        Throwable outcome = Operators.onNextError("foo", boom, localContext, subscription);

        assertThat(outcome).isNull();
        assertThat(droppedValues).containsExactly("foo");
        assertThat(droppedErrors).containsExactly(boom);
        assertThat(subscription.isCancelled()).as("s cancelled").isFalse();
    }
    finally {
        // Restore the global hooks changed at the top of the test.
        Hooks.resetOnNextDropped();
        Hooks.resetOnErrorDropped();
        Hooks.resetOnNextError();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a context-local strategy whose {@code test} returns {@code true} and whose
 * {@code process} returns {@code null}, and checks that
 * {@link Operators#onNextError} then returns {@code null} and does not cancel the
 * subscription.
 */
@Test
public void onNextFailureWithStrategyMatchingDoesntCancel() {
    OnNextFailureStrategy matchingAndSwallowing = new OnNextFailureStrategy() {
        @Override
        public boolean test(Throwable error, @Nullable Object value) {
            return true;
        }

        @Nullable
        @Override
        public Throwable process(Throwable error, @Nullable Object value,
                Context ctx) {
            return null;
        }
    };
    Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
            matchingAndSwallowing);
    Operators.DeferredSubscription subscription = new Operators.DeferredSubscription();

    Throwable outcome = Operators.onNextError("foo", new NullPointerException("bar"),
            context, subscription);

    assertThat(outcome).as("exception processed").isNull();
    assertThat(subscription.isCancelled()).as("subscription cancelled").isFalse();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a context-local strategy whose {@code test} returns {@code true} but whose
 * {@code process} returns the original error, and checks that
 * {@link Operators#onNextError} returns that {@link NullPointerException} (no
 * cause, no suppressed exceptions) and cancels the subscription.
 */
@Test
public void onNextFailureWithStrategyMatchingButNotNullDoesCancel() {
    OnNextFailureStrategy matchingAndRethrowing = new OnNextFailureStrategy() {
        @Override
        public boolean test(Throwable error, @Nullable Object value) {
            return true;
        }

        @Override
        public Throwable process(Throwable error, @Nullable Object value,
                Context ctx) {
            return error;
        }
    };
    Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
            matchingAndRethrowing);
    Operators.DeferredSubscription subscription = new Operators.DeferredSubscription();

    Throwable outcome = Operators.onNextError("foo", new NullPointerException("bar"),
            context, subscription);

    assertThat(outcome).as("exception processed")
                       .isNotNull()
                       .isInstanceOf(NullPointerException.class)
                       .hasNoSuppressedExceptions()
                       .hasNoCause();
    assertThat(subscription.isCancelled()).as("subscription cancelled").isTrue();
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a context-local strategy whose {@code test} returns {@code false}, and
 * checks that {@link Operators#onNextError} returns the original
 * {@link NullPointerException} (no cause, no suppressed exceptions) and cancels
 * the subscription.
 */
@Test
public void onNextFailureWithStrategyNotMatchingDoesCancel() {
    OnNextFailureStrategy neverMatching = new OnNextFailureStrategy() {
        @Override
        public boolean test(Throwable error, @Nullable Object value) {
            return false;
        }

        @Override
        public Throwable process(Throwable error, @Nullable Object value,
                Context ctx) {
            return error;
        }
    };
    Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
            neverMatching);
    Operators.DeferredSubscription subscription = new Operators.DeferredSubscription();

    Throwable outcome = Operators.onNextError("foo", new NullPointerException("bar"),
            context, subscription);

    assertThat(outcome).as("exception processed")
                       .isNotNull()
                       .isInstanceOf(NullPointerException.class)
                       .hasNoSuppressedExceptions()
                       .hasNoCause();
    assertThat(subscription.isCancelled()).as("subscription cancelled").isTrue();
}
内容来源于网络,如有侵权,请联系作者删除!