kafka与avro唱片

zlwx9yxi  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(355)

我有以下内容:source-kafka topic(trans)channel-memory sink-hdfs(avro\u event)
Kafka主题翻译中的数据是用c#producer编写的,有数千条avro记录。当我运行flume consumer时,它开始将数据放入hdfs。问题是数据的格式是:schema data schema data
而不是:
架构数据
我猜这是因为flume期望的记录类型是{header}{body},而来自kafka的数据只是{body}。我知道有一种方法可以将写在avroflumeevent中的avro数据 Package 到主题中,但是看起来它不再是真正的avro记录,可能spark消费者或storm会更喜欢这些数据在真正的阿夫罗下线。有没有一种方法可以处理这个主题,这样每次flume将数据滚动到hdfs时,数据就可以在没有多个模式的情况下写入?

kfgdxczn

kfgdxczn1#

一旦你把数据放到Kafka上,你有没有考虑过使用linkedin的加缪。它将运行mapreduce作业,但您应该获得所需的架构数据布局。您还应该了解confluent的kafka堆栈,特别是它提供的模式注册表和RESTAPI。

inn6fuwd

inn6fuwd2#

我们最终还是成功了。我们使用的是microsoft.net avro库,而不是c#producer中的apache avro库。这意味着avro记录被正确序列化。我还需要将flume接收器更改为使用“org.apache.flume.sink.hdfs.avroeventserializer$builder”作为接收器序列化程序,而不是“avro\u event”。我还需要包括一个连接到kafka源的flume拦截器,该拦截器将变量“flume.avro.schema.url”推送到flume头中,稍后由hdfs接收器序列化程序使用。
我看了一眼加缪,但对于我们试图实现的东西来说,似乎有些过分了,一个连接到Kafka主题的基本flume频道,它将avro数据接收到hdfs。
我刚刚从构建flume配置的java应用程序中删除了拦截器部分,希望它能帮助遇到此问题的其他人:

_flumeFileConfigProperties.put(_agentId+".sources." + _sourceId +".interceptors",_interceptorId);           
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".type","static");
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".key","flume.avro.schema.url");
                _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".value",_avroProdSchemaLocation +_databaseName + "/" + _topic + "/record/" + _schemaVersion + "/" + _topicName + ".avsc");

相关问题