我的dockerfile,我用来下载sink connector和mysql jdbc驱动程序
FROM debezium/connect
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION 5.1.39
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
| tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
我在当前的工作目录中保留了这个没有扩展名dockerfile的文件,并使用下面的命令创建docker映像
docker build . --tag kafka-connect-sink
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
docker run -it --rm --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h192.168.99.102 -P3307 -uroot -pdebezium'
docker run -it --rm --name mysqldest -p 3308:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql
docker run -it --rm --name mysqltermdest --link mysqldest --rm mysql:5.7 sh -c 'exec mysql -h192.168.99.102 -P3308 -uroot -pdebezium'
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql --link mysqldest:mysqldest kafka-connect-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.test" } }'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{ "name": "inventory-connector-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysqldest:3306/inventory?useSSL=false","connection.user": "root", "connection.password": "debezium", "topics": "dbserver1.inventory.customers", "table.name.format": "inventory.customers" ,"auto.create": "true" ,"auto.evolve":"true","delete.enabled":"true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key" ,"transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope" } }'
注意:要删除不需要的配置文件,我们可以在下面使用
curl -X DELETE 192.168.99.102:8083/connectors/inventory-connector-sink
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.inventory.customers
一切正常,唯一的问题是,如果我删除了源数据库中的任何一行,它在目标数据库事件中不起作用,尽管我使用delete.enable作为true,pk.mode作为record\u键
1条答案
按热度按时间uujelgoq1#
请任意一套https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-新记录状态删除墓碑到
true
或者https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-新记录状态删除处理模式none
.这两个选项中的任何一个都将确保接收器连接器将接收tombstone事件,该事件是delete的指示符。当不存在时,删除记录被过滤掉。