我知道一定有办法,但我想不出来。我需要停止Kafka消费者一旦我已经阅读了所有的消息从队列。有人能提供这方面的信息吗?
r8uurelv1#
如果您还没有死心塌地地使用scala客户机,请尝试kafkacat和 -e 选项告诉它在到达分区结尾时退出。e、 g.使用mytopic分区2的所有消息,然后退出:
-e
$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e
或使用最后3000条消息,然后退出:
$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e
ovfsdjhp2#
目前,Kafka版本2.11-2.1.1有一个脚本,名为 kafka-console-consumer.sh .它有一个新的标志: --timeout-ms .基本上,当没有新的日志等待时,这个标志是退出前等待的最长时间。它是毫秒级的。您可以使用此属性在读取所有消息后结束控制台使用者。
kafka-console-consumer.sh
--timeout-ms
zzwlnbp83#
您可以传递参数:-consumer timeout ms,并在启动consumer时提供一个值,如果在此期间没有读取任何消息,它将引发异常。例如,如果在最后2秒内没有新消息到达,则停止使用者:kafka.consumer.consoleconsumer-consumer timeout ms 2000您可以在这里看到这个和所有其他输入选项
iugsix8n4#
您可以使用simpleconsumershell,而不需要等待logend选项。请参阅systemtools simpleconsumershell例如:
./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
4条答案
按热度按时间r8uurelv1#
如果您还没有死心塌地地使用scala客户机,请尝试kafkacat和
-e
选项告诉它在到达分区结尾时退出。e、 g.使用mytopic分区2的所有消息,然后退出:
或使用最后3000条消息,然后退出:
ovfsdjhp2#
目前,Kafka版本2.11-2.1.1有一个脚本,名为
kafka-console-consumer.sh
.它有一个新的标志:
--timeout-ms
.基本上,当没有新的日志等待时,这个标志是退出前等待的最长时间。它是毫秒级的。
您可以使用此属性在读取所有消息后结束控制台使用者。
zzwlnbp83#
您可以传递参数:-consumer timeout ms,并在启动consumer时提供一个值,如果在此期间没有读取任何消息,它将引发异常。例如,如果在最后2秒内没有新消息到达,则停止使用者:kafka.consumer.consoleconsumer-consumer timeout ms 2000
您可以在这里看到这个和所有其他输入选项
iugsix8n4#
您可以使用simpleconsumershell,而不需要等待logend选项。请参阅systemtools simpleconsumershell
例如: