需要注意的重要事项是: 您将需要标准的kafka和avrojars来运行这个开箱即用的代码。 是非常重要的props.put(“serializer.class”,“kafka.serializer.defaultencoder”);大学教师 t use stringEncoder as that won 如果您将字节数组作为消息发送,则无法工作。 您可以将byte[]转换为十六进制字符串并发送该字符串,然后在使用者上将十六进制字符串重新转换为byte[],然后再转换为原始消息。 运行Zookeeper和经纪人如下所述:http://kafka.apache.org/documentation.html#quickstart 并创建一个名为“页面视图”或任何你想要的主题。 运行producertest.java,然后运行consumergroupexample.java,查看正在生成和使用的avro数据。
5条答案
按热度按时间v9tzhpje1#
更新的答案。
kafka有一个avro序列化程序/反序列化程序,其坐标为maven(sbt格式):
将kafkavroserializer的示例传递到kafkaproducer构造函数中。
然后可以创建avro genericord示例,并将这些示例用作kafka producerrecord示例中的值,您可以使用kafkaproducer发送这些示例。
在kafka消费端,您使用kafkaavrodeserializer和kafkaconsumer。
mutmk8jj2#
我终于想起问Kafka的邮件列表,并得到了以下答案,这是完美的工作。
是的,您可以以字节数组的形式发送消息。如果您查看message类的构造函数,您将看到-
def this(字节:数组[byte])
现在,看看producer send()api-
def send(producerdata:producerdata[k,v]*)
您可以将v设置为message类型,将k设置为您想要的密钥类型。如果您不关心使用键进行分区,那么也将其设置为消息类型。
谢谢,内哈
r6l8ljro3#
代替avro,您还可以简单地考虑压缩数据;使用gzip(良好的压缩,更高的cpu)或lzf或snappy(更快,更慢的压缩)。
或者还有一种微笑二进制json,由jackson在java中支持(使用此扩展):它是紧凑的二进制格式,比avro更易于使用:
基本上与json相同的代码,只是传递了不同的格式工厂。从数据大小的Angular 来看,smile和avro是否更紧凑取决于用例的细节;但两者都比json更紧凑。
这样做的好处是,使用json和smile,使用相同的代码,只需使用pojo,就可以快速工作。与avro相比,avro要么需要生成代码,要么需要大量的手动代码来打包和解包
GenericRecord
s。goqiplq24#
这是一个基本的例子。我没有尝试过多个分区/主题。
//示例生产者代码
//消费者代码示例
第1部分:使用者组代码:因为对于多个分区/主题,可以有多个使用者。
第2部分:实际消费信息的个人消费者。
测试avro架构:
需要注意的重要事项是:
您将需要标准的kafka和avrojars来运行这个开箱即用的代码。
是非常重要的props.put(“serializer.class”,“kafka.serializer.defaultencoder”);大学教师
t use stringEncoder as that won
如果您将字节数组作为消息发送,则无法工作。您可以将byte[]转换为十六进制字符串并发送该字符串,然后在使用者上将十六进制字符串重新转换为byte[],然后再转换为原始消息。
运行Zookeeper和经纪人如下所述:http://kafka.apache.org/documentation.html#quickstart 并创建一个名为“页面视图”或任何你想要的主题。
运行producertest.java,然后运行consumergroupexample.java,查看正在生成和使用的avro数据。
qmelpv7a5#
如果要从avro消息(Kafka部分已应答)获取字节数组,请使用二进制编码器: