下面的代码是我的dockerfile for kafka connect jdbc和mysql驱动程序
FROM debezium/connect:1.3
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION 8.0.20
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
| tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-8.0.20/mysql-connector-java-${MYSQL_DRIVER_VERSION}.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
docker build . --tag kafka kafka-connect-sink
下面是我的源代码dbjson
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}'
下面是我的目的地
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
"name": "inventory-connector-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
"connection.user": "pavan",
"connection.password": "root",
"topics": "dbserver1.inventory.customers",
"table.name.format": "pk.customers",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
"name": "inventory-connector-sink-addresses",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
"connection.user": "pavan",
"connection.password": "root",
"topics": "dbserver1.inventory.addresses",
"table.name.format": "pk.addresses",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
有了这个配置,我需要订阅每个主题,但问题是我有100多个表要在目标数据库中复制,不管怎样,我可以在单个json配置中这样做,以便我可以订阅所有主题。
1条答案
按热度按时间kt06eoxx1#
你可以用
topics
(或topics.regex
)属性来定义要使用的主题列表,以及table.name.format
jbdc接收器连接器或RegexRouter
smt(或合并它们)以覆盖目标表名称: