Usage and code examples of the reactor.core.publisher.Operators.onNextError() method

x33g5p2x, reposted on 2022-01-25 in Other

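This article collects code examples of the Java method reactor.core.publisher.Operators.onNextError() and shows how Operators.onNextError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Operators.onNextError() method are as follows:
Package path: reactor.core.publisher.Operators
Class name: Operators
Method name: onNextError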

Introduction to Operators.onNextError

Find the OnNextFailureStrategy to apply to the calling operator (which could be a local error mode defined in the Context) and apply it. For poll(), prefer #onNextPollError(Object,Throwable,Context) as it returns a RuntimeException.

Cancels the Subscription and returns a Throwable if errors are fatal for the error mode, in which case the operator should call onError with the returned error. Conversely, if the error mode allows the sequence to continue, it does not cancel the Subscription and returns null.
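
The contract described above can be modelled without Reactor on the classpath. The following is only a sketch under stated assumptions: FakeSubscription, Strategy, resolve and strategy are hypothetical stand-ins invented for illustration (the real OnNextFailureStrategy also receives a Context), and the branching simply mirrors the behaviour that the reactor-core tests quoted later in this article assert. It is not the library's implementation.

```java
final class OnNextErrorContractSketch {

    /** Hypothetical stand-in for a Subscription; it only records cancellation. */
    static final class FakeSubscription {
        boolean cancelled;

        void cancel() {
            cancelled = true;
        }
    }

    /** Hypothetical, simplified strategy shape (no Context parameter). */
    interface Strategy {
        boolean test(Throwable error, Object value);

        Throwable process(Throwable error, Object value);
    }

    /**
     * Models the documented contract: returns null (and does not cancel) when the
     * sequence may continue, otherwise cancels and returns the error to propagate.
     */
    static Throwable resolve(Object value, Throwable error, Strategy strategy, FakeSubscription s) {
        if (strategy.test(error, value)) {
            Throwable result = strategy.process(error, value);
            if (result != null) {
                s.cancel(); // the strategy matched but still reports a fatal error
            }
            return result;
        }
        s.cancel(); // the strategy does not apply: treat the error as fatal
        return error;
    }

    /** Builds a strategy that matches or not, and swallows the error or returns it. */
    static Strategy strategy(boolean matches, boolean swallow) {
        return new Strategy() {
            @Override
            public boolean test(Throwable error, Object value) {
                return matches;
            }

            @Override
            public Throwable process(Throwable error, Object value) {
                return swallow ? null : error;
            }
        };
    }

    public static void main(String[] args) {
        FakeSubscription s = new FakeSubscription();
        Throwable r = resolve("foo", new IllegalStateException("boom"), strategy(true, true), s);
        System.out.println("returned=" + r + ", cancelled=" + s.cancelled);
    }
}
```

A caller only needs to look at the return value: null means keep going, anything else is the error to hand to onError.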

Typical usage pattern differs depending on the calling method:

  • onNext: check for a throwable return value and call Subscriber#onError(Throwable) if not null, otherwise perform a direct Subscription#request(long) on the upstream.
  • tryOnNext: check for a throwable return value and call Subscriber#onError(Throwable) if not null, otherwise return false to indicate value was not consumed and more must be tried.
  • poll: use #onNextPollError(Object,Throwable,Context) instead.
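
The onNext and tryOnNext patterns listed above can be sketched side by side. This is a minimal, dependency-free illustration rather than Reactor code: the class OnNextUsagePatternSketch, the resumeMode flag, resolveError (a stand-in for the helper's null-or-Throwable result), and the replenishRequests counter (a stand-in for s.request(1)) are all assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class OnNextUsagePatternSketch {

    final Function<Integer, Integer> mapper;
    final boolean resumeMode;                 // hypothetical flag standing in for the error mode
    final List<Integer> emitted = new ArrayList<>();
    long replenishRequests;                   // counts what would be s.request(1) calls
    Throwable terminalError;
    boolean done;

    OnNextUsagePatternSketch(Function<Integer, Integer> mapper, boolean resumeMode) {
        this.mapper = mapper;
        this.resumeMode = resumeMode;
    }

    /** Hypothetical stand-in for the helper: null means the sequence may continue. */
    Throwable resolveError(Throwable error) {
        return resumeMode ? null : error;
    }

    void onError(Throwable t) {
        done = true;
        terminalError = t;
    }

    /** onNext pattern: on a null result, request one more element from upstream. */
    void onNext(Integer t) {
        if (done) {
            return;
        }
        Integer v;
        try {
            v = mapper.apply(t);
        }
        catch (Throwable e) {
            Throwable fatal = resolveError(e);
            if (fatal != null) {
                onError(fatal);
            }
            else {
                replenishRequests++;
            }
            return;
        }
        emitted.add(v);
    }

    /** tryOnNext pattern: on a null result, return false (value not consumed). */
    boolean tryOnNext(Integer t) {
        if (done) {
            return false;
        }
        Integer v;
        try {
            v = mapper.apply(t);
        }
        catch (Throwable e) {
            Throwable fatal = resolveError(e);
            if (fatal != null) {
                onError(fatal);
                return true;
            }
            return false;
        }
        emitted.add(v);
        return true;
    }

    /** Feeds the inputs through onNext using a mapper that fails on zero. */
    static OnNextUsagePatternSketch feed(boolean resumeMode, int... inputs) {
        OnNextUsagePatternSketch op = new OnNextUsagePatternSketch(i -> 10 / i, resumeMode);
        for (int i : inputs) {
            op.onNext(i);
        }
        return op;
    }
}
```

With feed(true, 1, 0, 2) the failing element is skipped and one replacement is requested; with feed(false, 1, 0, 2) the operator terminates at the failing element and ignores what follows.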

Code examples

Code example source: origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return;
  }
  R v;
  try {
    v = Objects.requireNonNull(mapper.apply(t),
        "The mapper returned a null value.");
  }
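  catch (Throwable e) {
    // Resolve the error mode from the Context; a null result means "continue".
    Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
    if (e_ != null) {
      // Fatal: onNextError has cancelled the upstream, so terminate downstream.
      onError(e_);
    }
    else {
      // Recoverable: the failed element is skipped, so request a replacement.
      s.request(1);
    }
    return;
  }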
  actual.onNext(v);
}

Code example source: origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (done) {
    Operators.onNextDropped(t, this.ctx);
    return;
  }
  boolean b;
  try {
    b = predicate.test(t);
  }
  catch (Throwable e) {
    // A null result means the error mode lets the sequence continue.
    Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
    if (e_ != null) {
      onError(e_);
    }
    else {
      // The failed element is skipped, so request one more from upstream.
      s.request(1);
    }
    Operators.onDiscard(t, this.ctx);
    return;
  }
  if (b) {
    actual.onNext(t);
  }
  else {
    // Filtered out: discard the element and replenish the demand.
    Operators.onDiscard(t, this.ctx);
    s.request(1);
  }
}

Code example source: origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (done) {
    Operators.onNextDropped(t, this.ctx);
    return;
  }
  boolean b;
  try {
    b = predicate.test(t);
  }
  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
    if (e_ != null) {
      onError(e_);
    }
    else {
      s.request(1);
    }
    Operators.onDiscard(t, this.ctx);
    return;
  }
  if (b) {
    actual.onNext(t);
  }
  else {
    s.request(1);
    Operators.onDiscard(t, this.ctx);
  }
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return true;
  }
  R v;
  try {
    v = Objects.requireNonNull(mapper.apply(t),
        "The mapper returned a null value.");
    return actual.tryOnNext(v);
  }
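  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
    if (e_ != null) {
      // Fatal: signal the error and report the value as handled.
      onError(e_);
      return true;
    }
    else {
      // Recoverable: report the value as not consumed so another can be tried.
      return false;
    }
  }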
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return true;
  }
  R v;
  try {
    v = Objects.requireNonNull(mapper.apply(t),
        "The mapper returned a null value.");
    return actual.tryOnNext(v);
  }
  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
    if (e_ != null) {
      done = true;
      actual.onError(e_);
      return true;
    }
    else {
      return false;
    }
  }
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, this.ctx);
    return false;
  }
  boolean b;
  try {
    b = predicate.test(t);
  }
  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
    if (e_ != null) {
      onError(e_);
    }
    Operators.onDiscard(t, this.ctx);
    return false;
  }
  if (b) {
    return actual.tryOnNext(t);
  }
  else {
    Operators.onDiscard(t, this.ctx);
    return false;
  }
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, this.ctx);
    return false;
  }
  boolean b;
  try {
    b = predicate.test(t);
  }
  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
    if (e_ != null) {
      onError(e_);
    }
    Operators.onDiscard(t, this.ctx);
    return false;
  }
  if (b) {
    actual.onNext(t);
    return true;
  }
  Operators.onDiscard(t, this.ctx);
  return false;
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, this.ctx);
    return false;
  }
  boolean b;
  try {
    b = predicate.test(t);
  }
  catch (Throwable e) {
    Throwable e_ = Operators.onNextError(t, e, this.ctx, s);
    if (e_ != null) {
      onError(e_);
    }
    Operators.onDiscard(t, this.ctx);
    return false;
  }
  if (b) {
    actual.onNext(t);
  }
  else {
    Operators.onDiscard(t, this.ctx);
  }
  return b;
}

Code example source: origin: reactor/reactor-core

@Override
public boolean tryOnNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return false;
  }
  final Consumer<? super T> nextHook = parent.onNextCall();
  if (nextHook != null) {
    try {
      nextHook.accept(t);
    }
    catch (Throwable e) {
      Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
      if (e_ == null) {
        return false;
      }
      else {
        onError(e_);
        return true;
      }
    }
  }
  return actual.tryOnNext(t);
}

Code example source: origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (sourceMode == ASYNC) {
    actual.onNext(null);
  }
  else {
    if (done) {
      Operators.onNextDropped(t, actual.currentContext());
      return;
    }
    R v;
    try {
      v = Objects.requireNonNull(mapper.apply(t),
          "The mapper returned a null value.");
    }
    catch (Throwable e) {
      Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
      if (e_ != null) {
        onError(e_);
      }
      else {
        s.request(1);
      }
      return;
    }
    actual.onNext(v);
  }
}

Code example source: origin: reactor/reactor-core

@Override
public void onNext(T t) {
  if (done) {
    Operators.onNextDropped(t, actual.currentContext());
    return;
  }
  final Consumer<? super T> nextHook = parent.onNextCall();
  if (nextHook != null) {
    try {
      nextHook.accept(t);
    }
    catch (Throwable e) {
      Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
      if (e_ == null) {
        request(1);
        return;
      }
      else {
        onError(e_);
        return;
      }
    }
  }
  actual.onNext(t);
}

Code example source: origin: reactor/reactor-core

@Test
public void onNextErrorModeLocalStrategy() {
  List<Object> nextDropped = new ArrayList<>();
  List<Object> errorDropped = new ArrayList<>();
  Hooks.onNextDropped(nextDropped::add);
  Hooks.onErrorDropped(errorDropped::add);
  Hooks.onNextError(OnNextFailureStrategy.STOP);
  Context c = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, OnNextFailureStrategy.RESUME_DROP);
  Exception error = new IllegalStateException("boom");
  DeferredSubscription s = new Operators.DeferredSubscription();
  try {
    assertThat(s.isCancelled()).as("s initially cancelled").isFalse();
    Throwable e = Operators.onNextError("foo", error, c, s);
    assertThat(e).isNull();
    assertThat(nextDropped).containsExactly("foo");
    assertThat(errorDropped).containsExactly(error);
    assertThat(s.isCancelled()).as("s cancelled").isFalse();
  }
  finally {
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextError();
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void onNextFailureWithStrategyMatchingDoesntCancel() {
  Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, new OnNextFailureStrategy() {
    @Override
    public boolean test(Throwable error, @Nullable Object value) {
      return true;
    }
    @Nullable
    @Override
    public Throwable process(Throwable error, @Nullable Object value,
        Context context) {
      return null;
    }
  });
  Operators.DeferredSubscription s = new Operators.DeferredSubscription();
  Throwable t = Operators.onNextError("foo", new NullPointerException("bar"), context, s);
  assertThat(t).as("exception processed").isNull();
  assertThat(s.isCancelled()).as("subscription cancelled").isFalse();
}

Code example source: origin: reactor/reactor-core

@Test
public void onNextFailureWithStrategyMatchingButNotNullDoesCancel() {
  Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, new OnNextFailureStrategy() {
    @Override
    public boolean test(Throwable error, @Nullable Object value) {
      return true;
    }
    @Override
    public Throwable process(Throwable error, @Nullable Object value,
        Context context) {
      return error;
    }
  });
  Operators.DeferredSubscription s = new Operators.DeferredSubscription();
  Throwable t = Operators.onNextError("foo", new NullPointerException("bar"), context, s);
  assertThat(t).as("exception processed")
         .isNotNull()
         .isInstanceOf(NullPointerException.class)
         .hasNoSuppressedExceptions()
         .hasNoCause();
  assertThat(s.isCancelled()).as("subscription cancelled").isTrue();
}

Code example source: origin: reactor/reactor-core

@Test
public void onNextFailureWithStrategyNotMatchingDoesCancel() {
  Context context = Context.of(OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY, new OnNextFailureStrategy() {
    @Override
    public boolean test(Throwable error, @Nullable Object value) {
      return false;
    }
    @Override
    public Throwable process(Throwable error, @Nullable Object value,
        Context context) {
      return error;
    }
  });
  Operators.DeferredSubscription s = new Operators.DeferredSubscription();
  Throwable t = Operators.onNextError("foo", new NullPointerException("bar"), context, s);
  assertThat(t).as("exception processed")
         .isNotNull()
         .isInstanceOf(NullPointerException.class)
         .hasNoSuppressedExceptions()
         .hasNoCause();
  assertThat(s.isCancelled()).as("subscription cancelled").isTrue();
}
