我正在建立一个kafka连接分布式模式应用程序,它将是一个kafka到s3的管道。我使用的是Kafka0.10.1.0-1和Kafka连接3.1.1-1。到目前为止,事情进展顺利,但对于我正在使用的更大系统来说,一个重要的方面是需要知道kafka->filesystem管道的偏移量信息。根据文件记载 offset.storage.topic
配置将是分布式模式应用程序用于存储偏移信息的位置。考虑到Kafka如何在“新”Kafka中存储消费者补偿,这是有道理的。但是,在使用filestreamsinkconnector进行了一些测试之后,没有任何内容写入我的系统 offset.storage.topic
默认值: connect-offsets
.
具体来说,我使用python-kafka生产者将数据推送到主题,并使用kafka-connect和filestreamsinkconnect将数据从主题输出到文件。它的工作和行为与我期望的连接器的行为相同。此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态,并且没有数据重复。然而,当我去 offset.storage.topic
要查看存储了什么偏移量元数据,主题中没有任何内容。
这是我使用的命令: kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
在让此命令运行一分钟左右后,我收到以下消息: Processed a total of 0 messages
总之,我有两个问题:
为什么即使我的分布式应用程序保持正确的状态,偏移量元数据也没有写入应该存储它的主题?
如何访问kafka connect分布式模式应用程序的偏移量元数据信息?这对于我的团队实现系统的lambda架构是100%必要的。
谢谢你的帮助。
3条答案
按热度按时间yuvru6vn1#
liju是正确的,connect offset用于跟踪源连接器的偏移量(有生产者,但没有消费者)。接收器连接器有一个耗电元件和跟踪偏移通常的方式-\uu耗电元件\u偏移主题
查看最后提交的偏移量的最佳方法是使用consumer group工具:
bin/kafka-consumer-groups.sh—组连接弹性登录连接器—引导服务器localhost:9092 --describe
组名总是“connect-”和连接器名(在我的例子中是elastic login connector)。这将显示该组提交的最新偏移量,该组基本上确认所有到此偏移量之前的消息都已写入弹性体。
jaxagkaj2#
confluent发布的新s3连接器可能会引起您的兴趣。
从您描述的内容来看,也许它可以大大简化您将记录从kafka导出到s3存储桶的目标。
fiei3ece3#
偏移量可能正在提交到kafka默认偏移量提交主题,即\u consumer\u offsets