reactor.core.publisher.Flux.distinct()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(327)

本文整理了Java中reactor.core.publisher.Flux.distinct()方法的一些代码示例,展示了Flux.distinct()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.distinct()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:distinct

Flux.distinct介绍

[英]For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.

The values themselves are recorded into a HashSet for distinct detection. Use distinct(Object::hashcode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashcode collision.
[中]对于每个订阅者,从这个流量中跟踪已经看到的元素并过滤掉重复的元素。
这些值本身被记录到一个散列集中以进行不同的检测。如果您想要一种更轻量级的方法,即不保留所有对象,但更容易由于哈希代码冲突而错误地将两个元素视为不同的,请使用distinct(Object::hashcode)。

代码示例

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
  return Arrays.asList(
      scenario(f -> f.distinct()));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<SUnionCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Keys must not be null!");
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeys())) {
      return super.sUnion(Mono.just(command));
    }
    Flux<ByteBuffer> result = Flux.merge(command.getKeys().stream().map(cmd::smembers).collect(Collectors.toList()))
        .distinct();
    return Mono.just(new CommandResponse<>(command, result));
  }));
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorError() {
  return Arrays.asList(
      scenario(f -> f.distinct(d -> {
        throw exception();
      })),
      scenario(f -> f.distinct(d -> null))
  );
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void keyExtractorNull() {
  Flux.never().distinct(null);
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> f.distinct()).producer(3, i -> item(0))
                    .receiveValues((item(0)))
                    .receiverDemand(2),
      scenario(f -> f.distinct())
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
//        "A Flux can be enforced to dispatch values having distinct keys"
//        given: "a composable with values 1 to 4 with duplicate keys"
    Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

//        when: "the values are filtered and result is collected"
    MonoProcessor<List<Integer>> tap = s.distinct(it -> it % 3)
                      .collectList()
                      .toProcessor();
    tap.subscribe();

//        then: "collected should be without duplicates"
    assertThat(tap.block()).containsExactly(1, 2, 3);
  }

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeEnforcedToDispatchDistinctValues() {
//        "A Flux can be enforced to dispatch distinct values"
//        given:"a composable with values 1 to 4 with duplicates"
    Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));

//        when:"the values are filtered and result is collected"
    MonoProcessor<List<Integer>> tap = s.distinct()
                      .collectList()
                      .toProcessor();
    tap.subscribe();

//        then:"collected should be without duplicates"
    assertThat(tap.block()).containsExactly(1, 2, 3, 4);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void boundaryFused() {
  Flux.range(1, 10000)
    .publishOn(Schedulers.single())
    .map(v -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + v + Thread.currentThread().getName()))
    .share()
    .publishOn(Schedulers.elastic())
    .distinct()
    .as(StepVerifier::create)
    .expectFusion()
    .expectNext("single")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void boundaryFusion() {
  Flux.range(1, 10000)
    .publishOn(Schedulers.single())
    .map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
    .concatMap(Flux::just)
    .publishOn(Schedulers.elastic())
    .distinct()
    .as(StepVerifier::create)
    .expectFusion()
    .expectNext("single")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void boundaryFusionDelayError() {
  Flux.range(1, 10000)
    .publishOn(Schedulers.single())
    .map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
    .concatMapDelayError(Flux::just)
    .publishOn(Schedulers.elastic())
    .distinct()
    .as(StepVerifier::create)
    .expectFusion()
    .expectNext("single")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void distinctDefaultDoesntRetainObjects() throws InterruptedException {
  RetainedDetector retainedDetector = new RetainedDetector();
  Flux<DistinctDefault> test = Flux.range(1, 100)
                   .map(i -> retainedDetector.tracked(new DistinctDefault(i)))
                   .distinct();
  StepVerifier.create(test)
        .expectNextCount(100)
        .verifyComplete();
  System.gc();
  Thread.sleep(500);
  assertThat(retainedDetector.finalizedCount())
      .as("none retained")
      .isEqualTo(100);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void issue422(){
  Flux<Integer> source = Flux.create((sink) -> {
    for (int i = 0; i < 300; i++) {
      sink.next(i);
    }
    sink.complete();
  });
  Flux<Integer> cached = source.cache();
  long cachedCount = cached.concatMapIterable(Collections::singleton)
               .distinct().count().block();
  //System.out.println("source: " + sourceCount);
  System.out.println("cached: " + cachedCount);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void keyExtractorThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).distinct(k -> {
    throw new RuntimeException("forced failure");
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

代码示例来源:origin: reactor/reactor-core

@Test
//see https://github.com/reactor/reactor-core/issues/577
public void collectionSupplierLimitedFifo() {
  Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 1, 3, 4, 1, 1, 1, 1, 2);
  StepVerifier.create(flux.distinct(Flux.identityFunction(), () -> new NaiveFifoQueue<>(5)))
        .expectNext(1, 2, 3, 4, 5, 6, 1, 2)
        .verifyComplete();
  StepVerifier.create(flux.distinct(Flux.identityFunction(),
          () -> new NaiveFifoQueue<>(3)))
        .expectNext(1, 2, 3, 4, 5, 6, 1, 3, 4, 2)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void someDistinct() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1, 2, 2, 3, 4, 5, 6, 1, 2, 7, 7, 8, 9, 9, 10, 10, 10)
    .distinct(k -> k)
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void allSame() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
    .distinct(k -> k)
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void allDistinct() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .distinct(k -> k)
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void withKeyExtractorSame() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).distinct(k -> k % 3).subscribe(ts);
  ts.assertValues(1, 2, 3)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void allDistinctHide() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .hide()
    .distinct(k -> k)
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void allSameBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
    .distinct(k -> k)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

相关文章

Flux类方法