过滤Kafka中的重复消息

daolsyd0  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(215)

前言:
在我们的组织中,我们尝试使用kafka来解决一个问题,这个问题涉及到捕获oracle数据库中的更改并通过kafka发送。它实际上是一个疾病控制中心,我们正在使用Kafka连接器。
我们捕捉oracle中的变化,使用oracle flashback查询,这允许我们获得变化的时间戳和涉及的操作(insert、delete、update)。
一旦在我们观察到的表中做了一些更改,kafka连接器就会将其发布到一个主题中,我们使用kafka流进一步阅读这个主题。
问题是,有时flashback查询中会出现相等的行,因为表中的某些更新没有更改任何内容(这也会触发flashback更改),或者如果表有100列,而我们只观察20列,那么最终会在查询中看到重复的行,因为这20个字段都没有更改。
我们使用flashback获取更改的行(包括排除的行)。在连接器中,我们设置了timestamp+increment模式(timestamp由flashback查询的字段versions\u starttime获得)
重要提示:我们不能接触数据库超过这个,我的意思是,我们不能创建触发器,而不是使用这个已经闪回方案。
问题
我们正在尝试过滤Kafka中的记录,如果一些(键、值)在我们想要丢弃的内容中是相同的。注意,这并不完全是一次语义。记录将以较大的时间戳差异重复。
如果我使用ktable来检查某个记录的最后一个值,那么经过很长一段时间后,这会有多大的效率?
我的意思是,消费者的内部状态存储,由rocksdb和一个支持kafka主题来处理,因为如果我使用一个没有窗口的ktable,这个内部空间可能会非常大。
在这种情况下,哪种方法被认为是一种好方法?为了不使Kafka消费者的内部状态存储过载,同时能够知道实际记录是否早在一段时间前就已经处理过了。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题