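We have deployed Debezium MySQL source connectors on Kafka Connect to stream data from MySQL tables, and these connectors work well even for very large tables (34 million records).
The problem is with one table that holds about 20 million records and receives inserts and updates every second. The Debezium connector hits a timeout during the initial snapshot of this table. The record count cannot be the cause, because we have successfully streamed larger tables; those tables, however, are only updated during a 2-hour window each day, whereas the failing table is written to every second (both inserts and updates). We tried increasing snapshot.timeout.ms from 10000 ms to 60000 ms, but that did not solve the problem.
Connector configuration file: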
{
"name": "DeviceUploadRecordSourceConnector_v1",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.hostname": "hostname",
"database.port": "3306",
"database.user": "username",
"database.password": "${file:/etc/kcsecrets/hostname:username}",
"database.server.name": "dbname",
"time.precision.mode": "adaptive_time_microseconds",
"database.history.kafka.bootstrap.servers": "kafka-servers",
"database.history.kafka.topic": "schema-changes.dbname",
"column.include.list": "dbname.deviceuploadrecord.ID, dbname.deviceuploadrecord.SerialNum, dbname.deviceuploadrecord.UnProcessedFileSize, dbname.deviceuploadrecord.UnProcessedFilePath, dbname.deviceuploadrecord.ProcessedFileSize, dbname.deviceuploadrecord.FileHeaders, dbname.deviceuploadrecord.CreatedOn, dbname.deviceuploadrecord.ProcessedFilePath, dbname.deviceuploadmergerecord.ID, dbname.deviceuploadmergerecord.MergeFilePath, dbname.deviceuploadmergerecord.MergeFileSize, dbname.deviceuploadmergerecord.CreatedOn",
"table.include.list": "dbname.deviceuploadrecord, dbname.deviceuploadmergerecord",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"${file:/etc/kcsecrets/username:username}\";",
"database.history.producer.ssl.truststore.location": "/volume/kafkaconnect/ssl/TrustStore.jks",
"database.history.producer.ssl.truststore.password": "${file:/etc/kcsecrets/kafkatruststore:kafkatruststore}",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"${file:/etc/kcsecrets/username:username}\";",
"database.history.consumer.ssl.truststore.location": "/volume/kafkaconnect/ssl/TrustStore.jks",
"database.history.consumer.ssl.truststore.password": "password"
}
}
Exception stack trace:
{
"name": "DeviceUploadRecordSourceConnector_v1",
"connector": {
"state": "RUNNING",
"worker_id": "172.19.3.222:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.19.3.222:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)\n\t... 5 more\nCaused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)\n\tat com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\tat com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:763)\n\tat com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:648)\n\tat io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1446)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.tableLock(MySqlSnapshotChangeEventSource.java:450)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:314)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:46)\n\tat io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:116)\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)\n\t... 8 more\n"
}
],
"type": "source"
}
1 Answer
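Debezium needs a table lock during the initial snapshot. I expect the connector cannot obtain that lock because of the constant inserts/updates, which matches the "Lock wait timeout exceeded" error raised from tableLock in your stack trace.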
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-snapshots
Your options are to run the snapshot against a replica with GTIDs enabled, or to skip the initial snapshot.
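
As a rough sketch of the second option, the fragment below shows the one property that would change in the connector config from the question. It assumes a Debezium 1.x MySQL connector (as the database.history.* properties suggest), where the snapshot.mode value schema_only captures table structure but no existing rows. The value name may differ in other versions, so check the documentation for the version you run.

```json
{
  "snapshot.mode": "schema_only"
}
```

With this setting the connector only emits changes made after it starts, so the roughly 20 million existing rows would not reach Kafka and would need to be loaded some other way if they are required. For the replica option, the change would instead be to point database.hostname at a replica that has GTIDs enabled and sees little or no competing write traffic from applications; any replica hostname used there is specific to your environment.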