阻止quarkus/vertxReact消息中的kafka消息处理程序

nukf8bse  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(240)

我有一个从kafka队列读取的quarkus应用程序,处理消息可能需要几秒钟。它们可以并行处理。但是,我无法让kafka消费者使用React式消息传递应用程序提供的工作线程池从队列中并行读取消息。
我的代码:

public class SlowKafkaConsumer {

    @Blocking(ordered = false)
    @Incoming("topic")
    public void onMessage(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " " + Instant.now().toString() + " " + message);
    }
}

我用thread.sleep来模拟一个慢消费者,但是我把 @Blocking 以及 ordered=false 在方法上。在这些文档中,我希望能够同时处理20条(默认线程池大小)消息。
但是,我得到的程序输出是:

vert.x-worker-thread-0 2020-11-13T13:25:09.867905300Z Hello
vert.x-worker-thread-1 2020-11-13T13:25:10.885309700Z Hello
vert.x-worker-thread-2 2020-11-13T13:25:11.887364600Z Hello
vert.x-worker-thread-3 2020-11-13T13:25:12.889545300Z Hello

正如您所看到的,每个消息都由不同的线程处理(我可以看到它最终使用了多达20个不同的线程),但是每个消息都是按顺序处理的,每个消息之间有1秒的间隔,而不是并发处理20个。
有人能帮我指出哪里出了问题吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题