java—反序列化事务一致的使用者消息

4zcjmb1e  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(229)

这个问题也与我先前提出的问题有关。iidr cdc向Kafka提供了交易详情,但Kafka仍未得到答复。
事务一致性使用者api的示例avroconsole在控制台中输出消息,并使用kafkadeserializer示例tcc控制台链接将byte[]反序列化为objecthttps://www.ibm.com/support/knowledgecenter/sstrgz_11.4.0/com.ibm.cdcdoc.cdckafka.doc/tasks/kafkatccdev.html
我们尝试使用javakafkaconsumer类执行同样的操作,并且能够使用byte[]反序列化器打印它。consumerrecord键和consumerrecord值仍为序列化格式。
下面是示例代码

final Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID + "-" + UUID.randomUUID().toString());
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
prop.put("schema.registry.url", SCHEMA_REGISTRY_URL);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + "-" + UUID.randomUUID().toString());
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumer = new KafkaConsumer<byte[], byte[]>(properties);
consumer.subscribe(Arrays.asList(TCC_TOPIC));
consumer.seekToBeginning(consumer.assignment());

Map<String, Object> keyDeserializerConfig = new HashMap<String, Object>();
keyDeserializerConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, StreamConfigurator.SCHEMA_REGISTRY_URL);
keyDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer ();
keyDeserializer.configure(keyDeserializerConfig, true);

Map<String, Object> valueDeserializerConfig = new HashMap<String, Object>();
valueDeserializerConfig.putAll(keyDeserializerConfig);
valueDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer ();
valueDeserializer.configure(valueDeserializerConfig, false);

while (true) {
    ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<byte[],byte[]> record : records)
    {
        System.out.println(record.topic());
        System.out.println(record.partition());
        System.out.println(keyDeserializer.deserialize("key", record.key()));
        System.out.println(valueDeserializer.deserialize("value", record.value()));
        System.out.println(record.toString());
    }
}

下面的代码行中出现运行时错误
system.out.println(keydeserializer.deserialize(“key”,record.key());
error is exception in thread“main”org.apache.kafka.common.errors.serializationexception:反序列化id-1的avro消息时出错,原因是:org.apache.kafka.common.errors.serializationexception:未知的魔字节!
任何帮助都将不胜感激。没有太多更好的文档或示例可以为事务一致的使用者主题编写使用者代码。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题