本文整理了Java中reactor.core.publisher.Flux.distinct()
方法的一些代码示例,展示了Flux.distinct()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.distinct()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:distinct
[英]For each Subscriber, track elements from this Flux that have been seen and filter out duplicates.
The values themselves are recorded into a HashSet for distinct detection. Use distinct(Object::hashcode) if you want a more lightweight approach that doesn't retain all the objects, but is more susceptible to falsely considering two elements as distinct due to a hashcode collision.
[中]对于每个订阅者,从这个流量中跟踪已经看到的元素并过滤掉重复的元素。
这些值本身被记录到一个散列集中以进行不同的检测。如果您想要一种更轻量级的方法,即不保留所有对象,但更容易由于哈希代码冲突而错误地将两个元素视为不同的,请使用distinct(Object::hashcode)。
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
return Arrays.asList(
scenario(f -> f.distinct()));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<SUnionCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Keys must not be null!");
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeys())) {
return super.sUnion(Mono.just(command));
}
Flux<ByteBuffer> result = Flux.merge(command.getKeys().stream().map(cmd::smembers).collect(Collectors.toList()))
.distinct();
return Mono.just(new CommandResponse<>(command, result));
}));
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorError() {
return Arrays.asList(
scenario(f -> f.distinct(d -> {
throw exception();
})),
scenario(f -> f.distinct(d -> null))
);
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void keyExtractorNull() {
Flux.never().distinct(null);
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
return Arrays.asList(
scenario(f -> f.distinct()).producer(3, i -> item(0))
.receiveValues((item(0)))
.receiverDemand(2),
scenario(f -> f.distinct())
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCanBeEnforcedToDispatchValuesHavingDistinctKeys() {
// "A Flux can be enforced to dispatch values having distinct keys"
// given: "a composable with values 1 to 4 with duplicate keys"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));
// when: "the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinct(it -> it % 3)
.collectList()
.toProcessor();
tap.subscribe();
// then: "collected should be without duplicates"
assertThat(tap.block()).containsExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCanBeEnforcedToDispatchDistinctValues() {
// "A Flux can be enforced to dispatch distinct values"
// given:"a composable with values 1 to 4 with duplicates"
Flux<Integer> s = Flux.fromIterable(Arrays.asList(1, 2, 3, 1, 2, 3, 4));
// when:"the values are filtered and result is collected"
MonoProcessor<List<Integer>> tap = s.distinct()
.collectList()
.toProcessor();
tap.subscribe();
// then:"collected should be without duplicates"
assertThat(tap.block()).containsExactly(1, 2, 3, 4);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void boundaryFused() {
Flux.range(1, 10000)
.publishOn(Schedulers.single())
.map(v -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + v + Thread.currentThread().getName()))
.share()
.publishOn(Schedulers.elastic())
.distinct()
.as(StepVerifier::create)
.expectFusion()
.expectNext("single")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void boundaryFusion() {
Flux.range(1, 10000)
.publishOn(Schedulers.single())
.map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
.concatMap(Flux::just)
.publishOn(Schedulers.elastic())
.distinct()
.as(StepVerifier::create)
.expectFusion()
.expectNext("single")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void boundaryFusionDelayError() {
Flux.range(1, 10000)
.publishOn(Schedulers.single())
.map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
.concatMapDelayError(Flux::just)
.publishOn(Schedulers.elastic())
.distinct()
.as(StepVerifier::create)
.expectFusion()
.expectNext("single")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void distinctDefaultDoesntRetainObjects() throws InterruptedException {
RetainedDetector retainedDetector = new RetainedDetector();
Flux<DistinctDefault> test = Flux.range(1, 100)
.map(i -> retainedDetector.tracked(new DistinctDefault(i)))
.distinct();
StepVerifier.create(test)
.expectNextCount(100)
.verifyComplete();
System.gc();
Thread.sleep(500);
assertThat(retainedDetector.finalizedCount())
.as("none retained")
.isEqualTo(100);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void issue422(){
Flux<Integer> source = Flux.create((sink) -> {
for (int i = 0; i < 300; i++) {
sink.next(i);
}
sink.complete();
});
Flux<Integer> cached = source.cache();
long cachedCount = cached.concatMapIterable(Collections::singleton)
.distinct().count().block();
//System.out.println("source: " + sourceCount);
System.out.println("cached: " + cachedCount);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void keyExtractorThrows() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10).distinct(k -> {
throw new RuntimeException("forced failure");
}).subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(RuntimeException.class)
.assertErrorMessage("forced failure");
}
代码示例来源:origin: reactor/reactor-core
@Test
//see https://github.com/reactor/reactor-core/issues/577
public void collectionSupplierLimitedFifo() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 1, 3, 4, 1, 1, 1, 1, 2);
StepVerifier.create(flux.distinct(Flux.identityFunction(), () -> new NaiveFifoQueue<>(5)))
.expectNext(1, 2, 3, 4, 5, 6, 1, 2)
.verifyComplete();
StepVerifier.create(flux.distinct(Flux.identityFunction(),
() -> new NaiveFifoQueue<>(3)))
.expectNext(1, 2, 3, 4, 5, 6, 1, 3, 4, 2)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void someDistinct() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1, 2, 2, 3, 4, 5, 6, 1, 2, 7, 7, 8, 9, 9, 10, 10, 10)
.distinct(k -> k)
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allSame() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
.distinct(k -> k)
.subscribe(ts);
ts.assertValues(1)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allDistinct() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.distinct(k -> k)
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void withKeyExtractorSame() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10).distinct(k -> k % 3).subscribe(ts);
ts.assertValues(1, 2, 3)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allDistinctHide() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.hide()
.distinct(k -> k)
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allSameBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.just(1, 1, 1, 1, 1, 1, 1, 1, 1)
.distinct(k -> k)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
ts.request(2);
ts.assertValues(1)
.assertComplete()
.assertNoError();
}
内容来源于网络,如有侵权,请联系作者删除!