我有一个用例,我想在Kafka上实现同步请求/响应。例如,当用户发送一个HTTP请求时,我想在一个特定的kafka输入主题上生成一个消息,该消息触发一个数据流,最终在一个输出主题上生成一个响应。然后,我想使用来自输出主题的消息,并将响应返回给调用者。
工作流程为:HTTP请求-〉在输入主题上生成消息-〉(使用输入主题的消息-〉应用逻辑-〉在输出主题上生成消息)-〉使用输出主题的消息-〉HTTP响应。
为了实现这种情况,在接收到第一个HTTP请求时,我希望能够动态地创建一个消费者,它将从输出主题消费,* 在 * 产生输入主题上的消息之前。否则,输出主题上的消息有可能“丢失”。在我的情况下,消费者有一个随机的group.id
,并且由于应用程序的原因,有auto.offset.reset
= latest
。
我的问题是如何确保消费者在生成消息之前已经准备好了。我确保在生成消息之前调用SubscribeTopics
。但是在目前为止的测试中,当没有提交的偏移量并且Kafka将偏移量重置为最新时,有一种可能性,消息丢失,我的消费者从来没有读过,因为Kafka * 有时 * 认为消费者注册 * 后 * 消息已经产生。
到目前为止,我的解决方法是在创建消费者之后休眠一段时间,以便在生成消息之前允许Kafka继续执行commit reset工作流。
我还尝试在一个重新平衡回调(由订阅主题的使用者触发)中实现逻辑,其中我使用offset = latest为主题分区调用assign
,但这似乎没有解决我的问题。
希望有比睡觉更好的解决办法。
1条答案
按热度按时间kpbwa7wx1#
大多数HTTP客户端库都有一个隐式超时。不能保证你的消费者会消费一个事件,或者下游生产者会向“响应主题”发送数据。
相反,让您的初始请求立即返回带有某个跟踪ID的201 Accepted状态(例如,如果您进行请求验证,则返回400)。然后要求按ID轮询GET请求,以获取请求主体中带有404状态或200 +某个状态字段的状态更新。
您将需要一个数据库来存储中间状态。