我需要一些帮助。我已经搭建了如下所示的数据管道:
MySQL --> Debezium --> Kafka --> Kafka Connect --> AWS S3。
现在 S3 中存放的是 JSON 格式的 Debezium 事件消息。
接下来需要将这些数据作为表加载到 Redshift 中:
S3 --> Redshift(目标数据库),以行的形式加载。
下面贴出了一条更新事件(更新产品 ID 为 102 的订单数量)的 Debezium 事件消息。我希望把它转换成这样一种格式:在 Redshift 中对 S3 上的文件执行 COPY 命令时,能将变更(create/update/delete)加载到 Redshift 表中。
注意:这里我设置了 "rotate.interval.ms": "3600000"(即 1 小时),因此每小时会生成一个包含该时段内所有 CRUD 操作的 Debezium 消息文件。
因此,我们需要一个方案,把 S3 中每个新生成的文件(内容为 Debezium 事件消息)转换成一种可以直接用 COPY 命令加载到 Redshift 的格式。我的主要目标是捕获 MySQL 中的 CDC 变更,并将其复制到 Redshift 中。
这是我的 S3 Sink 连接器配置(Kafka Connect S3 Sink):
{
"name": "s3-sink-db02",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "S3bucket",
"name": "s3-sink-db02",
"tasks.max": "3",
"s3.region": "us-east-1",
"aws.access_key_id": "accesskey",
"aws.secret_access_key": "secretKey",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"timezone": "UTC",
"locale": "en",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"topics.regex": "dbserver1.(.*)",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"path.format": "YYYY/MM/dd/HH",
"partition.duration.ms": "3600000",
"rotate.schedule.interval.ms": "3600000"
}
}
Debezium 消息:
{
"schema": {
"name": "dbserver1.inventory.orders.Envelope",
"optional": false,
"type": "struct",
"fields": [
{
"field": "before",
"name": "dbserver1.inventory.orders.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "order_number",
"optional": false,
"type": "int32"
},
{
"field": "order_date",
"name": "io.debezium.time.Date",
"optional": false,
"type": "int32",
"version": 1
},
{
"field": "purchaser",
"optional": false,
"type": "int32"
},
{
"field": "quantity",
"optional": false,
"type": "int32"
},
{
"field": "product_id",
"optional": false,
"type": "int32"
}
]
},
{
"field": "after",
"name": "dbserver1.inventory.orders.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "order_number",
"optional": false,
"type": "int32"
},
{
"field": "order_date",
"name": "io.debezium.time.Date",
"optional": false,
"type": "int32",
"version": 1
},
{
"field": "purchaser",
"optional": false,
"type": "int32"
},
{
"field": "quantity",
"optional": false,
"type": "int32"
},
{
"field": "product_id",
"optional": false,
"type": "int32"
}
]
},
{
"field": "source",
"name": "io.debezium.connector.mysql.Source",
"optional": false,
"type": "struct",
"fields": [
{
"field": "version",
"optional": false,
"type": "string"
},
{
"field": "connector",
"optional": false,
"type": "string"
},
{
"field": "name",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": false,
"type": "int64"
},
{
"default": "false",
"field": "snapshot",
"name": "io.debezium.data.Enum",
"optional": true,
"type": "string",
"version": 1,
"parameters": {
"allowed": "true,last,false"
}
},
{
"field": "db",
"optional": false,
"type": "string"
},
{
"field": "table",
"optional": true,
"type": "string"
},
{
"field": "server_id",
"optional": false,
"type": "int64"
},
{
"field": "gtid",
"optional": true,
"type": "string"
},
{
"field": "file",
"optional": false,
"type": "string"
},
{
"field": "pos",
"optional": false,
"type": "int64"
},
{
"field": "row",
"optional": false,
"type": "int32"
},
{
"field": "thread",
"optional": true,
"type": "int64"
},
{
"field": "query",
"optional": true,
"type": "string"
}
]
},
{
"field": "op",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": true,
"type": "int64"
}
]
},
"payload": {
"op": "u",
"before": {
"order_date": 16816,
"quantity": 1,
"purchaser": 1001,
"order_number": 10001,
"product_id": 102
},
"after": {
"order_date": 16816,
"quantity": 6,
"purchaser": 1001,
"order_number": 10001,
"product_id": 102
},
"source": {
"query": null,
"thread": 4,
"server_id": 223344,
"version": "1.0.3.Final",
"file": "mysql-bin.000007",
"connector": "mysql",
"pos": 354,
"name": "dbserver1",
"gtid": null,
"row": 0,
"ts_ms": 1591620600000,
"snapshot": "false",
"db": "inventory",
"table": "orders"
},
"ts_ms": 1591620602204
}
}
1条答案
按热度按时间llmtgqce1#
等有时间我会完善这个答案。它来自我们的一个生产环境代码仓库,使用了 https://github.com/goibibo/dataplatform_utils 这个库。