读取所有消息后终止kafka console consumer

nafvub8i  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(693)

我知道一定有办法,但我想不出来。我需要停止Kafka消费者一旦我已经阅读了所有的消息从队列。
有人能提供这方面的信息吗?

r8uurelv

r8uurelv1#

如果您还没有死心塌地地使用scala客户机,请尝试kafkacat和 -e 选项告诉它在到达分区结尾时退出。
e、 g.使用mytopic分区2的所有消息,然后退出:

$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e

或使用最后3000条消息,然后退出:

$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e
ovfsdjhp

ovfsdjhp2#

目前,Kafka版本2.11-2.1.1有一个脚本,名为 kafka-console-consumer.sh .
它有一个新的标志: --timeout-ms .
基本上,当没有新的日志等待时,这个标志是退出前等待的最长时间。它是毫秒级的。
您可以使用此属性在读取所有消息后结束控制台使用者。

zzwlnbp8

zzwlnbp83#

您可以传递参数:-consumer timeout ms,并在启动consumer时提供一个值,如果在此期间没有读取任何消息,它将引发异常。例如,如果在最后2秒内没有新消息到达,则停止使用者:kafka.consumer.consoleconsumer-consumer timeout ms 2000
您可以在这里看到这个和所有其他输入选项

iugsix8n

iugsix8n4#

您可以使用simpleconsumershell,而不需要等待logend选项。请参阅systemtools simpleconsumershell
例如:

./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend

相关问题