如上所述,我目前正在设置一个kafka连接接收器,将数据从kafka接收到google云存储。
一切都进展顺利,但是-它只是使用最新的可用抵消。也就是说,一旦它开始运行,它只接收新生成的消息到gcs,而不是来自kafka的现有消息。我尝试过删除kafka connect存储/偏移主题,创建新的连接器名称等。但是,它总是从最新的偏移开始。
是否有任何配置Kafka连接地面军事系统接收器的最早偏移量?我没有看到任何配置来处理这个问题
https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html
或
https://docs.confluent.io/current/connect/references/allconfigs.html
我尝试过删除任何kafka connect主题/文件存储,以及从新的连接器名称开始
我看到kafka connect接收器消息是在连接器启动后生成的。
我期望/需要消息从最早的可用偏移量开始下沉,即如果连接器没有提交偏移量,则从最早的消息开始下沉
1条答案
按热度按时间9w11ddsr1#
第一次创建连接器时,默认情况下需要
earliest
抵消。您应该在connect worker日志中看到:您可以通过在worker配置中更改来覆盖此设置:
consumer.auto.offset.reset
.删除并重新创建连接器时,偏移量将保留并重新使用。
如果使用新名称创建连接器,它将使用连接辅助进程中设置的偏移量(
earliest
)默认情况下。