已锁定7天。此时正在解决disputes about this question’s content。当前不接受新答案或交互。
我有一个非常简单的React堆Kafka消费者。
但是,为了正确处理消息,应该事先执行几个@Postconstruct。
@Postconstruct将初始化内存中的一些对象,准备数据等...
如果消息开始被使用,但是@Postconstruct没有完成,那么消息处理肯定会失败。
这个@Postconstruct需要一些时间来完成,大概需要3秒左右。因此,目前,在我的代码中,我只处理消息,在前30秒,所有消息处理都将失败,我将把失败的消息放在另一个主题中。
在某个时候,大约30秒后,@Postconstruct完成,消息被正常处理,很高兴。
然而,我发现这是非常多余的,我有另一个主题和另一个工作的存在只是在这里重新处理消息失败在这3秒钟。
有没有一种更简单的方法,告诉React堆Kafka只开始消费,只有当@后构造完成(也许通过发送一些信号在应用程序),或者甚至只是等待3 ish秒?
到目前为止,我尝试了这个:
public Flux<String> myConsumer() {
return KafkaReceiver
.create(receiverOptions)
.receive()
.delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
.flatMap((oneMessage) ->
Mono.deferContextual(contextView -> {
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
try (scope) {
return consume(oneMessage);
}
}), 500)
.name("greeting.call") //1
.tag("latency", "low") //2
.tap(Micrometer.observation(observationRegistry));
但是,它并不是在做我所期望的事情,它只是在拖延。
1条答案
按热度按时间8hhllhi21#
你不应该在构造后的方法中将
subscribe()
应用到flux中,而应该在start()
中实现SmartLifecycle
并订阅。或者使用事件侦听器,并且仅在刷新上下文后订阅。
这样,在您开始接收记录之前,所有bean都将完全连接起来。