有没有一个Kafka流的方法,以减少流的数字,只有“输出”时,数字被改变

slwdgvem  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(288)

我试图用Kafka蒸汽来减少一系列的数字,我只想在数据发生变化时记录出来。它工作得很完美,但问题是,如果运行代码的服务已关闭,它就无法从kafka获取数据。所以我猜答案是错的?我的代码:

KGroupedStream<String, JsonNode> groupedStream = filteredStream.groupByKey( Serdes.String(), jsonSerde);
KTable<String, JsonNode> reducedTable = groupedStream.reduce(
                (aggValue, newValue) ->  Calculate.newValue( newValue, aggValue, logger) ,/* adder */
                "reduced-stream-store" /* state store name */);
KStream<String, JsonNode> reducedStream =  reducedTable.toStream();

“计算”方法:

if (value != oldValue)
 return value
else return  null.

如果你有意见/建议,谢谢

shstlldc

shstlldc1#

return null 在您的代码中,将从结果表中删除条目。因此,您的代码没有达到预期的效果。
事实上,dsl操作符发出“on update”而不是“on change”,因此您不能将dsl用于您的用例。有一张罚单建议添加“emit on change”语义(https://issues.apache.org/jira/browse/kafka-8770).
作为解决方法,您需要使用自定义 transform() 用stat store代替。对于每个输入记录,检查它是否存在于存储中。如果没有,则发出记录并将其放入存储区。如果真的存在并且是相同的,不要发射任何东西。如果不同,则发出并更新存储。

相关问题