kafka streams-serializationexception:未知的魔字节

h6my8fg2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(408)

我正在尝试创建一个处理avro记录的kafka streams应用程序,但出现以下错误:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我不确定是什么导致了这个错误。我只是想先把avro记录放到应用程序中,在那里它们会被处理,然后输出到另一个主题,但它似乎不起作用。我已经包括了下面应用程序的代码。有人知道为什么它不起作用吗?

Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();

    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));

    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
ldfqzlk8

ldfqzlk81#

未知的魔法字节!
意味着您的数据不符合架构注册表所需的wire格式。
或者,换句话说,您试图读取的数据不是avro,正如合流avro反序列化程序所期望的那样。
通过运行 kafka-avro-console-consumer ,所以您可能也想使用它进行调试
如果您确定您的数据确实是avro,并且模式实际上是作为消息的一部分发送的(需要查看您的生产者代码),那么您不应该使用合流avro反序列化程序,因为它们在消息中需要特定的字节格式。相反,你可以用 ByteArrayDesrializer 自己读取avro记录,然后将其传递给apacheavro二进制解码器 class . 作为奖励,您可以将该逻辑提取到您自己的deserialzer类中
另外,如果输入主题是avro,我认为不应该使用这个属性来读取字符串

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

相关问题