KafkaAvroDeserializer issue

ulmd4ohb posted on 2021-06-07 in Kafka

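I am trying to deserialize Kafka Avro messages with the Avro Kafka deserializer (KafkaAvroDeserializer). This code is very common and many users already run it in practice, but I ran into some difficulties when implementing the same code: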

```java
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    @SuppressWarnings("unchecked")
    ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000); //1

    for (ConsumerRecord<String, ATPEvent> record : records) { //2

        try {
            kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
            avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
            avroEvent = reader.read(avroEvent, avroBinaryDecoder);
            System.out.println(avroEvent);
            kafkaMessageInputStream.close();
        } catch (Exception ex) {
            System.out.println("Unable to process event from kafka, see exception details" + ex);
        }
    }
    consumer.commitSync(); //4
}
```

Here are my 4 questions:

1. I have to add the cast, otherwise it throws this error: `Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent>`
2. `Can only iterate over an array or an instance of java.lang.Iterable` I don't know why. Can I do this instead?

```java
List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);

for (ConsumerRecord<String, ATPEvent> record: records) {
```

3. `The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent)` I understand this one, but how can I convert a class to bytes? Is there another way?
4. `The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent>` Can I use `consumer.close();` instead?

Please provide solutions for 2 and 3 and, if possible, explain 1 and 4.

zqry0prt1#

Which version of Kafka are you using? There are differences between 0.8.x and 0.9.x.
Kafka 0.8.x:
- The return type of `poll()` is `Map<String, ConsumerRecords>` (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/kafkaconsumer.html)
- Use `ConsumerRecords#records(...)` to get a `List<ConsumerRecord>` to iterate over (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/consumerrecords.html)
- `ConsumerRecord.value()` returns `byte[]` (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/consumerrecord.html#value%28%29)
- Use `commit(boolean)` (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commit%28boolean%29); `commitSync()` is only available in 0.9.x
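To illustrate the first two errors: with the 0.8.x shape described above, `poll()` hands back a `Map` keyed by topic, and a `Map` is not `Iterable`, so a for-each directly over the result cannot compile. Below is a minimal sketch using only the JDK. Plain `List<String>` values stand in for Kafka's `ConsumerRecords`, and the names `PollShapeSketch`, `fakePoll` and `flatten` are made up for illustration; none of them belong to the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PollShapeSketch {

    // Stand-in for a 0.8.x-style poll() result: topic name -> records of that topic.
    // Assumption: plain strings replace the real ConsumerRecord objects.
    public static Map<String, List<String>> fakePoll() {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        byTopic.put("customerContacts", Arrays.asList("event-1", "event-2"));
        return byTopic;
    }

    // Walks the map per topic, then the records inside each topic.
    public static List<String> flatten(Map<String, List<String>> byTopic) {
        List<String> all = new ArrayList<>();
        // "for (String r : byTopic)" would fail with
        // "Can only iterate over an array or an instance of java.lang.Iterable".
        for (Map.Entry<String, List<String>> entry : byTopic.entrySet()) {
            for (String record : entry.getValue()) {
                all.add(entry.getKey() + ":" + record);
            }
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(flatten(fakePoll()));
    }
}
```

Casting the map to `ConsumerRecords` or to a `List` only silences the compiler; the object returned at runtime is still a map, so the cast would probably fail with a `ClassCastException`.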
Kafka 0.9.x:
- The return type of `poll()` is `ConsumerRecords` (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html)
- `ConsumerRecords` implements `Iterable` (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html), so you can use `for (ConsumerRecord r : records)`
- `ConsumerRecord#value` returns `T` (with `T == ATPEvent` in your case) (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html)
- `commitSync()` is available in 0.9.x
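On question 3: in 0.9.x `value()` already returns `T` because the configured value deserializer does the decoding, so the manual `ByteBuffer`/decoder steps at `//3` should not be needed. The question calls `createConsumerConfig(brokers, groupId, url)` without showing it, so the following is only a guess at what such a helper might look like. It uses only `java.util.Properties`. The class name `ConsumerConfigSketch` and the registry URL in `main` are placeholders, and the `specific.avro.reader` setting assumes `ATPEvent` is an Avro-generated specific record class.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical version of the createConsumerConfig helper referenced in the question.
    public static Properties createConsumerConfig(String brokers, String groupId, String schemaUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        // Consumers read the *deserializer* keys; "key.serializer" and
        // "value.serializer" are producer settings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaUrl);
        // Assumption: ATPEvent is a generated SpecificRecord class. Without this
        // flag the Confluent deserializer returns generic records instead.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Broker list and group id come from the question; the URL is a placeholder.
        Properties props = createConsumerConfig(
                "broker1:9092,broker2:9092", "CountryCounter", "http://localhost:8081");
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

With a configuration along these lines, `record.value()` inside the loop can be used directly as the event object, and `commitSync()` stays at the end of each poll iteration.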
