我正在尝试设置kafka connect bigquery接收器连接器。我有大约50多个Kafka主题。部署连接器时,最初10个表(我认为这是连接器配置中的threadpoolsize默认值)显示一些数据。然后,新数据就不再出现在表中。此外,bigquery中没有显示新表。
我的连接器配置:
{
"name": "kcbq-connect1",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max" : "1",
"topics" : "topic1,topic2,topic2",
"sanitizeTopics" : "true",
"autoCreateTables" : "true",
"autoUpdateSchemas" : "true",
"schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
"schemaRegistryLocation":"http://localhost:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"bufferSize": "100000",
"maxWriteSize":"10000",
"tableWriteWait": "1000",
"project" : "my-project-89507",
"datasets" : ".*=my_cdc",
"keyfile" : "/home/debezium/key.json"
}
}
这是我查询连接器状态时得到的结果:
{
"name": "kcbq-connect1",
"connector": {
"state": "RUNNING",
"worker_id": "10.1.0.37:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.1.0.37:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail\n\tat com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:112)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\n"
}
],
"type": "sink"
}
跟踪日志:
[2020-12-05 13:07:22,399] WARN WorkerSinkTask{id=kcbq-connect1-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:389)
[2020-12-05 13:07:22,399] ERROR WorkerSinkTask{id=kcbq-connect1-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
java.util.concurrent.RejectedExecutionException: Task com.wepay.kafka.connect.bigquery.write.batch.CountDownRunnable@6231b75c rejected from com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor@4ebc3f40[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 65]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:92)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:129)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:386)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:751)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:964)
at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-12-05 13:07:22,400] INFO [Consumer clientId=connector-consumer-kcbq-connect1-0, groupId=connect-kcbq-connect1] Member connector-consumer-kcbq-connect1-0-f7533028-4e1a-4492-9f56-9b2d7bc1bc4e sending LeaveGroup request to coordinator <hostname>:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1005)
Kafka的Maven们能给我正确的方向来解决这个问题吗?谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!