更新操作不起作用

ldfqzlk8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我是新来Kafka连接源和接收器。我创建了一个应用程序来将表数据从一个模式(schema1)传输到另一个模式(schema2),这里我使用oracle作为数据库。我成功地将用于插入操作的数据/行从表“schema1.header”传输到表“schema2.header”,但无法使用下面提到的配置执行更新操作。
源配置:

{
           "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
            "connection.user": "USER",
            "connection.password": "user1234",
            "dialect.name": "OracleDatabaseDialect",
            "topic.prefix": "Schema1.Header",
            "incrementing.column.name": "SC_NO",
            "mode": "incrementing",
            "query": "SELECT * FROM (SELECT HEADER_V1.* FROM Schema1.Header HEADER_V1 INNER JOIN Schema1.LINE_V1 LINE_V1 ON HEADER_V1.SC_NO = LINE_V1.SC_NO AND LINE_V1.CLNAME_CODE ='XXXXXX' AND HEADER_V1.ITEM_TYPE = 'XXX')",
            "transforms": "ReplaceField",
            "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.ReplaceField.blacklist": "col_3,col_10"
            }

接收器配置:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
    "connection.user": "USER2",
    "connection.password": "user21234",
    "dialect.name": "OracleDatabaseDialect",
    "topics": "Schema1.Header",
    "table.name.format": "Schema2.Header",
    "tasks.max": "1"
}

请帮我解决这个问题。
注意:我只需要在schema schema1.tables中执行所有crud操作,使用kafka connect将这些数据传输到另一个schema schema2.tables。新插入的数据/行已传输,但更新的数据/行未通过kafka connect传输。我要怎么做才能做到这一点?

2ic8powd

2ic8powd1#

根据这个博客你需要设置 modetimestamp (或更好) timestamp+incrementing 如果您想在源配置中添加新的和更新的行)。
此外,还需要指定 timestamp.column.name 它应该指向每次更新行时都会更新的timestamp列。

相关问题