reactor.core.publisher.Flux.defer()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.6k)|赞(0)|评价(0)|浏览(1086)

本文整理了Java中reactor.core.publisher.Flux.defer()方法的一些代码示例,展示了Flux.defer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.defer()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:defer

Flux.defer介绍

[英]Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like #from(Publisher).
[中]每次对生成的流量进行订阅时,都会延迟提供发布服务器,因此实际的源实例化将延迟到每次订阅,并且供应商可以创建特定于订阅服务器的实例。但是,如果供应商不生成新实例,则该操作符的行为实际上类似于#from(Publisher)。

代码示例

代码示例来源:origin: lettuce-io/lettuce-core

@Override
  public Flux<?> apply(Single<?> source) {
    return Flux.defer(() -> RxReactiveStreams.toPublisher(source));
  }
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
  public Publisher<?> apply(Observable<?> source) {
    return Flux.defer(() -> new PublisherAdapter<>(source));
  }
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
  public Publisher<?> apply(Completable source) {
    return Flux.defer(() -> RxReactiveStreams.toPublisher(source));
  }
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
  public Publisher<?> apply(Single<?> source) {
    return Flux.defer(() -> RxReactiveStreams.toPublisher(source));
  }
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Flux<InstanceEvent> find(InstanceId id) {
  return Flux.defer(() -> Flux.fromIterable(eventLog.getOrDefault(id, Collections.emptyList())));
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Flux<InstanceEvent> findAll() {
  return Flux.defer(() -> Flux.fromIterable(eventLog.values())
                .flatMapIterable(Function.identity())
                .sort(byTimestampAndIdAndVersion));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Relay buffers from the given {@link Publisher} until the total
 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
 * the given maximum byte count, or until the publisher is complete.
 * @param publisher the publisher to filter
 * @param maxByteCount the maximum byte count
 * @return a flux whose maximum byte count is {@code maxByteCount}
 */
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
  Assert.notNull(publisher, "Publisher must not be null");
  Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
  return Flux.defer(() -> {
    AtomicLong countDown = new AtomicLong(maxByteCount);
    return Flux.from(publisher)
        .map(buffer -> {
          long remainder = countDown.addAndGet(-buffer.readableByteCount());
          if (remainder < 0) {
            int length = buffer.readableByteCount() + (int) remainder;
            return buffer.slice(0, length);
          }
          else {
            return buffer;
          }
        })
        .takeUntil(buffer -> countDown.get() <= 0);
  }); // no doOnDiscard necessary, as this method does not drop buffers
}

代码示例来源:origin: spring-projects/spring-framework

Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
  AtomicLong countDown = new AtomicLong(maxByteCount);
  return Flux.from(publisher)

代码示例来源:origin: lettuce-io/lettuce-core

@Override
  public Flux<?> apply(Observable<?> source) {
    return Flux.defer(() -> Flux.from((Publisher<?>) RxReactiveStreams.toPublisher(source)));
  }
}

代码示例来源:origin: spring-projects/spring-framework

private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
  byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--");
  return Flux.defer(() -> Flux.just(
      bufferFactory.allocateBuffer(endBoundary.length).write(endBoundary)));
}

代码示例来源:origin: spring-projects/spring-framework

private Flux<DataBuffer> getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary,
    byte[] contentType, ResourceRegion region) {
  return Flux.defer(() -> Flux.just(
      bufferFactory.allocateBuffer(startBoundary.length).write(startBoundary),
      bufferFactory.allocateBuffer(contentType.length).write(contentType),
      bufferFactory.wrap(ByteBuffer.wrap(getContentRangeHeader(region))))
  );
}

代码示例来源:origin: org.springframework/spring-core

private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
  byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--");
  return Flux.defer(() -> Flux.just(
      bufferFactory.allocateBuffer(endBoundary.length).write(endBoundary)));
}

代码示例来源:origin: org.springframework/spring-core

private Flux<DataBuffer> getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary,
    byte[] contentType, ResourceRegion region) {
  return Flux.defer(() -> Flux.just(
      bufferFactory.allocateBuffer(startBoundary.length).write(startBoundary),
      bufferFactory.allocateBuffer(contentType.length).write(contentType),
      bufferFactory.wrap(ByteBuffer.wrap(getContentRangeHeader(region))))
  );
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingLastError2() {
  Flux.defer(() -> Mono.error(new RuntimeException("test")))
    .subscribeOn(scheduler)
    .blockLast(Duration.ofSeconds(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void deferStream(){
    AtomicInteger i = new AtomicInteger();

    Flux<Integer> source =
        Flux.defer(() -> Flux.just(i.incrementAndGet()));

    Assert.assertEquals(source.blockLast().intValue(), 1);
    Assert.assertEquals(source.blockLast().intValue(), 2);
    Assert.assertEquals(source.blockLast().intValue(), 3);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingLastError() {
  Flux.defer(() -> Mono.error(new RuntimeException("test")))
    .subscribeOn(scheduler)
    .blockLast();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testMonoThenManySupplier() {
  AssertSubscriber<String> ts = AssertSubscriber.create();
  Flux<String> test = Mono.just(1).thenMany(Flux.defer(() -> Flux.just("A", "B")));
  test.subscribe(ts);
  ts.assertValues("A", "B");
  ts.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void supplierThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>defer(() -> {
    throw new RuntimeException("forced failure");
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void supplierReturnsNull() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>defer(() -> null).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void thenManySupplier(){
  StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.defer(() -> Flux.just("test", "test2"))))
        .expectNext("test", "test2")
        .verifyComplete();
}

相关文章

Flux类方法