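This article collects code examples for the Java method reactor.core.publisher.Operators.onRejectedExecution() and shows how Operators.onRejectedExecution() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Operators.onRejectedExecution() are as follows:
Package path: reactor.core.publisher.Operators
Class name: Operators
Method name: onRejectedExecution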
Return a wrapped RejectedExecutionException which can be thrown by the operator. This exception denotes that an execution was rejected by a reactor.core.scheduler.Scheduler, notably when it was already disposed.
Wrapping is done by calling both Exceptions#failWithRejected(Throwable) and #onOperatorError(Subscription,Throwable,Object,Context) (with the passed Subscription).
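The rejection described above can be reproduced without Reactor. The following is a minimal plain-JDK sketch, not Reactor code: a shut-down ExecutorService stands in for a disposed Scheduler, and the class RejectionSketch and its wrap helper are hypothetical names introduced only for illustration. The message "Scheduler unavailable" is the one the reactor-core tests later in this article assert on.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Plain-JDK analogy (assumption): a shut-down ExecutorService plays the role
// of a disposed Scheduler. None of these names exist in Reactor.
class RejectionSketch {

    // Hypothetical helper: wraps the original rejection so that callers see
    // one uniform exception type and message, with the original as cause.
    static RejectedExecutionException wrap(Throwable original) {
        return new RejectedExecutionException("Scheduler unavailable", original);
    }

    // Submits a task to an executor that was already shut down and returns
    // the wrapped rejection, or null if the task was unexpectedly accepted.
    static RuntimeException scheduleOnDisposed() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // comparable to disposing a Scheduler
        try {
            executor.execute(() -> { });
            return null;
        }
        catch (RejectedExecutionException ree) {
            return wrap(ree);
        }
    }

    public static void main(String[] args) {
        RuntimeException e = scheduleOnDisposed();
        System.out.println(e.getMessage() + " / cause: " + e.getCause());
    }
}
```

Keeping the original exception as the cause preserves the executor's own diagnostic message while giving downstream code a single type to handle.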
Code example source: reactor/reactor-core
/**
 * Return a wrapped {@link RejectedExecutionException} which can be thrown by the
 * operator. This exception denotes that an execution was rejected by a
 * {@link reactor.core.scheduler.Scheduler}, notably when it was already disposed.
 * <p>
 * Wrapping is done by calling both {@link Exceptions#bubble(Throwable)} and
 * {@link #onOperatorError(Subscription, Throwable, Object, Context)}.
 *
 * @param original the original execution error
 * @param context a context that might hold a local error consumer
 *
 */
public static RuntimeException onRejectedExecution(Throwable original, Context context) {
    return onRejectedExecution(original, null, null, null, context);
}
Code example source: reactor/reactor-core
@Override
public void cancel() {
    if (CANCELLED.compareAndSet(this, 0, 1)) {
        try {
            scheduler.schedule(this);
        }
        catch (RejectedExecutionException ree) {
            throw Operators.onRejectedExecution(ree, actual.currentContext());
        }
    }
}
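The cancel() example above combines a compare-and-set guard with a scheduling attempt that may be rejected. The sketch below shows the same shape using only the JDK. It is an illustration under stated assumptions, not the Reactor implementation: CancelOnceSketch is a hypothetical class, a java.util.concurrent.Executor stands in for the Scheduler, and an AtomicInteger replaces the field updater.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an operator that schedules its cleanup on cancel.
class CancelOnceSketch implements Runnable {

    final Executor executor;                               // stands in for the Scheduler
    final AtomicInteger cancelled = new AtomicInteger();   // 0 = active, 1 = cancelled
    final AtomicInteger runs = new AtomicInteger();        // counts how often the task ran

    CancelOnceSketch(Executor executor) {
        this.executor = executor;
    }

    void cancel() {
        // Only the first caller wins the 0 -> 1 transition, so the task is
        // submitted at most once, however many times cancel() is called.
        if (cancelled.compareAndSet(0, 1)) {
            try {
                executor.execute(this);
            }
            catch (RejectedExecutionException ree) {
                // Rethrow in a uniform wrapped form, keeping the cause.
                throw new RejectedExecutionException("Scheduler unavailable", ree);
            }
        }
    }

    @Override
    public void run() {
        runs.incrementAndGet();
    }

    public static void main(String[] args) {
        CancelOnceSketch sketch = new CancelOnceSketch(Runnable::run); // runs inline
        sketch.cancel();
        sketch.cancel();
        System.out.println("task runs: " + sketch.runs.get());
    }
}
```

Throwing from cancel(), as the original does, pushes the failure to the caller of cancel(); the later examples in this article instead route the wrapped exception to actual.onError.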
Code example source: reactor/reactor-core
void requestUpstream(final long n, final Subscription s) {
    if (!requestOnSeparateThread || Thread.currentThread() == THREAD.get(this)) {
        s.request(n);
    }
    else {
        try {
            worker.schedule(() -> s.request(n));
        }
        catch (RejectedExecutionException ree) {
            if(!worker.isDisposed()) {
                //FIXME should not throw but if we implement strict
                // serialization like in StrictSubscriber, onNext will carry an
                // extra cost
                throw Operators.onRejectedExecution(ree, this, null, null,
                        actual.currentContext());
            }
        }
    }
}
Code example source: reactor/reactor-core
void trySchedule(long n, Subscription s) {
    if (Thread.currentThread() == THREAD.get(this)) {
        s.request(n);
    }
    else {
        try {
            worker.schedule(() -> s.request(n));
        }
        catch (RejectedExecutionException ree) {
            if (!worker.isDisposed()) {
                actual.onError(Operators.onRejectedExecution(ree,
                        this,
                        null,
                        null,
                        actual.currentContext()));
            }
        }
    }
}
Code example source: reactor/reactor-core
@Test
public void onRejectedExecutionWithoutDataSignalDelegatesToErrorLocal() {
    BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
            new IllegalStateException("boom_" + v, e);
    Context c = Context.of(Hooks.KEY_ON_OPERATOR_ERROR, localHook);
    IllegalArgumentException failure = new IllegalArgumentException("foo");
    final Throwable throwable = Operators.onRejectedExecution(failure, null, null, null, c);
    assertThat(throwable).isInstanceOf(IllegalStateException.class)
                         .hasMessage("boom_null")
                         .hasNoSuppressedExceptions();
    assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                                    .hasMessage("Scheduler unavailable")
                                    .hasCause(failure);
}
Code example source: reactor/reactor-core
@Test
public void onRejectedExecutionWithDataSignalDelegatesToErrorLocal() {
    BiFunction<Throwable, Object, Throwable> localHook = (e, v) ->
            new IllegalStateException("boom_" + v, e);
    Context c = Context.of(Hooks.KEY_ON_OPERATOR_ERROR, localHook);
    IllegalArgumentException failure = new IllegalArgumentException("foo");
    final Throwable throwable = Operators.onRejectedExecution(failure, null,
            null, "bar", c);
    assertThat(throwable).isInstanceOf(IllegalStateException.class)
                         .hasMessage("boom_bar")
                         .hasNoSuppressedExceptions();
    assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                                    .hasMessage("Scheduler unavailable")
                                    .hasCause(failure);
}
Code example source: reactor/reactor-core
Disposable newPeriod() {
    try {
        return worker.schedulePeriodically(new ConsumerIndexHolder(producerIndex,
                this), timespan, timespan, TimeUnit.MILLISECONDS);
    }
    catch (Exception e) {
        actual.onError(Operators.onRejectedExecution(e, s, null, null, actual.currentContext()));
        return Disposables.disposed();
    }
}
Code example source: reactor/reactor-core
@Test
public void onRejectedExecutionLocalTakesPrecedenceOverOnOperatorError() {
    BiFunction<Throwable, Object, Throwable> localOperatorErrorHook = (e, v) ->
            new IllegalStateException("boom_" + v, e);
    BiFunction<Throwable, Object, Throwable> localReeHook = (e, v) ->
            new IllegalStateException("rejected_" + v, e);
    Context c = Context.of(
            Hooks.KEY_ON_OPERATOR_ERROR, localOperatorErrorHook,
            Hooks.KEY_ON_REJECTED_EXECUTION, localReeHook);
    IllegalArgumentException failure = new IllegalArgumentException("foo");
    final Throwable throwable = Operators.onRejectedExecution(failure, null,
            null, "bar", c);
    assertThat(throwable).isInstanceOf(IllegalStateException.class)
                         .hasMessage("rejected_bar")
                         .hasNoSuppressedExceptions();
    assertThat(throwable.getCause()).isInstanceOf(RejectedExecutionException.class)
                                    .hasMessage("Scheduler unavailable")
                                    .hasCause(failure);
}
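The three hook tests above assert a precedence: a local rejected-execution hook wins over a local operator-error hook, and either one receives the wrapped RejectedExecutionException together with the data signal. The following sketch models only that observable behavior with a plain Map in place of Reactor's Context. HookPrecedenceSketch and both key constants are hypothetical, and the lookup order is inferred from the test assertions rather than taken from Reactor's source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiFunction;

// Map-based model of the hook lookup (assumption: mirrors what the tests
// assert, not how Reactor implements it).
class HookPrecedenceSketch {

    // Hypothetical keys standing in for Hooks.KEY_ON_REJECTED_EXECUTION
    // and Hooks.KEY_ON_OPERATOR_ERROR.
    static final String KEY_REJECTED = "onRejectedExecution";
    static final String KEY_OPERATOR_ERROR = "onOperatorError";

    static Throwable onRejected(Throwable original, Object dataSignal,
            Map<String, BiFunction<Throwable, Object, Throwable>> context) {
        // Always wrap first, so hooks see the rejection rather than the raw cause.
        RejectedExecutionException ree =
                new RejectedExecutionException("Scheduler unavailable", original);
        // The more specific hook takes precedence over the generic one.
        BiFunction<Throwable, Object, Throwable> hook = context.get(KEY_REJECTED);
        if (hook == null) {
            hook = context.get(KEY_OPERATOR_ERROR);
        }
        return hook == null ? ree : hook.apply(ree, dataSignal);
    }

    public static void main(String[] args) {
        Map<String, BiFunction<Throwable, Object, Throwable>> context = new HashMap<>();
        context.put(KEY_OPERATOR_ERROR, (e, v) -> new IllegalStateException("boom_" + v, e));
        context.put(KEY_REJECTED, (e, v) -> new IllegalStateException("rejected_" + v, e));
        Throwable t = onRejected(new IllegalArgumentException("foo"), "bar", context);
        System.out.println(t.getMessage());
    }
}
```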
Code example source: reactor/reactor-core
@Test
public void testOnRejectedWithOutsideRee() {
    RejectedExecutionException original = new RejectedExecutionException("outside");
    Exception suppressed = new Exception("suppressed");
    RuntimeException test = Operators.onRejectedExecution(original,
            null, suppressed, null, Context.empty());
    assertThat(test)
            .isNotSameAs(original)
            .isInstanceOf(RejectedExecutionException.class)
            .hasMessage("Scheduler unavailable")
            .hasCause(original)
            .hasSuppressedException(suppressed);
}
Code example source: reactor/reactor-core
@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }
    this.done = true;
    try {
        this.task = scheduler.schedule(() -> complete(t), delay, unit);
    }
    catch (RejectedExecutionException ree) {
        throw Operators.onRejectedExecution(ree, this, null, t,
                actual.currentContext());
    }
}
Code example source: reactor/reactor-core
@Test
public void testOnRejectedWithReactorRee() {
    Exception originalCause = new Exception("boom");
    RejectedExecutionException original = Exceptions.failWithRejected(originalCause);
    Exception suppressed = new Exception("suppressed");
    RuntimeException test = Operators.onRejectedExecution(original,
            null, suppressed, null, Context.empty());
    assertThat(test)
            .isSameAs(original)
            .hasSuppressedException(suppressed);
}
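The tests testOnRejectedWithOutsideRee and testOnRejectedWithReactorRee show two outcomes: a RejectedExecutionException created elsewhere is wrapped in a new one, while one that Reactor already produced is returned as the same instance, and in both cases the extra exception is attached as suppressed. A minimal JDK-only sketch of that wrap-or-reuse decision follows. WrapOrReuseSketch and its marker subclass OwnRejected are hypothetical; how Reactor itself recognizes its own exceptions is not shown in this article and is not assumed here.

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical model of "wrap foreign rejections, reuse our own".
class WrapOrReuseSketch {

    // Marker subclass (assumption) identifying rejections created by this code.
    static final class OwnRejected extends RejectedExecutionException {
        OwnRejected(Throwable cause) {
            super("Scheduler unavailable", cause);
        }
    }

    static RuntimeException onRejected(Throwable original, Throwable suppressed) {
        // Reuse an exception we already produced; wrap anything else.
        RuntimeException result = original instanceof OwnRejected
                ? (OwnRejected) original
                : new OwnRejected(original);
        if (suppressed != null) {
            // Keep the secondary failure visible without replacing the cause.
            result.addSuppressed(suppressed);
        }
        return result;
    }

    public static void main(String[] args) {
        RejectedExecutionException outside = new RejectedExecutionException("outside");
        RuntimeException wrapped = onRejected(outside, new Exception("suppressed"));
        System.out.println(wrapped != outside);                          // wrapped in a new instance
        System.out.println(onRejected(wrapped, null) == wrapped);        // reused as-is
    }
}
```

Reusing an already wrapped instance avoids building nested chains of identical "Scheduler unavailable" exceptions when a rejection passes through several operators.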
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super Long> actual) {
    MonoDelayRunnable r = new MonoDelayRunnable(actual);
    actual.onSubscribe(r);
    try {
        r.setCancel(timedScheduler.schedule(r, delay, unit));
    }
    catch (RejectedExecutionException ree) {
        if(r.cancel != OperatorDisposables.DISPOSED) {
            actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                    actual.currentContext()));
        }
    }
}
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    FluxSubscribeOnCallable.CallableSubscribeOnSubscription<T> parent =
            new FluxSubscribeOnCallable.CallableSubscribeOnSubscription<>(actual, callable, scheduler);
    actual.onSubscribe(parent);
    try {
        parent.setMainFuture(scheduler.schedule(parent));
    }
    catch (RejectedExecutionException ree) {
        if(parent.state != FluxSubscribeOnCallable.CallableSubscribeOnSubscription.HAS_CANCELLED) {
            actual.onError(Operators.onRejectedExecution(ree, actual.currentContext()));
        }
    }
}
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    CallableSubscribeOnSubscription<T> parent =
            new CallableSubscribeOnSubscription<>(actual, callable, scheduler);
    actual.onSubscribe(parent);
    try {
        Disposable f = scheduler.schedule(parent);
        parent.setMainFuture(f);
    }
    catch (RejectedExecutionException ree) {
        if(parent.state != CallableSubscribeOnSubscription.HAS_CANCELLED) {
            actual.onError(Operators.onRejectedExecution(ree, actual.currentContext()));
        }
    }
}
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super Long> actual) {
    Worker w = timedScheduler.createWorker();
    IntervalRunnable r = new IntervalRunnable(actual, w);
    actual.onSubscribe(r);
    try {
        w.schedulePeriodically(r, initialDelay, period, unit);
    }
    catch (RejectedExecutionException ree) {
        if (!r.cancelled) {
            actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                    actual.currentContext()));
        }
    }
}
Code example source: reactor/reactor-core
void trySchedule(
        @Nullable Subscription subscription,
        @Nullable Throwable suppressed,
        @Nullable Object dataSignal) {
    if(future != null){
        return;
    }
    try {
        future = this.scheduler.schedule(this);
    }
    catch (RejectedExecutionException ree) {
        actual.onError(Operators.onRejectedExecution(ree, subscription,
                suppressed, dataSignal, actual.currentContext()));
    }
}
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Scheduler.Worker worker = scheduler.createWorker();
    SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
            actual, worker);
    actual.onSubscribe(parent);
    try {
        worker.schedule(parent);
    }
    catch (RejectedExecutionException ree) {
        if (parent.s != Operators.cancelledSubscription()) {
            actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
                    actual.currentContext()));
        }
    }
}
Code example source: reactor/reactor-core
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    T v = value;
    if (v == null) {
        ScheduledEmpty parent = new ScheduledEmpty(actual);
        actual.onSubscribe(parent);
        try {
            parent.setFuture(scheduler.schedule(parent));
        }
        catch (RejectedExecutionException ree) {
            if (parent.future != OperatorDisposables.DISPOSED) {
                actual.onError(Operators.onRejectedExecution(ree,
                        actual.currentContext()));
            }
        }
    }
    else {
        actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler));
    }
}
Code example source: reactor/reactor-core
void trySchedule(
        @Nullable Subscription subscription,
        @Nullable Throwable suppressed,
        @Nullable Object dataSignal) {
    if (WIP.getAndIncrement(this) != 0) {
        return;
    }
    try {
        worker.schedule(this);
    }
    catch (RejectedExecutionException ree) {
        Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
        actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
                actual.currentContext()));
    }
}
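The trySchedule example above relies on a work-in-progress (WIP) counter: only the caller that moves the counter from 0 submits the drain task, and a rejection clears the queue and reports the wrapped error downstream. The sketch below reproduces that pattern with JDK types only. WipScheduleSketch, its fields, and the offer method are hypothetical; an Executor stands in for Scheduler.Worker and a Consumer<Throwable> stands in for actual.onError. The drain loop is a common companion to this guard and is an assumption, since the original run() method is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a queue-draining operator guarded by a WIP counter.
class WipScheduleSketch implements Runnable {

    final Executor worker;               // stands in for Scheduler.Worker
    final Consumer<Throwable> onError;   // stands in for actual.onError
    final Queue<String> queue = new ConcurrentLinkedQueue<>();
    final List<String> drained = new ArrayList<>();
    final AtomicInteger wip = new AtomicInteger();

    WipScheduleSketch(Executor worker, Consumer<Throwable> onError) {
        this.worker = worker;
        this.onError = onError;
    }

    void offer(String value) {
        queue.offer(value);
        trySchedule();
    }

    void trySchedule() {
        // Only the 0 -> 1 transition submits a task; later callers just
        // record that more work arrived.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        try {
            worker.execute(this);
        }
        catch (RejectedExecutionException ree) {
            queue.clear(); // nothing will drain the queue any more
            onError.accept(new RejectedExecutionException("Scheduler unavailable", ree));
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            String value;
            while ((value = queue.poll()) != null) {
                drained.add(value);
            }
            // Subtract the work accounted for; loop again if more arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> submitted = new ArrayList<>(); // executor that defers tasks
        WipScheduleSketch sketch = new WipScheduleSketch(submitted::add, e -> { });
        sketch.offer("a");
        sketch.offer("b");
        sketch.offer("c");
        System.out.println("submissions: " + submitted.size());
        submitted.get(0).run();
        System.out.println("drained: " + sketch.drained);
    }
}
```

After a rejection the counter is deliberately left non-zero in this sketch, so later trySchedule calls become no-ops, which matches treating the rejection as terminal.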