我正在尝试使用flume 1.6来读取我的源文件(以管道分隔的文本文件)并将它们提供给kafka。
所有的管道似乎运作良好,所有的记录都成功地进入Kafka。然而,flume似乎在每个数据行的开头添加了nul和stx控制字符。
这对我来说是个问题,因为最终我将所有数据从kafka流到amazons3,最后流到amazonredshift。由于这些控制字符,我的数据加载到红移(复制命令)失败。
我花了相当长的时间在网上研究这个问题,但到目前为止运气不好。我是否可以指示flume不要在源数据中添加这些字符?
我的Flume配置如下。我尝试了“exec”和“spooldir”源代码,但没有任何改变。
kafka-agent.channels=ch1
kafka-agent.channels.ch1.type=org.apache.flume.channel.kafka.KafkaChannel
kafka-agent.channels.ch1.brokerList=localhost:9092
kafka-agent.channels.ch1.topic=call-center-dimension
kafka-agent.channels.ch1.zookeeperConnect=localhost:2181
kafka-agent.channels.ch1.capacity=10000
kafka-agent.channels.ch1.transactionCapacity=10000
kafka-agent.channels.ch1.parseAsFlumeEvent = true
kafka-agent.channels.ch1.kafka.serializer.class=kafka.serializer.DefaultEncoder
kafka-agent.sources=tail
# kafka-agent.sources.tail.type=spooldir
# kafka-agent.sources.tail.channels=ch1
# kafka-agent.sources.tail.spoolDir=/home/ec2-user/flumespool
# kafka-agent.sources.tail.fileHeader=false
kafka-agent.sources.tail.type=exec
kafka-agent.sources.tail.channels=ch1
kafka-agent.sources.tail.shell=/bin/bash -c
kafka-agent.sources.tail.command=cat /tmp/call_center_dimension_1.out
kafka-agent.sinks=sink1
kafka-agent.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
kafka-agent.sinks.sink1.brokerList=localhost:9092
kafka-agent.sinks.sink1.topic=kafka
kafka-agent.sinks.sink1.channel=ch1
kafka-agent.sinks.sink1.batchSize=5
kafka-agent.sinks.sink1.kafka.serializer.class=kafka.serializer.StringEncoder
感谢您的帮助。
谢谢,普拉维斯
2条答案
按热度按时间snz8szmq1#
尝试使用hdfs文件的数据流文件类型:
kafka-agent.sinks.sink1.hdfs.filetype=数据流
dwthyt8l2#
只是为了结束循环。
在这上面花了很多时间。还玩转了使用regex\u替换拦截器删除控制字符的想法,但运气不好。
最终评估并使用了apachenifi。很好的工具,易于设置和快速实现。我能够从文件中读取数据,并使用nifi推送到kafka主题,它保持了数据的干净和完整。
谢谢,普拉维斯