kafka connect:java.lang.illegalstateexception:没有分区的当前分配

xe55xuns  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(534)

我正在kubernetes上运行kafka connect(8-16节点,自动缩放)。我总共定义了44个连接器,每个kafka主题一个(每个主题一个分区)。这些主题是由debezium/postgresql生成的。有3个Kafka节点。每个连接器的tasks.max设置为4。我的大部分连接器(但不是每个!)有一个(总是一个)失败的任务,原因是java.lang.illegalstateexception:没有分区0的当前分配。
不是Kafka的Maven,注意;)我假设有3个kafka节点,所以3个工人做得很好,而第4个任务没有任何连接,所以它失败了。但是为什么有时有4个任务运行得很好呢?
另外,我经常会遇到“由于重新平衡而导致的操作冲突”问题,这种情况可能会持续几分钟,甚至几小时。最近我删除了所有的pod,它们自己重新启动,问题消失了,但这不是长期的解决方案。
tasks.max的建议值是多少?提前谢谢!
例外情况:

java.lang.IllegalStateException: No current assignment for partition table-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748

接收器连接器配置:

connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas   true
sanitizeTopics  true
autoCreateTables    true
topics  <topic-name>
tasks.max   3
schemaRegistryLocation  http://<ip>:8081
project <big-query-project>
maxWriteSize    10000
datasets    .*=<big-query-dataset>
task.class  com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name    <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait  1000
bufferSize  100000

它抛出了上述异常 java.lang.IllegalStateException: No current assignment for [...]

8aqjt8rx

8aqjt8rx1#

财产价值 tasks.max 取决于几个因素。最重要的是特殊的连接器。特定的连接器取决于其逻辑和 tasks.max 计算 Task ,将创建。前任。 FileStreamSourceConnector 始终创建1个任务,因此即使传递的值高于1,也只会创建一个任务。同样的情况也发生在 PostgresConnector 它和一个平行。 tasks.max 值还应取决于其他因素,如:Kafka连接模式,有多少Kafka连接示例,你的机器cpu等。
我如何理解您正在使用sourceconnector( PostgresConnector ). 源连接器不轮询来自kafka的数据。例外情况是,您发布的与某些 SinkConnector . 如果使用正在使用 SinkConnector 你的 tasks.max 不应超过分区数。如果启动的任务数超过分区数,则某些任务将处于空闲状态(状态为“正在运行”,但它们不处理数据),并且可能发生重新平衡。

相关问题