MySQL: how to convert the UNIX timestamps captured by Debezium into normal timestamps in Kafka

Posted by pkln4tw6 on 2023-06-28 in Mysql

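I'm using Debezium + Kafka to do CDC on a MySQL database. Everything works, but I've run into a problem with how MySQL TIMESTAMP and DATETIME columns are formatted when they are mapped to Kafka.
Every TIMESTAMP column is stored in Kafka as "2023-06-22T19:29:26Z", which is perfect for my use case and works fine. However, every DATETIME column is stored as a UNIX timestamp in milliseconds, e.g. 1687462166000, which doesn't work for my use case.
These DATETIME columns appear in more than 50 different tables in my project, so if they stay as UNIX timestamps I'll have to spend a lot of time reworking things. I'm looking for a simple way to get these DATETIME values stored in Kafka the same way as the TIMESTAMP values. I've been looking at SMTs but haven't found a good solution yet. I was hoping I could fix this by adding a few parameters to the Debezium config and call it a day. Below is some more information showing what's going on.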
Debezium config

curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/mysql-debezium-test/config \
  -d '{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "44",
        "database.server.name": "asgard2",
        "table.whitelist": "demo.movies,demo.second_movies",
        "database.history.kafka.bootstrap.servers": "broker:29092",
        "database.history.kafka.topic": "dbhistory.demo" ,
        "decimal.handling.mode": "double",
        "include.schema.changes": "false",
        "snapshot.mode": "schema_only",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap,dropTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode":"rewrite",
        "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropTopicPrefix.regex":"asgard2.demo.(.*)",
        "transforms.dropTopicPrefix.replacement":"mysql2.$1",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "log.retention.hours": "120",
        "poll.interval.ms": "30000"
    }'

Example MySQL table and column types

CREATE TABLE `movies`
(
    `movie_id`             int(11)       NOT NULL,
    `title`                varchar(256)  NOT NULL,
    `release_year`         int(11)       NOT NULL,
    `country`              varchar(256)  NOT NULL,
    `genres`               varchar(256)  NOT NULL,
    `actors`               varchar(1024) NOT NULL,
    `directors`            varchar(512)  NOT NULL,
    `composers`            varchar(256)  NOT NULL,
    `screenwriters`        varchar(256)  NOT NULL,
    `cinematographer`      varchar(256)  NOT NULL,
    `production_companies` varchar(256)  NOT NULL,
    `created_at`           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `created_at_datetime`  DATETIME DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`movie_id`)
);

Message as stored in Kafka

{
    "movie_id": 2,
    "title": "10,000 B.C. try 2",
    "release_year": 2008,
    "country": "United States",
    "genres": "Adventure",
    "actors": "Steven Strait|Camilla Belle|Cliff Curtis|Omar Sharif|Tim Barlow|Marco Khan|Reece Ritchie|Mo Zinal",
    "directors": "Roland Emmerich",
    "composers": "Harald Kloser|Thomas Wanker",
    "screenwriters": "Roland Emmerich|Harald Kloser|John Orloff|Matthew Sand|Robert Rodat",
    "cinematographer": "Ueli Steiger",
    "production_companies": "Warner Bros. Pictures|Legendary Pictures|Centropolis",
    "created_at": "2023-06-22T19:29:26Z",
    "created_at_datetime": 1687462166000,
    "__deleted": "false"
}
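The two fields above actually hold the same instant: 1687462166000 milliseconds after the epoch is 2023-06-22T19:29:26 UTC, exactly the value of created_at. A minimal sketch to check this, assuming GNU coreutils date (BSD/macOS date uses -r instead of -d) and that Debezium emitted milliseconds, which is what it does for DATETIME(0) to DATETIME(3) under "time.precision.mode": "adaptive". Because DATETIME carries no time zone, Debezium encodes its wall-clock value as if it were UTC, so formatting in UTC reproduces the stored value. The helper name millis_to_iso is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn Debezium's epoch-millisecond DATETIME value into
# the same ISO-8601 UTC shape that TIMESTAMP columns already have in Kafka.
# Assumes GNU coreutils `date`.
millis_to_iso() {
  local secs=$(( $1 / 1000 ))            # drop the millisecond part
  date -u -d "@${secs}" +%Y-%m-%dT%H:%M:%SZ
}

millis_to_iso 1687462166000              # prints 2023-06-22T19:29:26Z
```

So no information is lost; only the representation differs between the two column types.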

[Screenshot: original data in MySQL]

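Does anyone know of a quick fix that doesn't involve changing the actual column types in MySQL or doing the conversion in the applications that consume these Kafka messages? Any help would be greatly appreciated. Thanks!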


Answer #1 by u4dcyp6a

Kafka Connect has no built-in DATETIME type, which is why Debezium converts DATETIME values to a Long (milliseconds since the epoch). You need a converter to fix this, or switch to debezium-jdbc-connect.
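One way to apply a conversion without writing Java is Kafka Connect's built-in TimestampConverter SMT, which treats an INT64 field as UNIX epoch milliseconds and can format it as a string. The sketch below is an assumption-laden illustration, not a tested recipe: it assumes jq is installed, the Connect REST API is at localhost:8083 as in the question, and only the created_at_datetime column from the example table; the transform alias datetimeToString is hypothetical. It appends the SMT after unwrap, because before ExtractNewRecordState runs the column is nested inside the Debezium envelope.

```shell
#!/usr/bin/env bash
# Sketch: add TimestampConverter to the existing connector config.
# Assumes jq is installed; "datetimeToString" is a hypothetical alias.
set -euo pipefail

CONNECT_URL="http://localhost:8083/connectors/mysql-debezium-test/config"

# Fetch the current config, merge in the new transform (placed after unwrap so
# the DATETIME column is a top-level field), then PUT the merged config back.
# The format string is passed via --arg to avoid shell single-quote clashes.
curl -s "$CONNECT_URL" \
  | jq --arg fmt "yyyy-MM-dd'T'HH:mm:ss'Z'" '. + {
      "transforms": "unwrap,dropTopicPrefix,datetimeToString",
      "transforms.datetimeToString.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.datetimeToString.field": "created_at_datetime",
      "transforms.datetimeToString.target.type": "string",
      "transforms.datetimeToString.format": $fmt
    }' \
  | curl -i -X PUT -H "Content-Type:application/json" --data-binary @- "$CONNECT_URL"
```

Each TimestampConverter instance handles a single field name, so columns with other names would need one more chained instance each. Columns declared DATETIME(4) to DATETIME(6) arrive as microseconds under adaptive mode and would need different handling.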
