第一个例子wordcount没有正确计算第一圈

c0vxltue  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(279)

我正在研究kafka流,我对java8中的第一个wordcount示例有一个问题,这个示例取自文档。
使用最新版本的kafka streams、kafka connect和wordcount lambda表达式示例。
我遵循以下步骤:在kafka中创建一个输入主题,然后创建一个输出主题。启动应用程序流,然后通过从.txt文件中插入一些单词上传输入主题
在第一次计数时,在输出主题中,我看到单词正确分组,但计数是错误的。如果我尝试重新插入相同的单词,则先前不正确计数的连续计数都是正确的。
如果我使用使用者控制台查看输入主题转储,它已正确加载,并且没有脏数据。
第一次怎么算错了?
示例[第一个数据]:(Kafka输入主题)hi-hi-mike测试
(应用程序流正在运行)
(输出主题)hi 12 mike 4测试3(偶然计数)
[连续数据-在输入主题中发布相同的单词]
(输出主题)hi 14 mike 6测试4
[新尝试]
(输出主题)hi 16 mike 8测试5
等等。。。。

vhmi4jdf

vhmi4jdf1#

apache kafka中的wordcount演示有以下几行:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

这意味着,当您重新启动应用程序时,如果没有存储在kafka中的wordcount应用程序的现有使用者偏移量,它将从一开始(“最早”)读取其输入主题。在Kafka,应用程序的消费者补偿在一定程度的应用程序不活动后过期,默认为24小时(参见 offsets.retention.minutes 代理配置)。
我可以想象发生了以下事情:
您在不久前尝试过kafka,并在输入主题中输入了测试数据。
然后你在恢复实验前休息了超过24小时。
现在,当应用程序重新启动时,它从一开始就恢复到重新读取输入主题,从而拾取较旧的测试输入数据,从而导致“膨胀”计数。
如果我使用使用者控制台查看输入主题转储,它已正确加载,并且没有脏数据。
您可以通过在添加cli选项的同时再次查看控制台使用者的输入主题来验证我的上述假设 --from-beginning (见https://kafka.apache.org/documentation/#quickstart_consume).

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning

这将显示主题“yourinputtopic”中的所有可用数据——减去在此期间可能已从kafka主题中清除的任何数据(默认代理配置将清除超过7天的数据,参见。 log.retention.hours ).

相关问题