I'm following the Kafka Connect tutorial below, and I'd like to know whether it's possible to receive messages as instances of a specific class.
Tutorial: https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
For a table like the one in the tutorial, the schema looks like this:
{
  "namespace": "avro",
  "type": "record",
  "name": "Audit",
  "fields": [
    {"name": "c1", "type": "int"},
    {"name": "c2", "type": "string"},
    {"name": "create_ts", "type": "long"},
    {"name": "update_ts", "type": "long"}
  ]
}
From this Avro schema I generated a class with Maven.
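For reference, a minimal sketch of how that generation step is typically wired up with the avro-maven-plugin; the plugin version and directory layout below are my assumptions, not taken from the question:

<!-- Hypothetical avro-maven-plugin setup; adjust version and paths to your project -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <!-- the "schema" goal compiles *.avsc files into Java classes -->
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>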
Then I defined a consumer factory parameterized with my type:
public ConsumerFactory<String, Audit> auditConsumerFactory() { ... }
And a Kafka listener:
@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory = "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
    System.out.println(audit);
    this.latch.countDown();
}
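The listener refers to a container factory bean named mysqlKafkaListenerContainerFactory that isn't shown in the question; a minimal sketch of how it would wire in the consumer factory above (this bean is my assumption):

// Hypothetical container factory; the bean name must match the
// containerFactory attribute on @KafkaListener
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Audit> mysqlKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Audit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(auditConsumerFactory());
    return factory;
}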
But in the end I get this error:
2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.
EDIT: the ConsumerFactory with the deserializer configuration:
public ConsumerFactory<String, Audit> auditConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    // Deserialize into the generated SpecificRecord class instead of GenericRecord
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(props);
}
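To rule out the Spring wiring, the same properties can be sanity-checked with a plain KafkaConsumer; a quick sketch (the topic name mysql-audit is taken from the error log above):

// Poll the topic directly to verify that deserialization works outside Spring
try (KafkaConsumer<String, Audit> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("mysql-audit"));
    ConsumerRecords<String, Audit> records = consumer.poll(Duration.ofSeconds(5));
    records.forEach(r -> System.out.println(r.value()));
}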
Audit.avsc:
{
  "type": "record",
  "name": "avro.Audit",
  "fields": [
    {
      "name": "c1",
      "type": "int"
    },
    {
      "name": "c2",
      "type": "string"
    },
    {
      "name": "create_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "update_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "avro.Audit"
}
I found the answer to my question on GitHub.
1 Answer
I don't know whether there are other threads about this problem, but Confluent eventually resolved it for me: the error means the writer's schema was registered under the bare record name audit, without the namespace needed to locate the generated SpecificRecord class. Adding these three lines to the JDBC connector fixes the schema name:
"transforms": "addNamespace",
"transforms.addNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.addNamespace.schema.name": "my.namespace.NameOfSchema"
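For context, a sketch of what the complete JDBC source connector config could look like with the transform in place; apart from the three transform lines and the mysql- topic prefix (visible in the error log above), the values are placeholders assumed from the tutorial setup, and schema.name is set to the fully qualified name of the generated class:

{
  "name": "mysql-source-audit",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/demo",
    "connection.user": "connect_user",
    "connection.password": "<password>",
    "topic.prefix": "mysql-",
    "table.whitelist": "audit",
    "mode": "bulk",
    "transforms": "addNamespace",
    "transforms.addNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.addNamespace.schema.name": "avro.Audit"
  }
}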
Related Kafka issue: KAFKA-7883