我已经使用Kafka源连接器从couchbase到Kafka的文件。然后将这些文档复制到mongo db。couchbase-->源连接器-->Kafka-->接收器连接器-->mongo如果源连接器关闭,那么如何再次将所有文档同步到Kafka?有没有什么get和touch功能可以将Kafka主题在关闭期间所做的所有更改显示出来?
ldxq2e6h1#
如果您正在询问如何处理源连接器关闭时发生的文档更改,则不需要执行任何操作。kafka connect存储源连接器的状态(偏移量),并将恢复streamtask状态,然后从停止的位置继续。couchbase源连接器支持这一点,正如我们在这里的代码中所看到的,这里使用它来初始化带有保存的偏移量的dcp流。如果您询问如何重置连接器并从头开始重新流式传输整个bucket,那实际上就不那么容易了。据我所知,在Kafka没有内置的方法来重置连接器的偏移量-有一个正在审查的kip与此相关:kip-199除非得到官方支持,我所知道的重置连接器状态的最佳方法是更改配置以使用不同的主题来保存偏移量,这是黑客和遗留作为一个潜在的问题,旧的偏移,或实际上编辑保存的偏移如这里所述。我决不主张在生产系统上做这两件事,所以请用你自己的判断。
1条答案
按热度按时间ldxq2e6h1#
如果您正在询问如何处理源连接器关闭时发生的文档更改,则不需要执行任何操作。kafka connect存储源连接器的状态(偏移量),并将恢复streamtask状态,然后从停止的位置继续。couchbase源连接器支持这一点,正如我们在这里的代码中所看到的,这里使用它来初始化带有保存的偏移量的dcp流。
如果您询问如何重置连接器并从头开始重新流式传输整个bucket,那实际上就不那么容易了。据我所知,在Kafka没有内置的方法来重置连接器的偏移量-有一个正在审查的kip与此相关:kip-199除非得到官方支持,我所知道的重置连接器状态的最佳方法是更改配置以使用不同的主题来保存偏移量,这是黑客和遗留作为一个潜在的问题,旧的偏移,或实际上编辑保存的偏移如这里所述。我决不主张在生产系统上做这两件事,所以请用你自己的判断。