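This article collects code examples for the Java method reactor.core.publisher.Flux.generate() and shows how Flux.generate() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, taken from selected open-source projects, and should serve as a useful reference. Details of the Flux.generate() method are as follows: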
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: generate
Description: Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.
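The contract described above (state threaded through each call, at most one next() per generator invocation, the first terminal signal wins, the state consumer runs at the end) can be illustrated with a small JDK-only model. This is a sketch under stated assumptions, not Reactor's implementation: ToySink, ToyGenerate, and GenerateContractDemo are hypothetical names invented here, and the demand parameter is a simplified stand-in for subscriber requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Demo entry point for the toy model below (hypothetical, not part of Reactor). */
final class GenerateContractDemo {
    public static void main(String[] args) {
        // Same shape as the generateRange test: emit 1..10, then complete.
        ToySink<Integer> range = ToyGenerate.<Integer, Integer>run(
                () -> 1,
                (s, sink) -> {
                    if (s < 11) {
                        sink.next(s);
                    } else {
                        sink.complete();
                    }
                    return s + 1;
                },
                s -> { },
                Long.MAX_VALUE);
        System.out.println(range.emitted + " completed=" + range.completed);
    }
}

/** Hypothetical stand-in for a synchronous sink; NOT a Reactor type. */
final class ToySink<T> {
    final List<T> emitted = new ArrayList<>();
    boolean completed;
    Throwable error;
    private boolean nextCalled; // has next() been called in the current round?
    private boolean signalled;  // was any signal emitted in the current round?

    void startRound() {
        nextCalled = false;
        signalled = false;
    }

    boolean signalledThisRound() {
        return signalled;
    }

    boolean isTerminated() {
        return completed || error != null;
    }

    void next(T value) {
        signalled = true;
        if (isTerminated()) {
            return;
        }
        if (nextCalled) {
            // A second next() in one round becomes an error signal.
            error(new IllegalStateException("more than one next() in a single round"));
            return;
        }
        nextCalled = true;
        emitted.add(value);
    }

    void complete() {
        signalled = true;
        if (!isTerminated()) {
            completed = true;
        }
    }

    void error(Throwable t) {
        signalled = true;
        if (!isTerminated()) {
            error = t; // only the first terminal signal is kept
        }
    }
}

/** Toy driver loop: one generator invocation per unit of demand. */
final class ToyGenerate {
    private ToyGenerate() { }

    static <S, T> ToySink<T> run(Supplier<S> stateSupplier,
                                 BiFunction<S, ToySink<T>, S> generator,
                                 Consumer<? super S> stateConsumer,
                                 long demand) {
        ToySink<T> sink = new ToySink<>();
        S state;
        try {
            state = stateSupplier.get();
        } catch (RuntimeException e) {
            sink.error(e); // no state was created, so the state consumer is skipped
            return sink;
        }
        try {
            for (long served = 0; served < demand && !sink.isTerminated(); served++) {
                sink.startRound();
                state = generator.apply(state, sink);
                if (!sink.signalledThisRound()) {
                    // The toy treats a silent round as an error so the loop cannot spin forever.
                    sink.error(new IllegalStateException("generator emitted no signal"));
                }
            }
        } catch (RuntimeException e) {
            sink.error(e); // an exception thrown by the generator becomes an error signal
        } finally {
            stateConsumer.accept(state); // last state is handed to the consumer
        }
        return sink;
    }
}
```

Running the demo prints the ten emitted values followed by completed=true. In this model the run simply stops when demand is exhausted; that is a simplification chosen for the sketch and should not be read as a statement about how Reactor handles cancellation.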
Code example source: spring-projects/spring-framework
/**
 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readByteChannel(
        Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
    return Flux.using(channelSupplier,
            channel -> {
                ReadableByteChannelGenerator generator =
                        new ReadableByteChannelGenerator(channel, dataBufferFactory,
                                bufferSize);
                return Flux.generate(generator);
            },
            DataBufferUtils::closeChannel)
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
Code example source: spring-projects/spring-framework
private Flux<String> chunks1K() {
    return Flux.generate(sink -> {
        // Each generator invocation emits exactly one 1024-character chunk.
        StringBuilder sb = new StringBuilder();
        do {
            for (char c : "0123456789".toCharArray()) {
                sb.append(c);
                if (sb.length() + 1 == 1024) {
                    sink.next(sb.append("\n").toString());
                    return;
                }
            }
        } while (true);
    });
}
Code example source: spring-projects/spring-framework
private Mono<? extends Resource> transform(String content, Resource resource,
        ResourceTransformerChain chain, ServerWebExchange exchange) {
    if (!content.startsWith(MANIFEST_HEADER)) {
        if (logger.isTraceEnabled()) {
            logger.trace(exchange.getLogPrefix() +
                    "Skipping " + resource + ": Manifest does not start with 'CACHE MANIFEST'");
        }
        return Mono.just(resource);
    }
    return Flux.generate(new LineInfoGenerator(content))
            .concatMap(info -> processLine(info, exchange, resource, chain))
            .reduce(new ByteArrayOutputStream(), (out, line) -> {
                writeToByteArrayOutputStream(out, line + "\n");
                return out;
            })
            .map(out -> {
                String hash = DigestUtils.md5DigestAsHex(out.toByteArray());
                writeToByteArrayOutputStream(out, "\n" + "# Hash: " + hash);
                return new TransformedResource(resource, out.toByteArray());
            });
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void stateSupplierNull() {
    Flux.generate(null, (s, o) -> s, s -> {
    });
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void generatorNull() {
    Flux.generate(() -> 1, null, s -> {
    });
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void stateConsumerNull() {
    Flux.generate(() -> 1, (s, o) -> s, null);
}
Code example source: reactor/reactor-core
@Override
public Publisher<Long> createPublisher(long elements) {
    return Flux.<Long, Long>generate(() -> 0L, (cursor, s) -> {
        // The original condition repeated "cursor < elements" twice; one check is enough.
        if (cursor < elements) {
            s.next(cursor);
        }
        else if (cursor == elements) {
            s.complete();
        }
        return cursor + 1;
    })
            .map(data -> data * 10)
            .map(data -> data / 10)
            .log("log-test", Level.FINE);
}
Code example source: spring-projects/spring-framework
@Test
public void errorAfterMultipleItems() throws Exception {
    IllegalStateException error = new IllegalStateException("boo");
    Flux<String> publisher = Flux.generate(() -> 0, (idx, subscriber) -> {
        int i = ++idx;
        subscriber.next(String.valueOf(i));
        if (i == 3) {
            subscriber.error(error);
        }
        return i;
    });
    Mono<Void> completion = publisher.as(this::sendOperator);
    Signal<Void> signal = completion.materialize().block();
    assertNotNull(signal);
    assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
    assertEquals(3, this.writer.items.size());
    assertEquals("1", this.writer.items.get(0));
    assertEquals("2", this.writer.items.get(1));
    assertEquals("3", this.writer.items.get(2));
    assertSame(error, this.writer.error);
}
Code example source: reactor/reactor-core
@Test
public void generatorMultipleOnErrors() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.error(new RuntimeException("forced failure"));
        o.error(new RuntimeException("forced failure"));
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void stateConsumerCalled() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger stateConsumer = new AtomicInteger();
    Flux.<Integer, Integer>generate(() -> 1, (s, o) -> {
        o.complete();
        return s;
    }, stateConsumer::set).subscribe(ts);
    ts.assertNoValues()
      .assertComplete()
      .assertNoError();
    // The state consumer received the last state value.
    Assert.assertEquals(1, stateConsumer.get());
}
Code example source: reactor/reactor-core
@Test
public void generatorThrows() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        throw new RuntimeException("forced failure");
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void contextTest() {
    StepVerifier.create(Flux.generate(s -> s.next(s.currentContext()
                                                   .get(AtomicInteger.class)
                                                   .incrementAndGet()))
                            .take(10)
                            .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                                    new AtomicInteger())))
                .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void stateSupplierThrows() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer, Integer>generate(() -> {
        throw new RuntimeException("forced failure");
    }, (s, o) -> {
        o.next(1);
        return s;
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class);
}
Code example source: reactor/reactor-core
@Test
public void generateError() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.error(new RuntimeException("forced failure"));
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void generateEmpty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.complete();
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void generatorMultipleOnCompletes() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.complete();
        o.complete();
    }).subscribe(ts);
    ts.assertNoValues()
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void generatorMultipleOnNexts() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.next(1);
        // A second next() in the same invocation surfaces as an IllegalStateException.
        o.next(1);
    }).subscribe(ts);
    ts.assertValues(1)
      .assertNotComplete()
      .assertError(IllegalStateException.class);
}
Code example source: reactor/reactor-core
@Test
public void generateJust() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.<Integer>generate(o -> {
        o.next(1);
        o.complete();
    }).subscribe(ts);
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void generateRange() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    // The state is the next value to emit; it starts at 1 and grows by one per invocation.
    Flux.<Integer, Integer>generate(() -> 1, (s, o) -> {
        if (s < 11) {
            o.next(s);
        }
        else {
            o.complete();
        }
        return s + 1;
    }).subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void generateJustBackpressured() {
    // Subscribe with an initial request of 0, so nothing is emitted yet.
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.<Integer>generate(o -> {
        o.next(1);
        o.complete();
    }).subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
    ts.request(2);
    ts.assertValues(1)
      .assertNoError()
      .assertComplete();
}