我正在考虑下面的用例,并想验证这种方法在概念上是否有效。
目标是在spring中公开一个长时间运行的serversentedevent(sse)端点,为每个传入连接重播相同的kafka主题(使用一些特定于用户的过滤)。
苏格兰和南方能源公司是这样暴露的:
@GetMapping("/sse")
public SseEmitter sse() {
SseEmitter sseEmitter = new SseEmitter();
Executors
.newSingleThreadExecutor()
.execute(() -> dummyDataProducer.generate() // kafka ultimately
.forEach(payload -> {
try {
sseEmitter.send(payload);
} catch (IOException ex) {
sseEmitter.completeWithError(ex);
}
}));
return sseEmitter;
}
从另一边,有一个 KafkaListener
方法( ConcurrentKafkaListenerContainerFactory
已使用):
@KafkaListener(topics = "${app.kafka.topic1}")
public void receive(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer id,
@Payload Object payload) {
// do something ...
}
据我所知,kafka消费者应用程序使用一个线程从单个主题读取数据。这在某种程度上违背了使用sse的想法,即为每个传入连接创建一个专用的长时间运行的线程。
对于这个用例,这是一种有效的方法吗?如果是这样,如何正确地完成这一点?
暂无答案!
目前还没有任何答案,快来回答吧!