reactor.core.publisher.Flux.push()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(3.1k)|赞(0)|评价(0)|浏览(297)

本文整理了Java中reactor.core.publisher.Flux.push()方法的一些代码示例,展示了Flux.push()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.push()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:push

Flux.push介绍

[英]Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see #create(Consumer).

This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

For example:

Flux.<String>push(emitter -> { 
ActionListener al = e -> { 
emitter.next(textField.getText()); 
}; 
// without cleanup support: 
button.addActionListener(al); 
// with cleanup support: 
button.addActionListener(al); 
emitter.onDispose(() -> { 
button.removeListener(al); 
}); 
});

[中]以编程方式创建一个通量,该通量能够通过FluxSink API从一个单线程生产者发出多个元素。有关支持多线程的替代方案,请参见#创建(使用者)。
如果您想适应其他单线程多值异步API,并且不担心取消和背压(如果下游无法跟上,则通过缓冲所有信号来处理),则此流量工厂非常有用。
例如:

Flux.<String>push(emitter -> { 
ActionListener al = e -> { 
emitter.next(textField.getText()); 
}; 
// without cleanup support: 
button.addActionListener(al); 
// with cleanup support: 
button.addActionListener(al); 
emitter.onDispose(() -> { 
button.removeListener(al); 
}); 
});

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void contextTestPush() {
  StepVerifier.create(Flux.push(s -> IntStream.range(0, 10).forEach(i -> s.next(s
      .currentContext()
                              .get(AtomicInteger.class)
                              .incrementAndGet())))
              .take(10)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxPushOnRequest() {
  AtomicInteger index = new AtomicInteger(1);
  AtomicInteger onRequest = new AtomicInteger();
  Flux<Integer> created = Flux.push(s -> {
    s.onRequest(n -> {
      onRequest.incrementAndGet();
      assertThat(n).isEqualTo(Long.MAX_VALUE);
      for (int i = 0; i < 5; i++) {
        s.next(index.getAndIncrement());
      }
      s.complete();
    });
  }, OverflowStrategy.BUFFER);
  StepVerifier.create(created, 0)
        .expectSubscription()
        .thenAwait()
        .thenRequest(1)
        .expectNext(1)
        .thenRequest(2)
        .expectNext(2, 3)
        .thenRequest(2)
        .expectNext(4, 5)
        .expectComplete()
        .verify();
  assertThat(onRequest.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxPush() {
  Flux<String> created = Flux.push(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  });
  assertThat(created.getPrefetch()).isEqualTo(-1);
  StepVerifier.create(created)
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

代码示例来源:origin: scalecube/scalecube-services

@Override
public Flux<String> failingMany(String name) {
 return Flux.defer(
   () ->
     Flux.push(
       sink -> {
        sink.next("Echo:" + name);
        sink.next("Echo:" + name);
        sink.error(new RuntimeException("Echo:" + name));
       }));
}

代码示例来源:origin: io.scalecube/scalecube-gateway-examples

@Override
public Flux<String> failingMany(String name) {
 return Flux.defer(
   () ->
     Flux.push(
       sink -> {
        sink.next("Echo:" + name);
        sink.next("Echo:" + name);
        sink.error(new RuntimeException("Echo:" + name));
       }));
}

相关文章

Flux类方法