我是Kafka的新手。
我已经尝试了一些文件阅读的例子,并在我的项目应用了几个星期。然而,我的申请似乎没有工作,因为我想所以我请求您的建议。
我的目的是:
Kafka制作者从目录a读取文件。
storm使用从1生成的数据。
一旦读到文件就移到其他目录。
条件:
文件被连续发送到目录a。
这是一个简单的逻辑,但它让我头痛。
到目前为止,我已经在本地计算机eclipse上创建并测试了kafka生产者代码。
我想的是,因为Kafka制作者应该继续读取文件,所以即使读取了目录a中的所有文件,进程也必须保持活动状态。但是它会在目录a中的所有文件都被读取和发送后立即终止。
我用3个代理在一个节点上运行kafka,下面是producer属性设置。
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");
props.put("request.required.acks", "1");
已使用以下命令创建主题。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
在Kafka的建筑观点中,我不断阅读文件的想法是错误的吗?还是有什么方法我还没找到?如果有人能回答我的问题,我将不胜感激。
2条答案
按热度按时间oaxa6hgo1#
使用Kafka连接
yruzcnhs2#
你应该使用
kafka.serializer.DefaultSerializer
(二进制)。如何监视文件夹中的新文件?你可以用一些
apache.commons.io.monitor
. 看看这里。你被困在哪里了?您需要解决哪些问题(错误消息,任何真正的问题)?问问题是因为看起来你想从某人那里得到一个完整的解决方案,而这并不是你所能提供的。深入研究并提出具体问题,当然,张贴代码。