我正在用java编写一个kafka流应用程序,它接受一个连接器创建的输入主题,该连接器使用schema registry和avro作为键和值转换器。连接器生成以下架构:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
实际上,有几个主题,关键模式总是“int”,值模式总是某种记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
一开始我试着用 Consumed.with(Serdes.Integer(), userSerde);
但这不起作用,因为serdes.integer()期望整数使用4字节编码,而avro使用可变长度编码。使用 Consumed.with(Serdes.Bytes(), userSerde);
工作,但我真的想要整数,而不是字节,所以我把我的代码改成这个
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使得编译器产生一个警告(它不喜欢 (Serde<Integer>)(Serde)
铸造)但它允许我使用 Consumed.with(keySerde, userSerde);
得到一个整数作为键。这很好,我的应用程序的行为正如预期的太好了!但现在我想为键/值定义默认serde,但我无法让它工作。
设置默认值serde很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是,我不知道如何定义默认密钥serde。
我试过了 streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());
产生运行时错误:找不到org.apache.kafka.common.serialization.serdes$wrapperserde的公共无参数构造函数 streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
产生运行时错误:java.lang.integer无法转换为org.apache.avro.specific.specificrecord
我错过了什么?谢谢。
3条答案
按热度按时间j91ykkif1#
更新(5.5及更新版本)
汇合型
5.5
通过添加对基本avro类型的本机支持PrimitiveAvroSerde
(参见。https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/primitiveavroserde.java)原始答案(版本5.4及更高版本):
这是一个众所周知的问题。原始的avro类型不能很好地处理confluent的avroserdes,因为serdes可以处理
GenericAvroRecord
以及SpecificAvroRecord
只是。比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro.
因此,基于
KafkaAvroSerializer
以及KafkaAvroDeserializer
是正确的方法。为了能够将其作为默认serde传递到配置中,您不能使用Serdes.serdeFrom
因为类型信息由于genrics类型擦除而丢失。但是,您可以实现自己的扩展类
Serde
接口,并将自定义类传递到配置中:ljsrvy3e2#
感谢@matthias j.的提示。sax,我想发布解决方案的工作。请免费增强它。
用作默认流配置:
覆盖默认值:
flmtquvp3#
如果你想和Kafka的消费者和制作人一起使用@thiyaga rajan这个伟大的解决方案
将此添加到类中