I'm using Debezium + Kafka to perform CDC on a MySQL database. Everything works, but I've run into a problem with how MySQL TIMESTAMP and DATETIME columns are mapped into Kafka.
TIMESTAMP columns are stored in Kafka as "2023-06-22T19:29:26Z", which is perfect for my use case and works fine. However, DATETIME columns are stored as a UNIX timestamp, e.g. 1687462166000, which does not work for my use case.
These columns appear in 50+ different tables in my project, so if they land in Kafka as UNIX timestamps I'd have to spend a lot of time reworking downstream code. I'm looking for a simple way to have these DATETIME values stored in Kafka the same way the TIMESTAMP values are. I've been looking at SMTs but haven't found a good solution there yet. My hope is that I can fix this by adding a couple of parameters to the Debezium configuration and call it a day. Below is some more information to show what's going on.
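The closest built-in option I've seen is Kafka Connect's TimestampConverter SMT, but it converts a single named field per transform, so with differently named DATETIME columns spread across 50+ tables I'd need one transform per column. For reference, this is roughly what it would look like for one field (created_at_datetime is the column from my example table below):
"transforms": "unwrap,dropTopicPrefix,convertDatetime",
"transforms.convertDatetime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertDatetime.field": "created_at_datetime",
"transforms.convertDatetime.target.type": "string",
"transforms.convertDatetime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"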
Debezium configuration
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/mysql-debezium-test/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "44",
"database.server.name": "asgard2",
"table.whitelist": "demo.movies,demo.second_movies",
"database.history.kafka.bootstrap.servers": "broker:29092",
"database.history.kafka.topic": "dbhistory.demo" ,
"decimal.handling.mode": "double",
"include.schema.changes": "false",
"snapshot.mode": "schema_only",
"time.precision.mode": "adaptive",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"asgard2.demo.(.*)",
"transforms.dropTopicPrefix.replacement":"mysql2.$1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"log.retention.hours": "120",
"poll.interval.ms": "30000"
}'
Example MySQL table + column types
CREATE TABLE `movies`
(
`movie_id` int(11) NOT NULL,
`title` varchar(256) NOT NULL,
`release_year` int(11) NOT NULL,
`country` varchar(256) NOT NULL,
`genres` varchar(256) NOT NULL,
`actors` varchar(1024) NOT NULL,
`directors` varchar(512) NOT NULL,
`composers` varchar(256) NOT NULL,
`screenwriters` varchar(256) NOT NULL,
`cinematographer` varchar(256) NOT NULL,
`production_companies` varchar(256) NOT NULL,
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`created_at_datetime` DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`movie_id`)
);
Message as stored in Kafka
{
"movie_id": 2,
"title": "10,000 B.C. try 2",
"release_year": 2008,
"country": "United States",
"genres": "Adventure",
"actors": "Steven Strait|Camilla Belle|Cliff Curtis|Omar Sharif|Tim Barlow|Marco Khan|Reece Ritchie|Mo Zinal",
"directors": "Roland Emmerich",
"composers": "Harald Kloser|Thomas Wanker",
"screenwriters": "Roland Emmerich|Harald Kloser|John Orloff|Matthew Sand|Robert Rodat",
"cinematographer": "Ueli Steiger",
"production_companies": "Warner Bros. Pictures|Legendary Pictures|Centropolis",
"created_at": "2023-06-22T19:29:26Z",
"created_at_datetime": 1687462166000,
"__deleted": "false"
}
Raw data in MySQL
Does anyone have a quick fix in mind that doesn't involve changing the actual column types in MySQL, or having to perform the conversion in every application that consumes these Kafka messages? Any help would be greatly appreciated. Thanks!
1 Answer
Kafka Connect has no schema type that maps directly onto MySQL's DATETIME. With time.precision.mode set to adaptive, Debezium emits a timezone-aware TIMESTAMP column as an ISO-8601 string (io.debezium.time.ZonedTimestamp), but a DATETIME column as epoch milliseconds, i.e. a Long (io.debezium.time.Timestamp). That is why your created_at_datetime arrives as 1687462166000. You need to plug in a converter to fix it, or switch to debezium-jdbc-connect.
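A scalable way to do that is Debezium's CustomConverter SPI (available since Debezium 1.1), which lets one class intercept every DATETIME column across all of your tables and re-register it with a string schema. The sketch below is my own illustration, not an official Debezium converter: the class name IsoDatetimeConverter is made up, and the assumption that values arrive as either a LocalDateTime (snapshot) or an epoch-millis Long (streaming) should be verified against your Debezium version.

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

// Hypothetical converter that rewrites every MySQL DATETIME column as an
// ISO-8601 string, matching the format Debezium already uses for TIMESTAMP.
public class IsoDatetimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final DateTimeFormatter ISO_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

    @Override
    public void configure(Properties props) {
        // No configuration options needed for this sketch.
    }

    @Override
    public void converterFor(RelationalColumn column,
                             ConverterRegistration<SchemaBuilder> registration) {
        // Only take over DATETIME columns; everything else keeps Debezium's default mapping.
        if (!"DATETIME".equalsIgnoreCase(column.typeName())) {
            return;
        }
        registration.register(SchemaBuilder.string().optional(), value -> {
            if (value == null) {
                return null;
            }
            if (value instanceof LocalDateTime) {
                // Snapshot reads typically hand us a LocalDateTime.
                return ((LocalDateTime) value).format(ISO_FORMAT);
            }
            if (value instanceof Long) {
                // Binlog reads may hand us epoch milliseconds.
                return LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) value), ZoneOffset.UTC)
                        .format(ISO_FORMAT);
            }
            // Fallback for any representation this sketch does not anticipate.
            return value.toString();
        });
    }
}

After packaging the class into a jar on the Connect worker's plugin path, it would be wired into the connector config with two extra properties (the converter prefix isoDatetime and the package com.example are, again, placeholders):

"converters": "isoDatetime",
"isoDatetime.type": "com.example.IsoDatetimeConverter"

Because the converter keys off the column's type rather than its name, one class covers all 50+ tables without any per-field configuration.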