我正试图通过flume把Kafka的数据放到hdfs中。Kafka的制作人每10秒发送一条信息。我想收集所有的消息在一个文件中的hdfs。这是我使用的flume的配置,但它在hdfs上存储了许多文件(一个用于消息):
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = prova
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/input
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink
p、 从一个.csv文件开始。Kafka制作人获取文件并选择一些感兴趣的字段,然后每10秒发送一个条目。flume将条目存储在hadoop hdfs上,但是存储在许多文件中(1个条目=1个文件)。我希望所有的条目都在一个文件中。如何改变Flume的配置?
2条答案
按热度按时间2hh7jdfx1#
看起来flume当前确实设置为在hdfs上为每个输入文件创建一个文件。
正如这里所建议的,您可以通过编写一个周期性的pig(或mapreduce)作业来处理这个问题,该作业接受所有的输入文件并将它们组合起来。
减少文件数量的另一个选择可能是减少入站文件的频率。
ffvjumwh2#
将rollinterval设置为0,因为您不希望根据时间生成不同的文件。如果要使其基于数字条目或事件,请更改rollcount值。例如,如果要在一个文件中保存10个事件或条目: