我有一个从kafka队列读取的quarkus应用程序,处理消息可能需要几秒钟。它们可以并行处理。但是,我无法让kafka消费者使用React式消息传递应用程序提供的工作线程池从队列中并行读取消息。
我的代码:
public class SlowKafkaConsumer {
@Blocking(ordered = false)
@Incoming("topic")
public void onMessage(String message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " " + Instant.now().toString() + " " + message);
}
}
我用thread.sleep来模拟一个慢消费者,但是我把 @Blocking
以及 ordered=false
在方法上。在这些文档中,我希望能够同时处理20条(默认线程池大小)消息。
但是,我得到的程序输出是:
vert.x-worker-thread-0 2020-11-13T13:25:09.867905300Z Hello
vert.x-worker-thread-1 2020-11-13T13:25:10.885309700Z Hello
vert.x-worker-thread-2 2020-11-13T13:25:11.887364600Z Hello
vert.x-worker-thread-3 2020-11-13T13:25:12.889545300Z Hello
正如您所看到的,每个消息都由不同的线程处理(我可以看到它最终使用了多达20个不同的线程),但是每个消息都是按顺序处理的,每个消息之间有1秒的间隔,而不是并发处理20个。
有人能帮我指出哪里出了问题吗?
暂无答案!
目前还没有任何答案,快来回答吧!