kafka connect filestreamsource忽略附加行

wribegjk  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(339)

我正在用spark开发一个处理日志的应用程序,我想用kafka作为从日志文件中传输数据的一种方式。基本上,我有一个日志文件(在本地文件系统上),它不断地用新的日志更新,kafka connect似乎是从文件中获取数据以及新的附加行的完美解决方案。
我使用以下命令以默认配置启动服务器:
zookeeper服务器: zookeeper-server-start.sh config/zookeeper.properties zookeeper.properties属性

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Kafka服务器: kafka-server-start.sh config/server.properties 服务器属性

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

然后我创建了主题“连接测试”: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test 最后我运行Kafka连接器: connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties connect-standalone.properties属性

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties连接文件

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test

首先,我通过运行一个简单的控制台使用者来测试连接器: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning 一切都很好地工作,消费者从文件接收日志,当我添加日志时,消费者不断更新新的日志。
(然后我尝试了spark作为“消费者”遵循以下指南:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-两个直接进近无接收机,一切正常)
之后,我从日志文件中删除了一些日志并更改了主题(我删除了“connect test”主题,创建了另一个主题,并用新主题编辑了connect-file-source.properties)。
但现在连接器的工作方式已经不一样了。使用console consumer时,我只获取文件中已经存在的日志,并且我添加的每一行都会被忽略。也许更改主题(和/或修改日志文件中的数据)而不更改连接器名称会破坏kafka中的某些内容。。。
这就是kafka connect对我的主题“新主题”和连接器“新文件连接器”所做的:

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(即使在文件中添加新行后,它仍会刷新0条未完成的消息)
所以我尝试重新开始:我删除了/tmp/kafka logs目录,/tmp/connect.offset文件,并使用了一个全新的主题名、连接器名和日志文件,以防万一。但是,连接器仍然忽略了新日志。。。我甚至试着删除我的Kafka,从档案中重新提取它,然后再次运行整个过程(以防Kafka中发生变化),但没有成功。
我不知道问题出在哪里,任何帮助都将不胜感激!

acruukt9

acruukt91#

kafka connect不会“监视”或“跟踪”文件。我不相信任何地方都有文件证明它会这样做。
我想说,对于阅读活动日志来说,它甚至不如使用spark streaming来查看文件夹有用。spark将“识别”新创建的文件。Kafka连接 FileStreamSource 必须指向一个预先存在的、不可变的文件。
要让spark使用活动日志,您需要执行“日志旋转”操作—即,当文件达到最大大小或某个条件(如某个时间段(如一天)结束时),此过程会将活动日志移动到spark正在监视的目录,然后它处理启动一个新的日志文件,以便应用程序继续写入。
如果您希望主动监视文件并将其吸收到kafka中,那么可以使用filebeat、fluentd或apacheflume。

snvhrwxg

snvhrwxg2#

按文档:
filestream连接器示例旨在展示一个简单的连接器是如何作为用户或开发人员运行的。不建议用于生产。
我将使用类似filebeat的东西(带有kafka输出)来将日志摄取到kafka中。或者kafka connect spooldir,如果您的日志不是直接附加的,而是放置在文件夹中供接收的独立文件。

相关问题