发布第一个插入: {"Customer_id": 2, "transaction_id": "1", "idd": [999, 1111], "id": 1}
然后是第二个: {"Customer_id": 2, "transaction_id": "2", "idd": [9, 10], "id": 1}
要求结果: {"Customer_id": 2, "transaction_id": "2", "idd": [[9, 10] , [999, 1111]], "id": 1}
我得到的是: {"Customer_id": 2, "transaction_id": "2", "idd": [9, 10] , "id": 1}
更新策略此更新数组不追加,但我的预期结果追加数组
配置:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=customer_id,transaction_id
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
我们如何通过mongo kafka接收器连接器实现这一点
1条答案
按热度按时间lfapxunr1#
看起来你在尝试聚合流数据
除非mongodb能够通过插入相同的文档id(我对此表示怀疑,因为它怎么知道您只想收集idd字段?),否则您必须使用kafka streams/ksql或其他有状态处理层来聚合您的值。接收器连接器只是将它看到的任何东西转发到数据库中;连接器不知道以前的记录