如何连续读取Kafka的文件?

zsohkypk  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(377)

我是Kafka的新手。
我已经尝试了一些文件阅读的例子,并在我的项目应用了几个星期。然而,我的申请似乎没有工作,因为我想所以我请求您的建议。
我的目的是:
Kafka制作者从目录a读取文件。
storm使用从1生成的数据。
一旦读到文件就移到其他目录。
条件:
文件被连续发送到目录a。
这是一个简单的逻辑,但它让我头痛。
到目前为止,我已经在本地计算机eclipse上创建并测试了kafka生产者代码。
我想的是,因为Kafka制作者应该继续读取文件,所以即使读取了目录a中的所有文件,进程也必须保持活动状态。但是它会在目录a中的所有文件都被读取和发送后立即终止。
我用3个代理在一个节点上运行kafka,下面是producer属性设置。

Properties props = new Properties();

props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");
props.put("request.required.acks", "1");

已使用以下命令创建主题。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test

在Kafka的建筑观点中,我不断阅读文件的想法是错误的吗?还是有什么方法我还没找到?如果有人能回答我的问题,我将不胜感激。

oaxa6hgo

oaxa6hgo1#

使用Kafka连接


# File connect-standalone.properties

# bootstrap kafka servers

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# local file storing offsets and config data

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\\KafkaSetup\\kafka_2.13-2.4.0\\libs

# File myFileConnector.properties

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=C:\\KafkaSetup\\input\\data.txt
topic=aryan_topic

# Command

C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>connect-standalone.bat C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\connect-standalone.properties C:\\KafkaSetup\\kafka_2.13-2.4.0\\config\\myFileConnector.properties 

# data.txt contains

Hello By Arun

# Kafka client

C:\KafkaSetup\kafka_2.13-2.4.0\bin\windows>kafka-console-consumer --bootstrap-server 127.0.0.1:9094 --topic aryan_topic
i]3
gh
"Hello By Arun"
yruzcnhs

yruzcnhs2#

你应该使用 kafka.serializer.DefaultSerializer (二进制)。
如何监视文件夹中的新文件?你可以用一些 apache.commons.io.monitor . 看看这里。
你被困在哪里了?您需要解决哪些问题(错误消息,任何真正的问题)?问问题是因为看起来你想从某人那里得到一个完整的解决方案,而这并不是你所能提供的。深入研究并提出具体问题,当然,张贴代码。

相关问题