Debezium MYSQL连接器在初始快照期间超时

rvpgvaaj  于 2023-01-25  发布在  Mysql
关注(0)|答案(1)|浏览(265)

我们已经在Kafka Connect上部署了Debezium MYSQL Source连接器来从MYSQL表中传输数据,这些连接器对于非常大的表(3400万条记录)工作良好。
问题出在这张表上,它每秒都在进行插入和更新,并且有大约2000万条记录。Debezium连接器在此表的初始快照期间面临超时问题。记录数不可能是问题,因为我们已经成功地为较大的表传输数据,但它们每天只在2小时的时间窗口内更新。我们面临问题的表每秒更新一次(插入和更新都发生)。我们尝试将snapshot.timeout.ms从10000ms增加到60000ms,但没有解决问题。
连接器配置文件:

{
  "name": "DeviceUploadRecordSourceConnector_v1",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.hostname": "hostname",
    "database.port": "3306",
    "database.user": "username",
    "database.password": "${file:/etc/kcsecrets/hostname:username}",
    "database.server.name": "dbname",
    "time.precision.mode": "adaptive_time_microseconds",
    "database.history.kafka.bootstrap.servers": "kafka-servers",
    "database.history.kafka.topic": "schema-changes.dbname",
    "column.include.list": "dbname.deviceuploadrecord.ID, dbname.deviceuploadrecord.SerialNum, dbname.deviceuploadrecord.UnProcessedFileSize, dbname.deviceuploadrecord.UnProcessedFilePath, dbname.deviceuploadrecord.ProcessedFileSize, dbname.deviceuploadrecord.FileHeaders, dbname.deviceuploadrecord.CreatedOn, dbname.deviceuploadrecord.ProcessedFilePath, dbname.deviceuploadmergerecord.ID, dbname.deviceuploadmergerecord.MergeFilePath, dbname.deviceuploadmergerecord.MergeFileSize, dbname.deviceuploadmergerecord.CreatedOn",
    "table.include.list": "dbname.deviceuploadrecord, dbname.deviceuploadmergerecord",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"${file:/etc/kcsecrets/username:username}\";",
    "database.history.producer.ssl.truststore.location": "/volume/kafkaconnect/ssl/TrustStore.jks",
    "database.history.producer.ssl.truststore.password": "${file:/etc/kcsecrets/kafkatruststore:kafkatruststore}",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"${file:/etc/kcsecrets/username:username}\";",
    "database.history.consumer.ssl.truststore.location": "/volume/kafkaconnect/ssl/TrustStore.jks",
    "database.history.comsumer.ssl.truststore.password": "password"
  }
}

异常堆栈:

{
  "name": "DeviceUploadRecordSourceConnector_v1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.3.222:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.19.3.222:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)\n\t... 5 more\nCaused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)\n\tat com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\tat com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:763)\n\tat com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:648)\n\tat io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1446)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.tableLock(MySqlSnapshotChangeEventSource.java:450)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:314)\n\tat io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:46)\n\tat io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:116)\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)\n\t... 8 more\n"
    }
  ],
  "type": "source"
}
zu0ti5jz

zu0ti5jz1#

Debezium在初始快照期间需要表锁。我预计由于插入/更新,连接器无法获得锁。
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-snapshots
可以选择使用带有gtid的replica或跳过初始快照。

相关问题