jdbc avro connect如何定义自定义模式注册表

rt4zxlrg  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(347)

我正在学习关于kafka connect的教程,我想知道是否有可能为一个数据来自mysql表的主题定义一个自定义模式注册表。
我在json/connect配置中找不到定义它的位置,我不想在创建之后创建该模式的新版本。
我的mysql表stations有这个模式

Field          | Type        
---------------+-------------
code           | varchar(4)  
date_measuring | timestamp   
attributes     | varchar(256)

其中属性包含json数据而不是字符串(我必须使用该类型,因为属性的json字段是可变的)。
我的连接器坏了

{
  "value.converter.schema.registry.url": "http://localhost:8081",
  "_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "name": "jdbc_source_mysql_stations",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": [
    "ValueToKey"
  ],
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": [
    "code",
    "date_measuring"
  ],
  "connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
  "connection.user": "confluent",
  "connection.password": "**************",
  "table.whitelist": [
    "stations"
  ],
  "mode": "timestamp",
  "timestamp.column.name": [
    "date_measuring"
  ],
  "validate.non.null": "false",
  "topic.prefix": "mysql-"
}

并创建这个模式

{
  "subject": "mysql-stations-value",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}

其中“attributes”字段当然是一个字符串。不像我会把它应用到另一个模式。

{
  "fields": [
    {
      "name": "code",
      "type": "string"
    },
    {
      "name": "date_measuring",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "connect.version": 1,
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "attributes",
      "type": {
        "type": "record",
        "name": "AttributesRecord",
        "fields": [
          {
            "name": "H1",
            "type": "long",
            "default": 0
          },
          {
            "name": "H2",
            "type": "long",
            "default": 0
          },
          {
            "name": "H3",
            "type": "long",
            "default": 0
          },          
          {
            "name": "H",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Q",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P1",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P2",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P3",
            "type": "long",
            "default": 0
          },                    
          {
            "name": "P",
            "type": "long",
            "default": 0
          },          
          {
            "name": "T",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Hr",
            "type": "long",
            "default": 0
          },          
          {
            "name": "pH",
            "type": "long",
            "default": 0
          },          
          {
            "name": "RX",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Ta",
            "type": "long",
            "default": 0
          },  
          {
            "name": "C",
            "type": "long",
            "default": 0
          },                  
          {
            "name": "OD",
            "type": "long",
            "default": 0
          },          
          {
            "name": "TU",
            "type": "long",
            "default": 0
          },          
          {
            "name": "MO",
            "type": "long",
            "default": 0
          },          
          {
            "name": "AM",
            "type": "long",
            "default": 0
          },          
          {
            "name": "N03",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P04",
            "type": "long",
            "default": 0
          },          
          {
            "name": "SS",
            "type": "long",
            "default": 0
          },          
          {
            "name": "PT",
            "type": "long",
            "default": 0
          }          
        ]
       }
     }    
  ],
  "name": "stations",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}

有什么建议吗?如果不可能,我想我必须创建一个kafkastream来创建另一个主题,即使我会避免它。
提前谢谢!

dojqjjoe

dojqjjoe1#

我不认为你在问什么关于使用“定制”注册表的问题(这两行表示你正在使用哪个注册表),而是问你如何在从数据库中提取记录之后解析数据/应用模式
您可以编写自己的转换,也可以使用kstream,这实际上是这里的主要选项。有一个setschemametadata转换,但我不确定它是否能满足您的要求(将字符串解析为avro记录)
或者,如果您必须将json数据塞进一个数据库属性中,也许您不应该使用mysql,而应该使用具有更灵活的数据约束的文档数据库。
否则,可以使用blob而不是varchar,并将二进制avro数据放入该列,但仍然需要自定义反序列化程序来读取数据

相关问题