我正在尝试使用kafka对mysql数据库进行审计跟踪,系统应检测到源数据库中的任何更改,并相应地在相关表的目标数据库中插入一条记录,对该特定表进行审计跟踪。我目前正在使用debezium for cdc和jdbc接收器/源连接器。所需的用例是:
如果在源数据库的表students中插入一条记录,则应在students\u trail destination数据库中插入一行,其中包含该行的旧值和新值
更新和删除时相同
源数据库中有多个表,以上同样适用于所有表
目前,我有以下源和接收器连接器配置,可以将源数据库中发生的任何更改复制到目标数据库中,比如插入将插入同一行,而更新将更新该行,由于kafka的新手知识有限,我们如何实现上述审计日志记录场景。谢谢
@source.json文件:
{
"name": "jdbc-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "",
"database.password": "",
"database.server.name": "localhost",
"database.whitelist": "dbname",
"database.server.id": "1234",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.topicname",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
@sink.json:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "",
"connection.url": "jdbc:mysql://localhost:3306",
"connection.user": "",
"connection.password": "",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"insert.mode": "upsert",
"auto.create": "true",
"delete.enabled": "true",
"auto.evolve": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
更新:
假设这是一个源数据库表(student):
id name birth_date
1 den 2001-09-12
2 jeff 2002-09-02
如果我在这里插入新行,例如 3 foo 1999-09-28
然后,当前设置将在相关的自动创建表的dest db中插入同一行,但我希望在自定义模式表(例如,在本例中为students\u trail)中插入before和after状态值(josn表示),它们的列(id、student\u id、before、after、timestamp)可以是自预创建的或自动创建的,无论哪一个有效。在这种插入情况下,应在dest table中创建新行,并应如下所示:
学生之路
id student_id before after timestamp
98 3 null "3, foo, 1999-09-28" current_timestamp
如果插入前的状态为空,删除后的状态为空,更新前和更新后的状态都存在,并且对于每个操作,在相关的dest表中插入一行。
暂无答案!
目前还没有任何答案,快来回答吧!