kafka添加了一个新特性,在连接器中使用regex,但是在连接器启动之后,新添加的主题中的主题数据似乎在连接器重新启动之前不会被使用。我们需要动态添加新主题,并让连接器根据连接器属性中定义的regex使用该主题。如何实现?例如:regex:topic-.*topic:topic-1,topic-2如果我引入了新的topic-3,那么如何使连接器在不重新启动的情况下使用topic数据?
at0kjp5o1#
遵循其他人在评论中已经给出的想法,基本上你需要做什么构建一个机制,确定一个新的主题已经被引入,connecter需要干净地重新启动。我会做这样的事,1> 在已连接的主题(例如主题1)中发送特定类型的消息,如果收到此类消息,则代码应保留所有新消息轮询,并等待所有偏移提交完成。2> 然后从轮询循环中断并从使用者(consumer.unsubscribe())中删除订阅。3> 在从regex主题订阅的常规流之后,需要遵循在开始时完成的流程,因为新主题现在将成为regex的一部分。请记住提交很重要,如果您匆忙重新启动connecter,可能会得到重复的提交。还有一点很明显,就是不要更改group.id并保持auto.offset.reset为“最新”。
jm2pwxwz2#
Kafka消费者有一个选择 metadata.max.age.ms -使用者刷新主题元数据的时间间隔。如果你不需要实时的话,它会有帮助的。另请参见:kafka consumer to dynamic detect topics added在 /etc/kafka-connect/kafka-connect.properties 您应该指定 consumer.metadata.max.age.ms=1000 一秒钟。
metadata.max.age.ms
/etc/kafka-connect/kafka-connect.properties
consumer.metadata.max.age.ms=1000
2条答案
按热度按时间at0kjp5o1#
遵循其他人在评论中已经给出的想法,基本上你需要做什么构建一个机制,确定一个新的主题已经被引入,connecter需要干净地重新启动。
我会做这样的事,
1> 在已连接的主题(例如主题1)中发送特定类型的消息,如果收到此类消息,则代码应保留所有新消息轮询,并等待所有偏移提交完成。
2> 然后从轮询循环中断并从使用者(consumer.unsubscribe())中删除订阅。
3> 在从regex主题订阅的常规流之后,需要遵循在开始时完成的流程,因为新主题现在将成为regex的一部分。
请记住提交很重要,如果您匆忙重新启动connecter,可能会得到重复的提交。还有一点很明显,就是不要更改group.id并保持auto.offset.reset为“最新”。
jm2pwxwz2#
Kafka消费者有一个选择
metadata.max.age.ms
-使用者刷新主题元数据的时间间隔。如果你不需要实时的话,它会有帮助的。另请参见:kafka consumer to dynamic detect topics added在
/etc/kafka-connect/kafka-connect.properties
您应该指定consumer.metadata.max.age.ms=1000
一秒钟。