我有一个案例,Kafka制作人每天发送两次数据。这些生产者从数据库/文件中读取所有数据并发送给Kafka。所以这些信息每天都在发送,而且是重复的。我需要消除重复的消息,并写在一些持久性存储使用Spark流。在这种情况下,删除重复消息的最佳方法是什么?
发送的重复消息是一个json字符串,其时间戳字段仅更新。
note:i can“不要将kafka producer更改为只发送新的数据/消息,它已安装在客户端计算机中并由其他人编写。”。
我有一个案例,Kafka制作人每天发送两次数据。这些生产者从数据库/文件中读取所有数据并发送给Kafka。所以这些信息每天都在发送,而且是重复的。我需要消除重复的消息,并写在一些持久性存储使用Spark流。在这种情况下,删除重复消息的最佳方法是什么?
发送的重复消息是一个json字符串,其时间戳字段仅更新。
note:i can“不要将kafka producer更改为只发送新的数据/消息,它已安装在客户端计算机中并由其他人编写。”。
6条答案
按热度按时间fxnxkyjh1#
一个更简单的方法是在Kafka结束时解决这个问题。看看Kafka的日志压缩功能。如果记录具有相同的唯一密钥,它将为您消除记录中的重复数据。
https://kafka.apache.org/documentation/#compaction
wdebmtf22#
您可以使用一个键值数据存储,其中您的键将是不包括timestamp字段和实际json值的字段的组合。
在轮询记录时,创建键和值对,并将其写入数据存储,该数据存储处理upsert(insert+update)或检查该键是否存在于数据存储中,然后删除消息
我建议您检查hbase(处理upserts)和redis(用于查找的内存数据存储)
fhg3lkii3#
你可以试着用
mapWithState
. 检查我的答案。rhfm7lfc4#
你调查过这个吗:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-重复数据消除
您可以尝试使用dropduplicates()方法。如果需要使用多个列来确定重复项,可以使用dropduplicates(string[]colnames)传递它们。
tcbh2hod5#
您可以将主题配置更改为
compact
模式。通过压缩,具有相同密钥的记录将在kafka日志中被覆盖/更新。在那里你只能从Kafka那里得到钥匙的最新值。你可以在这里阅读更多关于压实的内容。
wpx232ag6#
对于重复数据消除,您需要将有关已处理内容的信息(例如消息的唯一ID)存储在某个位置。
要存储邮件,您可以使用:
Spark检查站。优点:开箱即用。缺点:如果你更新应用程序的源代码,你需要清理检查点。因此,您将丢失信息。如果对重复数据消除的要求不严格,解决方案可以工作。
任何数据库。例如,如果您在hadoop env上运行,那么可以使用hbase。对于每一条您确实“得到”的消息(检查它以前没有发送过),并在db sent中标记它真正发送的时间。