jdbc自定义avro模式

hc8w905p  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(502)

我正在关注Kafka连接教程,我想知道是否有可能收到消息,这将是某种类型的类。
辅导的:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
与教程中的表类似,模式如下所示:

{
   "namespace": "avro",
   "type": "record",
   "name": "Audit",
   "fields": [
      {"name": "c1", "type": "int"},
      {"name": "c2", "type": "string"},
      {"name": "create_ts", "type": "long"},
      {"name": "update_ts", "type": "long"}
   ]
}

基于avro格式,我用maven生成了一个类。
然后我用我的类型定义了消费者工厂:

public ConsumerFactory<String, Audit> auditConsumerFactory() { ... )

Kafka利特纳:

@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory =   "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
     System.out.println(audit);
     this.latch.countDown();
}

但最后我得到了这样的错误:

2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.

使用反序列化程序编辑consumerfactory:

public ConsumerFactory<String, Audit> auditConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory(props);
    }

审计.avsc

{
  "type": "record",
  "name": "avro.Audit",
  "fields": [
    {
      "name": "c1",
      "type": "int"
    },
    {
      "name": "c2",
      "type": "string"
    },
    {
      "name": "create_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "update_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "avro.Audit"
}

我在github上找到了问题的答案

gpfsuwkq

gpfsuwkq1#

我不知道是否有关于这个问题的其他线索,但最终合流解决了这个问题。将这三条线添加到jdbc连接器
“transforms”:“addnamespace”,“transforms.addnamespace.type”:“org.apache.kafka.connect.transforms.setschemametadata$value”,“transforms.addnamespace.schema.name”:“my.namespace.nameofschema”,
Kafka-7883

相关问题