如何使用一个不断增长的文件作为ApacheKafka生产者,并只读取新附加的数据

ycl3bljg  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(571)

我想用文件作为我的Kafka制作人。源文件持续增长(例如每秒20条记录/行)。下面是一篇与我的问题类似的帖子:
如何向Kafka制作人写入文件
但在这种情况下,每次在文件中插入新行时,都会读取整个文件并将其添加到kafka主题中。我只希望新添加的行被发送到主题(即,如果文件已经包含10行,并且又添加了4行,那么只需要将这4行发送到主题)。
有没有办法做到这一点??
其他解决方案:
apacheflume使用源类型为'spooldir'。但它没有用,因为它从添加到目录的新文件中读取数据,而不是将数据附加到已读取的文件中。
我们还尝试将flume源类型设置为“exec”,命令设置为“tail–f/path/file name”。这似乎也不管用。
使用任何其他工具的建议也受到欢迎,因为我的目标是实时读取文件中的数据(即,一旦数据插入到文件中,我就需要数据)。

eufgjt7s

eufgjt7s1#

有几个选择,你可以看看,根据你的具体需要。
Kafka连接
正如上面chin huang所说的,kafka connect的文件源连接器应该能够在不安装其他软件的情况下执行您想要的操作。查看connect快速启动以获得如何启动和运行它的指导,实际上他们有一个将文件读入kafka的示例。
贮木场
logstash是类似这样的东西的经典选项,它的kafka输出可以为一个或多个文件执行您希望它执行的操作。下面的配置应该大致满足您的需求。

input {
  file {
    path => "/path/to/your/file"
  }
output {
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topic_id => "topicname"
    }
}

filebeat公司
filebeat与logstash非常相似,如果您想对从文件中读取的数据执行额外的处理,它只提供较少的功能。而且,它是用go而不是java编写的,因此在运行它的机器上占用的空间应该更小。以下应该是一个最低限度的配置,让您开始(从内存中,您可能需要添加一个或两个参数,如果它们是必需的):

filebeat.prospectors:
- type: log
  paths:
    - /path/to/your/file

output.kafka:
  hosts: ["127.0.0.1:9092"]
  topic: 'topicname'

渡槽
如果你想重温你的Flume选项,看看taildirsource,我没有用过它,但听起来它应该很适合你的用例。

相关问题