我正在学习关于kafka connect的教程,我想知道是否有可能为一个数据来自mysql表的主题定义一个自定义模式注册表。
我在json/connect配置中找不到定义它的位置,我不想在创建之后创建该模式的新版本。
我的mysql表stations有这个模式
Field | Type
---------------+-------------
code | varchar(4)
date_measuring | timestamp
attributes | varchar(256)
其中属性包含json数据而不是字符串(我必须使用该类型,因为属性的json字段是可变的)。
我的连接器坏了
{
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "The Kafka topic will be made up of this prefix, plus the table name ",
"key.converter.schema.registry.url": "http://localhost:8081",
"name": "jdbc_source_mysql_stations",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": [
"ValueToKey"
],
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": [
"code",
"date_measuring"
],
"connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
"connection.user": "confluent",
"connection.password": "**************",
"table.whitelist": [
"stations"
],
"mode": "timestamp",
"timestamp.column.name": [
"date_measuring"
],
"validate.non.null": "false",
"topic.prefix": "mysql-"
}
并创建这个模式
{
"subject": "mysql-stations-value",
"version": 1,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}
其中“attributes”字段当然是一个字符串。不像我会把它应用到另一个模式。
{
"fields": [
{
"name": "code",
"type": "string"
},
{
"name": "date_measuring",
"type": {
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
},
{
"name": "attributes",
"type": {
"type": "record",
"name": "AttributesRecord",
"fields": [
{
"name": "H1",
"type": "long",
"default": 0
},
{
"name": "H2",
"type": "long",
"default": 0
},
{
"name": "H3",
"type": "long",
"default": 0
},
{
"name": "H",
"type": "long",
"default": 0
},
{
"name": "Q",
"type": "long",
"default": 0
},
{
"name": "P1",
"type": "long",
"default": 0
},
{
"name": "P2",
"type": "long",
"default": 0
},
{
"name": "P3",
"type": "long",
"default": 0
},
{
"name": "P",
"type": "long",
"default": 0
},
{
"name": "T",
"type": "long",
"default": 0
},
{
"name": "Hr",
"type": "long",
"default": 0
},
{
"name": "pH",
"type": "long",
"default": 0
},
{
"name": "RX",
"type": "long",
"default": 0
},
{
"name": "Ta",
"type": "long",
"default": 0
},
{
"name": "C",
"type": "long",
"default": 0
},
{
"name": "OD",
"type": "long",
"default": 0
},
{
"name": "TU",
"type": "long",
"default": 0
},
{
"name": "MO",
"type": "long",
"default": 0
},
{
"name": "AM",
"type": "long",
"default": 0
},
{
"name": "N03",
"type": "long",
"default": 0
},
{
"name": "P04",
"type": "long",
"default": 0
},
{
"name": "SS",
"type": "long",
"default": 0
},
{
"name": "PT",
"type": "long",
"default": 0
}
]
}
}
],
"name": "stations",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}
有什么建议吗?如果不可能,我想我必须创建一个kafkastream来创建另一个主题,即使我会避免它。
提前谢谢!
1条答案
按热度按时间dojqjjoe1#
我不认为你在问什么关于使用“定制”注册表的问题(这两行表示你正在使用哪个注册表),而是问你如何在从数据库中提取记录之后解析数据/应用模式
您可以编写自己的转换,也可以使用kstream,这实际上是这里的主要选项。有一个setschemametadata转换,但我不确定它是否能满足您的要求(将字符串解析为avro记录)
或者,如果您必须将json数据塞进一个数据库属性中,也许您不应该使用mysql,而应该使用具有更灵活的数据约束的文档数据库。
否则,可以使用blob而不是varchar,并将二进制avro数据放入该列,但仍然需要自定义反序列化程序来读取数据