我使用debezium将数据从mysql同步到s3。现在我想做些改变。
插入示例:
create table new (id int);
insert into new (1);
1. 自定义负载
{
"schema": {
"type": "struct",
bla bla bla
"optional": false,
"name": "_72.31.84.129.test.new.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 10
},
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
}
我只想通过一些自定义更改来推送有效负载选项。我需要包括 source,op,ts_ms
都在里面 payload.after
.
预期产量:
{
"id": 10,
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
我不想要模式,之前。我不知道如何得到这个输出。
1条答案
按热度按时间vu8f3i0k1#
查看smt以提取新记录状态。它只会传播
after
. 或者,您可以让它从中添加所选字段source
,也是。不能插入
op
以及ts_ms
字段,但它们可以作为消息头。