关闭。这个问题需要细节或清晰。它目前不接受答案。**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。
9个月前关门了。改进这个问题我想将SQLServer列名转换为小写,同时将其存储在kafka主题中。我使用debezium作为我的源连接器
kdfy810k1#
这可以通过使用jeremycustenborder的kafka-connect-common转换来完成sql server表:
Id Name Description Weight Pro_Id 101 aaa Sample_Test 3.14 2020-02-21 13:32:06.5900000 102 eee testdata1 3.14 2020-02-21 13:32:06.5900000
步骤1:从这个链接下载jeremy custenborder在confluent hub中提供的kafka connect公共转换jar文件步骤2:根据您的kafka环境,将jar文件放在/usr/share/java或/kafka/libs中步骤3:创建debezium sql server源连接器
{ "name": "sqlserver_src_connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.server.name": "sqlserver", "database.hostname": "*.*.*.*", "database.port": "1433", "database.user": "username", "database.password": "password", "database.dbname": "db_name", "table.whitelist": "dbo.tablename", "transforms": "unwrap,changeCase", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.changeCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value", "transforms.changeCase.from" : "UPPER_UNDERSCORE", "transforms.changeCase.to" : "LOWER_UNDERSCORE", "database.history.kafka.bootstrap.servers": "*.*.*.*", "database.history.kafka.topic": "schema-changes-tablename" } }
第四步:Kafka主题数据
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "name" }, { "type": "string", "optional": true, "field": "description" }, { "type": "double", "optional": true, "field": "weight" }, { "type": "int64", "optional": false, "name": "io.debezium.time.NanoTimestamp", "version": 1, "field": "pro_id" } ], "optional": true, "name": "sqlserver.dbo.tablename" }, "payload": { "id": 101, "name": "aaa", "description": "Sample_Test", "weight": 3.14, "pro_id": 1582291926590000000 } } { "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "name" }, { "type": "string", "optional": true, "field": "description" }, { "type": "double", "optional": true, "field": "weight" }, { "type": "int64", "optional": false, "name": "io.debezium.time.NanoTimestamp", "version": 1, "field": "pro_id" } ], "optional": true, "name": "sqlserver.dbo.tablename" }, "payload": { "id": 102, "name": "eee", "description": "testdata1", "weight": 3.14, "pro_id": 1582291926590000000 } }
感谢来自debezium社区的jiri pechanec和chris cranford@naros的帮助
1条答案
按热度按时间kdfy810k1#
这可以通过使用jeremycustenborder的kafka-connect-common转换来完成
sql server表:
步骤1:从这个链接下载jeremy custenborder在confluent hub中提供的kafka connect公共转换jar文件
步骤2:根据您的kafka环境,将jar文件放在/usr/share/java或/kafka/libs中
步骤3:创建debezium sql server源连接器
第四步:Kafka主题数据
感谢来自debezium社区的jiri pechanec和chris cranford@naros的帮助