我有以下内容:source-kafka topic(trans)channel-memory sink-hdfs(avro\u event)
Kafka主题翻译中的数据是用c#producer编写的,有数千条avro记录。当我运行flume consumer时,它开始将数据放入hdfs。问题是数据的格式是:schema data schema data
而不是:
架构数据
我猜这是因为flume期望的记录类型是{header}{body},而来自kafka的数据只是{body}。我知道有一种方法可以将写在avroflumeevent中的avro数据 Package 到主题中,但是看起来它不再是真正的avro记录,可能spark消费者或storm会更喜欢这些数据在真正的阿夫罗下线。有没有一种方法可以处理这个主题,这样每次flume将数据滚动到hdfs时,数据就可以在没有多个模式的情况下写入?
2条答案
按热度按时间kfgdxczn1#
一旦你把数据放到Kafka上,你有没有考虑过使用linkedin的加缪。它将运行mapreduce作业,但您应该获得所需的架构数据布局。您还应该了解confluent的kafka堆栈,特别是它提供的模式注册表和RESTAPI。
inn6fuwd2#
我们最终还是成功了。我们使用的是microsoft.net avro库,而不是c#producer中的apache avro库。这意味着avro记录被正确序列化。我还需要将flume接收器更改为使用“org.apache.flume.sink.hdfs.avroeventserializer$builder”作为接收器序列化程序,而不是“avro\u event”。我还需要包括一个连接到kafka源的flume拦截器,该拦截器将变量“flume.avro.schema.url”推送到flume头中,稍后由hdfs接收器序列化程序使用。
我看了一眼加缪,但对于我们试图实现的东西来说,似乎有些过分了,一个连接到Kafka主题的基本flume频道,它将avro数据接收到hdfs。
我刚刚从构建flume配置的java应用程序中删除了拦截器部分,希望它能帮助遇到此问题的其他人: