jdbc源连接器-设置avro模式

mrwjdhj3  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(265)

如何使kafka将jdbc连接器连接到预定义的avro模式?它会在创建连接器时创建一个新版本。我正在读db2的文章,并把它放到Kafka的主题中。我在创建过程中设置架构名称和版本,但它不工作!!!以下是我的连接器设置:

{
      "name": "kafka-connect-jdbc-db2-tst-2",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:db2://mydb2:50000/testdb",
        "connection.user": "DB2INST1",
        "connection.password": "12345678",
        "query":"SELECT CORRELATION_ID FROM TEST.MYVIEW4 ",
        "mode": "incrementing",
        "incrementing.column.name": "CORRELATION_ID",
        "validate.non.null": "false",
        "topic.prefix": "tst-4" ,
        "auto.register.schemas": "false",
        "use.latest.version": "true",
        "transforms": "RenameField,SetSchemaMetadata",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "CORRELATION_ID:id",
        "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
        "transforms.SetSchemaMetadata.schema.version": "1"

      }

    }

下面是模式:v.1是我的,v.2是由jdbc源连接器创建的:

$ curl localhost:8081/subjects/tst-4-value/versions/1 | jq .

    {
      "subject": "tst-4-value",
      "version": 1,
      "id": 387,
      "schema": "{"type":"record","name":"MyMessage",
    "namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
    }

    $ curl localhost:8081/subjects/tst-4-value/versions/2 | jq .
    {
      "subject": "tst-4-value",
      "version": 2,
      "id": 386,
      "schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar",
       "fields":[{"name":"id","type":"int"}],
       "connect.version":1,
       "connect.name":"foo.bar.MyMessage"
    }"
    }

你知道我怎样才能强迫Kafka连接器使用我的模式吗?提前谢谢,

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题