java—使用reactor netty httpclient,如何配置客户机以使用flux发布服务器向服务器发送多个条目?

gmol1639  于 2021-07-07  发布在  Java
关注(0)|答案(1)|浏览(439)

更具体的问题是为什么flux.fromiterable()不能与reactor-netty-httpclient一起工作?这个简单的例子很好用。所有10项都由flux publisher发出:

public class ConcurrentLinkedQueueFluxTest {

    public static final void main(String[] args) {

        List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
        aList.stream().forEach(i -> clq.add(i));

        Flux.fromIterable(clq)
                .subscribe(new ReactiveSubscriber<Integer>());
    }
}

使用reactivesubscriber:

class ReactiveSubscriber<T> extends BaseSubscriber<T> {

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        System.out.println("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(T response) {
        System.out.println("In hookOnNext: "+response.toString());
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        System.out.println(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        System.out.println("In hookOnComplete");
        subscription.cancel();
    }
}

如下所示,如果我将类似的订阅服务器与httpclient.send(flux.fromiterable())一起使用,则只会发出一个项,而不是队列中的所有项。因此,由于创建通量的flux.fromiterable()方法不能与httpclient一起使用,因此有些配置不正确。
对于实际的生产代码问题,我已经包括了队列定义、客户机方法、订阅者、服务器方法和一个日志,其中只显示5个项目中的1个从客户机发送到服务器。看起来,即使httpclient send()方法从队列中加载了一个flux对象,也只发送一个项,尽管队列中有5个项。
要发送到服务器的项目位于bytebuf类型的队列中:

private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;

    public ElectionTransactionRequest() {
        electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
    }

客户端方法是:

public void task() {

        log.debug("Queue size: "+electionRequestQueue.size());

        ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();

        HttpClient.create()
             .tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
             .port(61005)
             .protocol(HttpProtocol.HTTP11)
             .post()
             .uri("/election/transaction")
             .send(Flux.fromIterable(electionRequestQueue))
             .responseContent()
             .aggregate()
             .asByteArray()
             .subscribe(etSubscriber);
     }

订户定义为:

class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {

    private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        log.debug("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(byte[] response) {
        log.info("In hookOnNext");
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        log.error(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        log.debug("In hookOnComplete");
        subscription.cancel();
    }
}

服务器端在方法中定义:

public void start() {

        disposableServer =
            HttpServer.create()
                .host("localhost")
                .port(61005)
                .protocol(HttpProtocol.HTTP11)
                .route(routes ->
                    routes
                        .post("/election/transaction",
                            (request, response) -> response.send(request
                                                                .receive()
                                                                .aggregate()
                                                                .flatMap(aggregatedBody ->
                                                                    electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
                        .bindNow();
        disposableServer.onDispose().block();

    }

当客户机运行时,队列中有5个项目,但只有一个项目被发送到服务器,如日志中所示。在订阅服务器中,只发送来自flux发布服务器的1个项目后,将调用hookoncomplete()方法。

2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5

2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe

2020-12-01 12:03:56,746 INFO  [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext

2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete
3zwtqj6y

3zwtqj6y1#

缓冲可能是最容易使用的。它发出缓冲区(它们是集合) — 默认情况下,列表)。所以在您的情况下,订阅者将收到一个列表

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

=========================
这直接来自React堆堆芯的文件:
三种配料
当您有很多元素并且希望将它们分批处理时,reactor中有三种广泛的解决方案:分组、窗口和缓冲。这三者在概念上是相近的,因为它们将通量重新分配到一个集合中。分组和窗口创建通量,同时将聚合缓冲到集合中。使用通量分组
分组是将源流拆分为多个批的行为,每个批都匹配一个键。
关联的运算符是groupby。
每个组都表示为groupedflux,它允许您通过调用其key()方法来检索键。
这些小组的内容没有必要的连续性。一旦一个源元素产生了一个新的键,这个键的组就会被打开,并且与该键匹配的元素最终会出现在这个组中(可以同时打开几个组)。
这意味着:

Are always disjoint (a source element belongs to one and only one group).

Can contain elements from different places in the original sequence.

Are never empty.

以下示例按值是偶数还是奇数对值进行分组:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .groupBy(i -> i % 2 == 0 ? "even" : "odd")
        .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                .map(String::valueOf) //map to string
                .startWith(g.key())) //start with the group's key
    )
    .expectNext("odd", "1", "3", "5", "11", "13")
    .expectNext("even", "2", "4", "6", "12")
    .verifyComplete();

警告分组最适合于组数从中到低的情况。还必须强制使用这些组(例如由flatmap使用),以便groupby继续从上游获取数据并为更多的组提供数据。有时,这两个约束会成倍增加并导致挂起,例如当基数较高且使用组的flatmap的并发性太低时。使用flux
窗口化是根据大小、时间、边界定义 predicate 或边界定义发布者的标准将源流拆分为窗口的行为。
关联的运算符有window、windowtimeout、windowuntil、windowwhile和windowwhen。
与groupby不同,groupby根据传入的键随机重叠,窗口(大多数时间)是按顺序打开的。
不过,有些变体仍然可以重叠。例如,在window(int maxsize,int skip)中,maxsize参数是窗口关闭后的元素数,skip参数是源中打开新窗口后的元素数。因此,如果maxsize>skip,则会在前一个窗口关闭之前打开一个新窗口,两个窗口重叠。
以下示例显示重叠窗口:

StepVerifier.create(
    Flux.range(1, 10)
        .window(5, 3) //overlapping windows
        .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
        .expectNext(1, 2, 3, 4, 5)
        .expectNext(4, 5, 6, 7, 8)
        .expectNext(7, 8, 9, 10)
        .expectNext(10)
        .verifyComplete();

注意:使用反向配置(maxsize<skip),源中的一些元素被删除,并且不是任何窗口的一部分。
在通过windowuntil和windowwhile进行基于 predicate 的窗口化的情况下,具有与 predicate 不匹配的后续源元素也会导致空窗口,如下例所示:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .windowWhile(i -> i % 2 == 0)
        .concatMap(g -> g.defaultIfEmpty(-1))
    )
        .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
        .expectNext(2, 4, 6) // triggered by 11
        .expectNext(12) // triggered by 13
        // however, no empty completion window is emitted (would contain extra matching elements)
        .verifyComplete();

通量缓冲
缓冲与窗口类似,但有以下扭曲:它不发射窗口(每个窗口都是通量),而是发射缓冲区(每个窗口都是集合) — 默认情况下,列表)。
用于缓冲的运算符与用于窗口的运算符相同:buffer、buffertimeout、bufferuntil、bufferwhile和bufferwhen。
当相应的窗口操作符打开一个窗口时,缓冲操作符创建一个新集合并开始向其中添加元素。当窗口关闭时,缓冲操作符发出集合。
缓冲还可能导致源元素丢失或缓冲区重叠,如下例所示:

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

与窗口化不同,bufferuntil和bufferwhile不会发出空缓冲区,如下例所示:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .bufferWhile(i -> i % 2 == 0)
    )
    .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
    .expectNext(Collections.singletonList(12)) // triggered by 13
    .verifyComplete();

https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedfeatures.adoc

相关问题