我从一个Kafka主题接收二进制avro文件,我必须反序列化它们。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的avro文件分离是一种更好的做法,但是我不能控制生产者,我不能改变这一点。
我的代码运行在apachestorm之上。首先,我创建一个阅读器:
mDatumReader = new GenericDatumReader<GenericRecord>();
稍后我尝试反序列化消息而不声明架构:
Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);
但是当一个消息到达时,我得到一个错误:
Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]
我看到的所有答案都是关于使用其他格式,改变传递给Kafka的信息或其他东西。我无法控制这些事情。
我的问题是 bytes[]
在二进制消息中嵌入了模式,如何在不声明模式的情况下反序列化avro文件,以便读取它。
2条答案
按热度按时间wfsdck301#
添加maven依赖性
创建一个如下所示的文件
将以上内容另存为src/main/resources中的avro.avsc。
在eclipse或任何ide中run>maven生成源代码,这些源代码创建avro.java到包文件夹[namespace]tachyonis.space
消费者/生产者必须在同一台机器上运行。否则,您需要在windows/linux中配置hosts文件,并将所有组件配置属性从localhost更改为Map到实际的ip地址,以便广播给生产者/消费者。否则会出现网络连接问题等错误
wljmcqd82#
对于datumreader/writer,没有嵌入式模式这样的东西。当我第一次看到阿夫罗和Kafka的时候,也是我的误解。但是avro序列化程序的源代码清楚地表明,在使用genericdatumwriter时没有嵌入任何模式。
datafilewriter会在文件的开头编写模式,然后使用genericdatumwriter添加genericrecords。
既然您在开始时说有一个模式,我假设您可以读取它,将其转换为一个模式对象,然后将其传递给genericdataumreader(schema)构造函数。想知道消息是如何序列化的。也许datafilewriter被用来写入byte[]而不是实际的文件,那么您可以使用datafilereader来反序列化数据吗?