如何用flume将json写入kafka中按日期划分的hdfs

d6kp6zgx  于 2021-06-04  发布在  Flume
关注(0)|答案(2)|浏览(445)

我们在Kafka有如下json:{'date':'2017-01-01','timestamp':1483228800,'field1':x,…}.{'日期':'2017-01-02','timestamp':1483315200,'field1':x,…}。我们希望使用flume将事件从kafka加载到hdfs。我们希望这些文件是按照json中的日期进行分区的。但是,我们不知道flume是否支持此功能。似乎只有httpsource支持jsonhandler。我想知道Kafka的消息来源是否也有同样的支持。
我们的配置

a.sources = kafka1
a.channels = channel1
a.sinks = hdfs1
a.sources.kafka1.channel = channel1
a.sources.kafka1.type =  org.apache.flume.source.kafka.KafkaSource

a.sources.kafka1.topic = topic1

a.channels.channel1.type = memory
a.sinks.hdfs1.type = hdfs
a.sinks.hdfs1.channel = channel1
a.sinks.hdfs1.path = maprfs:///user/abc/topic1/date=%{date}
a.sinks.hdfs1.fileType = SequenceFile
a.sinks.hdfs1.useLocalTimeStamp = false

因此,日期为空。如何得到实际的日期值?任何帮助都将不胜感激。谢谢

zujrkrfu

zujrkrfu1#

如果kafka通道由json对象填充,则可以按如下方式设置代理:

a.channels= kafka1
a.sinks = hdfs1

a.channels.kafka1.type = org.apache.flume.channel.kafka.KafkaChannel
a.channels.kafka1.kafka.bootstrap.servers = hostnameorip:9092,hostnameorip:9092
a.channels.kafka1.kafka.topic = topic_sample
a.channels.kafka1.kafka.consumer.group.id = consumer_group_sample

a.sinks.hdfs1.type = hdfs
a.sinks.hdfs1.channel = kafka1
a.sinks.hdfs1.path = /user
a.sinks.hdfs1.fileType = SequenceFile
a.sinks.hdfs1.filePrefix = /abc/topic1/date=%{date}

我不是flume方面的Maven,但是如果您只想从kafka中提取事件并存储在hdfs中,则不需要使用源代码。您可以从json事件中提取字段以在文件名中使用,方法是引用如下字段名:%{field}
我希望这有帮助

s3fp2yjn

s3fp2yjn2#

我找到了办法。基本上,我们需要添加regex拦截器来提取日期信息并将其放入“flume header”中。

相关问题