Usage and code examples of the reactor.core.publisher.Operators.cancelledSubscription() method

x33g5p2x, reposted on 2022-01-25 under Other

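This article collects code examples for the Java method reactor.core.publisher.Operators.cancelledSubscription() and shows how Operators.cancelledSubscription() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as a useful reference. Details of Operators.cancelledSubscription() are as follows:
Package path: reactor.core.publisher.Operators
Class name: Operators
Method name: cancelledSubscription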

Introduction to Operators.cancelledSubscription

A singleton Subscription that represents a cancelled subscription instance and should not be leaked to clients, because it represents a terminal state.
If algorithms need to hand out a subscription, replace this with a singleton subscription, because there is otherwise no standard way to tell whether a Subscription is cancelled.
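The sentinel idea described above can be sketched without Reactor on the classpath. The example below is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow.Subscription instead of the Reactive Streams Subscription, and CancelledSentinel, UpstreamHolder, and CountingSubscription are hypothetical names standing in for Operators.cancelledSubscription() and an operator's internal state. It shows the two things the snippets in this article do with the sentinel: swap it in atomically on cancel, and compare against it by identity to answer "is this cancelled?".

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Operators.cancelledSubscription(): a no-op
// singleton that is only ever compared by identity.
enum CancelledSentinel implements Flow.Subscription {
    INSTANCE;

    @Override public void request(long n) { /* deliberately a no-op */ }
    @Override public void cancel() { /* deliberately a no-op */ }
}

// Hypothetical holder that cancels its upstream at most once.
final class UpstreamHolder {
    private final AtomicReference<Flow.Subscription> upstream;

    UpstreamHolder(Flow.Subscription s) {
        this.upstream = new AtomicReference<>(s);
    }

    void cancel() {
        // Swap in the sentinel; only the caller that observes the real
        // subscription is allowed to cancel it.
        Flow.Subscription s = upstream.getAndSet(CancelledSentinel.INSTANCE);
        if (s != null && s != CancelledSentinel.INSTANCE) {
            s.cancel();
        }
    }

    boolean isCancelled() {
        // Identity comparison against the singleton marks the terminal state.
        return upstream.get() == CancelledSentinel.INSTANCE;
    }
}

// Test double that counts how often cancel() is called.
final class CountingSubscription implements Flow.Subscription {
    final AtomicInteger cancels = new AtomicInteger();

    @Override public void request(long n) { }
    @Override public void cancel() { cancels.incrementAndGet(); }
}

class CancelledSentinelDemo {
    public static void main(String[] args) {
        CountingSubscription s = new CountingSubscription();
        UpstreamHolder holder = new UpstreamHolder(s);

        System.out.println(holder.isCancelled()); // false
        holder.cancel();
        holder.cancel(); // second call sees the sentinel and does nothing
        System.out.println(holder.isCancelled()); // true
        System.out.println(s.cancels.get());      // 1
    }
}
```

Because the sentinel is never handed to downstream code, an identity check is enough; no extra "cancelled" flag has to be kept in sync with the subscription field.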

Code examples

Code example source: resilience4j/resilience4j

@Override
public boolean isDisposed() {
  return subscription == Operators.cancelledSubscription();
}

Code example source: reactor/reactor-core

void cancelMain() {
  Subscription s = main;
  if (s != Operators.cancelledSubscription()) {
    s = MAIN.getAndSet(this, Operators.cancelledSubscription());
    if (s != null && s != Operators.cancelledSubscription()) {
      s.cancel();
    }
  }
}

Code example source: reactor/reactor-core

void cancelOther() {
  Subscription s = other;
  if (s != Operators.cancelledSubscription()) {
    s = OTHER.getAndSet(this, Operators.cancelledSubscription());
    if (s != null && s != Operators.cancelledSubscription()) {
      s.cancel();
    }
  }
}

Code example source: reactor/reactor-core

@Override
public void onNext(T t) {
  if (s != Operators.cancelledSubscription()) {
    value = t;
    parent.innerNext();
  }
}

Code example source: reactor/reactor-core

@Override
public void onError(Throwable t) {
  if (s != Operators.cancelledSubscription()) {
    parent.innerError(this, t);
  }
}

Code example source: reactor/reactor-core

/**
 * Indicates whether this {@code MonoProcessor} has been interrupted via cancellation.
 *
 * @return {@code true} if this {@code MonoProcessor} is cancelled, {@code false}
 * otherwise.
 */
public boolean isCancelled() {
  return subscription == Operators.cancelledSubscription() && !isTerminated();
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return s;
  if (key == Attr.ACTUAL) return parent;
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  if (key == Attr.PREFETCH) return prefetch;
  return null;
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return s;
  if (key == Attr.ACTUAL) return parent;
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  return null;
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  if (key == Attr.PARENT) return s;
  if (key == Attr.TERMINATED) return done;
  if (key == Attr.ACTUAL) return parent;
  if (key == Attr.BUFFERED) return value != null ? 1 : 0;
  if (key == Attr.PREFETCH) return Integer.MAX_VALUE;
  return null;
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.TERMINATED) return done;
  if (key == Attr.PARENT) return s;
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  if (key == Attr.PREFETCH) return batchSize;
  if (key == Attr.ERROR) return error;
  return null;
}

Code example source: reactor/reactor-core

@Override
public void dispose() {
  Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
  if (s != null && s != Operators.cancelledSubscription()) {
    s.cancel();
  }
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return subscription;
  if (key == Attr.ACTUAL) return parent;
  if (key == Attr.TERMINATED) return done && (queue == null || queue.isEmpty());
  if (key == Attr.CANCELLED) return subscription == Operators.cancelledSubscription();
  if (key == Attr.BUFFERED) return queue == null ? 0 : queue.size();
  if (key == Attr.PREFETCH) return prefetch;
  return null;
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return s;
  if (key == Attr.ACTUAL) return parent;
  if (key == Attr.TERMINATED) return done && (queue == null || queue.isEmpty());
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  if (key == Attr.BUFFERED) return queue == null ? 0 : queue.size();
  if (key == Attr.PREFETCH) return prefetch;
  return null;
}

Code example source: reactor/reactor-core

@Override
public void onSubscribe(Subscription s) {
  if (!MAIN.compareAndSet(this, null, s)) {
    s.cancel();
    if (main != Operators.cancelledSubscription()) {
      Operators.reportSubscriptionSet();
    }
    return;
  }
  s.request(Long.MAX_VALUE);
}
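The onSubscribe snippet above combines a set-once compareAndSet with a sentinel check, so that a subscription arriving after cancellation is cancelled quietly while a genuine duplicate is reported. The following is a rough, Reactor-free sketch of that logic, with assumptions stated plainly: it uses the JDK's Flow.Subscription, the CANCELLED constant is a hypothetical stand-in for Operators.cancelledSubscription(), and the duplicateReports counter is a hypothetical stand-in for the call to Operators.reportSubscriptionSet().

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical subscriber-side state that accepts exactly one upstream.
final class SetOnceHolder {
    // Hypothetical stand-in for Operators.cancelledSubscription().
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    private final AtomicReference<Flow.Subscription> main = new AtomicReference<>();

    // Hypothetical stand-in for Operators.reportSubscriptionSet().
    int duplicateReports;

    void onSubscribe(Flow.Subscription s) {
        if (!main.compareAndSet(null, s)) {
            // Either already subscribed or already cancelled: reject s.
            s.cancel();
            if (main.get() != CANCELLED) {
                // Only a real duplicate is a protocol violation worth reporting.
                duplicateReports++;
            }
            return;
        }
        s.request(Long.MAX_VALUE);
    }

    void cancel() {
        Flow.Subscription s = main.getAndSet(CANCELLED);
        if (s != null && s != CANCELLED) {
            s.cancel();
        }
    }
}

// Test double that records what was done to it.
final class RecordingSubscription implements Flow.Subscription {
    long requested;
    boolean cancelled;

    @Override public void request(long n) { requested = n; }
    @Override public void cancel() { cancelled = true; }
}

class SetOnceDemo {
    public static void main(String[] args) {
        // Case 1: a second subscription is rejected and reported.
        SetOnceHolder h = new SetOnceHolder();
        RecordingSubscription first = new RecordingSubscription();
        RecordingSubscription second = new RecordingSubscription();
        h.onSubscribe(first);
        h.onSubscribe(second);
        System.out.println(first.requested == Long.MAX_VALUE); // true
        System.out.println(second.cancelled);                  // true
        System.out.println(h.duplicateReports);                // 1

        // Case 2: a subscription arriving after cancel() is rejected silently.
        SetOnceHolder c = new SetOnceHolder();
        c.cancel();
        RecordingSubscription late = new RecordingSubscription();
        c.onSubscribe(late);
        System.out.println(late.cancelled);     // true
        System.out.println(c.duplicateReports); // 0
    }
}
```

The sentinel is what lets the two failure cases be told apart: a non-null field that is not the sentinel means a second upstream tried to subscribe, while the sentinel means the consumer had already given up.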

Code example source: reactor/reactor-core

@Override
public void onError(Throwable t) {
  if (subscription != Operators.cancelledSubscription()) {
    SUBSCRIPTION.lazySet(this, Operators.cancelledSubscription());
    parent.boundaryError(this, t);
  }
  else {
    Operators.onErrorDropped(t, parent.ctx);
  }
}

Code example source: reactor/reactor-core

@Override
public void onComplete() {
  SUBSCRIPTION.lazySet(this, Operators.cancelledSubscription());
  parent.openComplete(this);
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return main;
  if (key == Attr.CANCELLED) return main == Operators.cancelledSubscription();
  return InnerOperator.super.scanUnsafe(key);
}

Code example source: reactor/reactor-core

@Override
@Nullable
public Object scanUnsafe(Attr key) {
  if (key == Attr.PARENT) return s;
  if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
  if (key == Attr.TERMINATED) return done;
  return super.scanUnsafe(key);
}

Code example source: reactor/reactor-core

@Nullable
final Subscription cancel() {
  Subscription s =
      this.getAndSet(Operators.cancelledSubscription());
  if (s != null && s != Operators.cancelledSubscription()) {
    s.cancel();
    if (establishedFusionMode == Fuseable.ASYNC) {
      qs.clear();
    }
  }
  return s;
}

Code example source: reactor/reactor-core

@Test
public void scanMainCancelled() {
  EmitterProcessor test = EmitterProcessor.create();
  test.onSubscribe(Operators.cancelledSubscription());
  assertThat(test.scan(CANCELLED)).isTrue();
  assertThat(test.isCancelled()).isTrue();
}
