我正在尝试处理一个用例,它需要处理来自Kafka主题的消息,这个主题只有1分钟。Kafka有没有办法只读一分钟的信息?提前谢谢。
gwo2fgha1#
您可能能够利用0.11.0.0中引入的重置偏移工具。一个问题是它是一个命令行工具,而且还没有针对它的编程api(目前还没有)。但是您可以将应用程序与该工具同步(或在应用程序中使用该工具),以将分区的偏移量重置为1分钟前的偏移量,并从中使用:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test.group --topic foo:0,1 --by-duration P1M
这将重置主题的分区0和分区1的偏移量 foo 到每个分区中的第一条消息,时间戳在1分钟之前。您可以检查消息的时间戳来决定它是否符合处理条件(根据您的用例)。
foo
rdrgkggo2#
简单地说,答案是否定的。kafka消费者的消费是基于获得队列中最新的消息或最早的消息。查看文档(搜索auto.offset.reset)我认为您应该做的是在您的消费应用程序中保留一个消息缓冲区。使您的缓冲区只保存1分钟的消息,并删除超过1分钟的消息。这样,缓冲区中最早的消息总是1分钟。我就是这么做的。
mqkwyuun3#
您将能够使用kafka流、状态存储和处理器来实现这一点。下面的解决方案可以帮助您在1分钟后处理邮件,但您仍然会立即使用这些邮件。创建状态存储并将其添加到流生成器。使用该生成器创建流,并使用上面创建的状态存储添加处理器。使用处理器供应商来处理每条消息。您可以使用process()将所有消息保存在状态存储中。将标点符号()设置为60000毫秒,并使用标点符号()获取经过1分钟延迟的消息并对其进行处理。希望这有帮助。
3条答案
按热度按时间gwo2fgha1#
您可能能够利用0.11.0.0中引入的重置偏移工具。一个问题是它是一个命令行工具,而且还没有针对它的编程api(目前还没有)。但是您可以将应用程序与该工具同步(或在应用程序中使用该工具),以将分区的偏移量重置为1分钟前的偏移量,并从中使用:
这将重置主题的分区0和分区1的偏移量
foo
到每个分区中的第一条消息,时间戳在1分钟之前。您可以检查消息的时间戳来决定它是否符合处理条件(根据您的用例)。rdrgkggo2#
简单地说,答案是否定的。kafka消费者的消费是基于获得队列中最新的消息或最早的消息。查看文档(搜索auto.offset.reset)
我认为您应该做的是在您的消费应用程序中保留一个消息缓冲区。使您的缓冲区只保存1分钟的消息,并删除超过1分钟的消息。这样,缓冲区中最早的消息总是1分钟。
我就是这么做的。
mqkwyuun3#
您将能够使用kafka流、状态存储和处理器来实现这一点。下面的解决方案可以帮助您在1分钟后处理邮件,但您仍然会立即使用这些邮件。
创建状态存储并将其添加到流生成器。使用该生成器创建流,并使用上面创建的状态存储添加处理器。使用处理器供应商来处理每条消息。您可以使用process()将所有消息保存在状态存储中。将标点符号()设置为60000毫秒,并使用标点符号()获取经过1分钟延迟的消息并对其进行处理。
希望这有帮助。