classcastexception从kafka流和avro反序列化

roqulrg3  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(392)

我已经用avromaven插件从avro模式生成了avrojava类。我将avro类序列化为字节数组,并将其写入kafka主题。
然后我有一个kafka流,它试图操纵avro数据来做一些事情。在反序列化过程中,我从同一个类中得到一个classcastexcition。我了解到这个问题是由于avro在fallback(类加载器的一个新示例)上使用了不同的类加载器而产生的。
有没有一种方法可以强制avro使用调用者的类加载器或类似的东西?
kafkastream属性

this.props = new Properties();
        this.props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        this.props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        this.props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

我正在使用一个字符串键和序列化的avro的字节数组,然后我需要手动反序列化avro的负载。我使用avro的解码器反序列化如下:

AvroPayload stp = AvroPayload.fromByteBuffer(ByteBuffer.wrap(bytes));

或者像这样:

AvroPayload stp = AvroPayload.getDecoder().decode(ByteBuffer.wrap(bytes));

在第一个版本中,我可以看到,如果我保持在avro生成的类上下文中,字节数组被正确地反序列化到avropayload类中。返回新示例可能会引发 ClassCastException

8wigbo56

8wigbo561#

我找到的唯一解决方案是按照建议将avro类放入一个外部jar中,然后导入它。
这不是一个好的解决方案,因为它需要大量的配置来保持avro模式和生成的类的耦合,但这是我发现的唯一一个。
我配置了一个maven项目,该项目将avro的类jar生成到一个目录中,其名称中没有工件版本,因此我可以随时导入最新版本,而不必更改pom。
如果有人会发现另一个解决方案,请张贴

相关问题