Kafka客户端库(confluent-kafka-go):在auto.offset.reset = latest的情况下,消费者和生产者之间的同步

ru9i0ody  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(130)

我有一个用例,我想在Kafka上实现同步请求/响应。例如,当用户发送一个HTTP请求时,我想在一个特定的kafka输入主题上生成一个消息,该消息触发一个数据流,最终在一个输出主题上生成一个响应。然后,我想使用来自输出主题的消息,并将响应返回给调用者。
工作流程为:HTTP请求-〉在输入主题上生成消息-〉(使用输入主题的消息-〉应用逻辑-〉在输出主题上生成消息)-〉使用输出主题的消息-〉HTTP响应。
为了实现这种情况,在接收到第一个HTTP请求时,我希望能够动态地创建一个消费者,它将从输出主题消费,* 在 * 产生输入主题上的消息之前。否则,输出主题上的消息有可能“丢失”。在我的情况下,消费者有一个随机的group.id,并且由于应用程序的原因,有auto.offset.reset = latest
我的问题是如何确保消费者在生成消息之前已经准备好了。我确保在生成消息之前调用SubscribeTopics。但是在目前为止的测试中,当没有提交的偏移量并且Kafka将偏移量重置为最新时,有一种可能性,消息丢失,我的消费者从来没有读过,因为Kafka * 有时 * 认为消费者注册 * 后 * 消息已经产生。
到目前为止,我的解决方法是在创建消费者之后休眠一段时间,以便在生成消息之前允许Kafka继续执行commit reset工作流。
我还尝试在一个重新平衡回调(由订阅主题的使用者触发)中实现逻辑,其中我使用offset = latest为主题分区调用assign,但这似乎没有解决我的问题。
希望有比睡觉更好的解决办法。

kpbwa7wx

kpbwa7wx1#

大多数HTTP客户端库都有一个隐式超时。不能保证你的消费者会消费一个事件,或者下游生产者会向“响应主题”发送数据。
相反,让您的初始请求立即返回带有某个跟踪ID的201 Accepted状态(例如,如果您进行请求验证,则返回400)。然后要求按ID轮询GET请求,以获取请求主体中带有404状态或200 +某个状态字段的状态更新。
您将需要一个数据库来存储中间状态。

相关问题