通过kafka mongo sink connector在mongo的内联json数组对象中追加字段值

yvfmudvl  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(400)

发布第一个插入: {"Customer_id": 2, "transaction_id": "1", "idd": [999, 1111], "id": 1} 然后是第二个: {"Customer_id": 2, "transaction_id": "2", "idd": [9, 10], "id": 1} 要求结果: {"Customer_id": 2, "transaction_id": "2", "idd": [[9, 10] , [999, 1111]], "id": 1} 我得到的是: {"Customer_id": 2, "transaction_id": "2", "idd": [9, 10] , "id": 1} 更新策略此更新数组不追加,但我的预期结果追加数组
配置:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=customer_id,transaction_id
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

我们如何通过mongo kafka接收器连接器实现这一点

lfapxunr

lfapxunr1#

看起来你在尝试聚合流数据
除非mongodb能够通过插入相同的文档id(我对此表示怀疑,因为它怎么知道您只想收集idd字段?),否则您必须使用kafka streams/ksql或其他有状态处理层来聚合您的值。接收器连接器只是将它看到的任何东西转发到数据库中;连接器不知道以前的记录

相关问题