如何使kafka将jdbc连接器连接到预定义的avro模式?它会在创建连接器时创建一个新版本。我正在读db2的文章,并把它放到Kafka的主题中。我在创建过程中设置架构名称和版本,但它不工作!!!以下是我的连接器设置:
{
"name": "kafka-connect-jdbc-db2-tst-2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:db2://mydb2:50000/testdb",
"connection.user": "DB2INST1",
"connection.password": "12345678",
"query":"SELECT CORRELATION_ID FROM TEST.MYVIEW4 ",
"mode": "incrementing",
"incrementing.column.name": "CORRELATION_ID",
"validate.non.null": "false",
"topic.prefix": "tst-4" ,
"auto.register.schemas": "false",
"use.latest.version": "true",
"transforms": "RenameField,SetSchemaMetadata",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "CORRELATION_ID:id",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
"transforms.SetSchemaMetadata.schema.version": "1"
}
}
下面是模式:v.1是我的,v.2是由jdbc源连接器创建的:
$ curl localhost:8081/subjects/tst-4-value/versions/1 | jq .
{
"subject": "tst-4-value",
"version": 1,
"id": 387,
"schema": "{"type":"record","name":"MyMessage",
"namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
}
$ curl localhost:8081/subjects/tst-4-value/versions/2 | jq .
{
"subject": "tst-4-value",
"version": 2,
"id": 386,
"schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar",
"fields":[{"name":"id","type":"int"}],
"connect.version":1,
"connect.name":"foo.bar.MyMessage"
}"
}
你知道我怎样才能强迫Kafka连接器使用我的模式吗?提前谢谢,
暂无答案!
目前还没有任何答案,快来回答吧!