debezium-custom payload-mysql连接器

yr9zkbsy  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我使用debezium将数据从mysql同步到s3。现在我想做些改变。
插入示例:

create table new (id int);
insert into new (1);

1. 自定义负载

{
    "schema": {
        "type": "struct",
        bla bla bla
        "optional": false,
        "name": "_72.31.84.129.test.new.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "id": 10
        },
        "source": {
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "11.11.84.129",
            "ts_ms": 1576605998000,
            "snapshot": "false",
            "db": "test",
            "table": "new",
            "server_id": 1,
            "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
            "file": "mysql-bin.000003",
            "pos": 12770,
            "row": 0,
            "thread": 47,
            "query": null
        },
        "op": "c",
        "ts_ms": 1576605998231
    }
}

我只想通过一些自定义更改来推送有效负载选项。我需要包括 source,op,ts_ms 都在里面 payload.after .

预期产量:

{
            "id": 10, 
            "source": {
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "11.11.84.129",
            "ts_ms": 1576605998000,
            "snapshot": "false",
            "db": "test",
            "table": "new",
            "server_id": 1,
            "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
            "file": "mysql-bin.000003",
            "pos": 12770,
            "row": 0,
            "thread": 47,
            "query": null
        },
        "op": "c",
        "ts_ms": 1576605998231
        }

我不想要模式,之前。我不知道如何得到这个输出。

vu8f3i0k

vu8f3i0k1#

查看smt以提取新记录状态。它只会传播 after . 或者,您可以让它从中添加所选字段 source ,也是。

...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...

不能插入 op 以及 ts_ms 字段,但它们可以作为消息头。

相关问题