关闭期间偏移提交失败

ee7vknir  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(392)

我正在尝试设置kafka connect bigquery接收器连接器。我有大约50多个Kafka主题。部署连接器时,最初10个表(我认为这是连接器配置中的threadpoolsize默认值)显示一些数据。然后,新数据就不再出现在表中。此外,bigquery中没有显示新表。
我的连接器配置:

{
   "name": "kcbq-connect1",
   "config": {
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "tasks.max" : "1",
     "topics" : "topic1,topic2,topic2",
     "sanitizeTopics" : "true",
     "autoCreateTables" : "true",
     "autoUpdateSchemas" : "true",     
     "schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
     "schemaRegistryLocation":"http://localhost:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "bufferSize": "100000",
    "maxWriteSize":"10000",
    "tableWriteWait": "1000",
     "project" : "my-project-89507",
     "datasets" : ".*=my_cdc",
     "keyfile" : "/home/debezium/key.json"

   }
 }

这是我查询连接器状态时得到的结果:

{
  "name": "kcbq-connect1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.1.0.37:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.1.0.37:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail\n\tat com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:112)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\n"
    }
  ],
  "type": "sink"
}

跟踪日志:

[2020-12-05 13:07:22,399] WARN WorkerSinkTask{id=kcbq-connect1-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:389)
[2020-12-05 13:07:22,399] ERROR WorkerSinkTask{id=kcbq-connect1-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
java.util.concurrent.RejectedExecutionException: Task com.wepay.kafka.connect.bigquery.write.batch.CountDownRunnable@6231b75c rejected from com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor@4ebc3f40[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 65]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:92)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:129)
        at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:386)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:751)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290)
        at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:964)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-12-05 13:07:22,400] INFO [Consumer clientId=connector-consumer-kcbq-connect1-0, groupId=connect-kcbq-connect1] Member connector-consumer-kcbq-connect1-0-f7533028-4e1a-4492-9f56-9b2d7bc1bc4e sending LeaveGroup request to coordinator <hostname>:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1005)

Kafka的Maven们能给我正确的方向来解决这个问题吗?谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题