我们一直在研究Kafka的生态系统。让我来看看流程
源代码(sqlserver)->debezium(cdc)->kafka代理->kafka流(处理、连接等)->mongo连接器->mongo db
现在我们进入了最后一步,我们正在将处理过的数据插入mongodb,但现在我们需要向上插入数据,而不是仅仅插入。
我们可以从mongo sink connector获得upsert(插入/更新)功能吗。至于现在,我明白这是做不到的。
我们一直在研究Kafka的生态系统。让我来看看流程
源代码(sqlserver)->debezium(cdc)->kafka代理->kafka流(处理、连接等)->mongo连接器->mongo db
现在我们进入了最后一步,我们正在将处理过的数据插入mongodb,但现在我们需要向上插入数据,而不是仅仅插入。
我们可以从mongo sink connector获得upsert(插入/更新)功能吗。至于现在,我明白这是做不到的。
2条答案
按热度按时间x8goxv8g1#
请按照提供的链接,它有关于Kafka蒙戈连接器的所有信息。我已经成功地实现了upsert功能。您只需要仔细阅读这个文档。
Kafka连接器-mongodb
6vl6ewon2#
实际上,这是一个upsert,如果${uniquefieldtoupdateon}不在mongo中,我们希望插入,如果它存在,则更新如下。
以下配置状态:
将${uniquefieldtoupdateon}替换为要对其进行更新建模的记录所特有的字段。
allowlist(白名单)此字段用于
PartialValueStrategy
允许为id策略投影自定义值字段。replaceonebusinesskeystrategy意味着只更新上面声明的唯一字段引用的一个文档。