我正在尝试使用avro-kafka反序列化程序对kafka-avro消息进行反序列化。这段代码非常常见,很多用户已经在实践中使用过了。但在实现相同的代码时,我遇到了一些困难:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
@SuppressWarnings("unchecked")
ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000); //1
for (ConsumerRecord<String, ATPEvent> record: records) //2 {
try {
kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
avroEvent = reader.read(avroEvent, avroBinaryDecoder);
System.out.println(avroEvent);
kafkaMessageInputStream.close();
} catch (Exception ex) {
System.out.println("Unable to process event from kafka, see exception details" + ex);
}
}
consumer.commitSync(); //4
}
下面是4个问题:
我必须加上演员,否则它会通过一个错误作为
Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent> Can only iterate over an array or an instance of java.lang.Iterable
我不知道为什么?我能做这个吗
List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);
for (ConsumerRecord<String, ATPEvent> record: records) {
``` `The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent)` 这我明白,但是我怎样才能把一个类转换成一个字节,还有别的方法吗? `The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent>` 我能用一下吗 `consumer.close();` 请提供2和3的解决方案,如果可能,请解释1和4。
1条答案
按热度按时间zqry0prt1#
你用哪种版本的Kafka?0.8.x和0.9.x存在差异
Kafka0.8.x:
返回类型为
Map<String, ConsumerRecords>
(见http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/kafkaconsumer.html)使用
ConsumerRecords#records(...)
得到List<ConsumerRecord>
对于迭代(参见http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/consumerrecords.html)ConsumerRecord.value()
返回byte[]
(见http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/consumerrecord.html#value%28%29)使用
commit(boolen)
(见http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commit%28(布尔值%29)--commitSync()
仅适用于0.9.xKafka0.9.x
返回类型为
ConsumerRecords
(见https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html)ConsumerRecords
工具Iterable
(见https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html)因此,您可以使用for(ConsumerRecord r : records)
ConsumerRecord#value
退货T
(与T == ATPEvent
在您的情况下)(参见https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html)commitSync()
提供0.9.x