Kafka Connect: Wasn't unable to resume work after last rebalance

r7knjye2 asked on 2021-06-04 in Kafka

I have Kafka Connect (version 2.1.1-cp1) working with Kafka (2.0.1-cp4), running about 70 connectors under a fairly heavy workload. Occasionally (every 2-3 weeks) some nodes suddenly start emitting the following logs and then simply stop doing any work:

```
INFO [Worker clientId=connect-1, groupId=dwh-prod] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Worker clientId=connect-1, groupId=dwh-prod] Successfully joined group with generation 2288 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-8532b028-281c-4aca-8440-c4c999812158', leaderUrl='http://10.36.3.136:8083/', offset=2811, connectorIds=[<redacted>]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 is behind group assignment 2811, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 does not match group assignment 2811. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
```

It looks like fetching the config state (offset) from the config backing store may be failing, but I can't tell why: https://github.com/apache/kafka/blob/2.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L823
It also seems to start when a new node is spawned (the cluster is hosted in Kubernetes with a Vertical Pod Autoscaler), and it then keeps happening for hours, indefinitely.
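To make the failure mode concrete, here is a minimal, self-contained sketch of the loop those log messages describe (the class name, variable names and hard-coded values are made up to mirror the logs; this is not the actual DistributedHerder code):

```java
// Minimal illustration of the rebalance loop seen in the logs above.
// Not the real Kafka Connect source; names and values are hypothetical,
// chosen to mirror "offset -1 is behind group assignment 2811".
public class RebalanceLoopSketch {
    public static void main(String[] args) {
        long assignedConfigOffset = 2811L; // offset handed out with the group assignment
        long localConfigOffset = -1L;      // offset this worker managed to read from the config topic

        if (localConfigOffset < assignedConfigOffset) {
            // The worker reads the config topic to catch up; if that read keeps
            // returning nothing, the offset stays at -1 and another rebalance is
            // forced, which reproduces the same state -> an endless loop.
            System.out.printf(
                "Current config state offset %d does not match group assignment %d. Forcing rebalance.%n",
                localConfigOffset, assignedConfigOffset);
        }
    }
}
```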
The deployment runs on k8s using the confluentinc/cp-kafka-connect image with the following configuration:

```
- name: CONNECT_LOG4J_ROOT_LOGLEVEL
  value: INFO
- name: CONNECT_BOOTSTRAP_SERVERS
  value: ***:9092
- name: CONNECT_ZOOKEEPER_CONNECT
  value: ***:2181
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_GROUP_ID
  value: dwh-prod
- name: CONNECT_STATUS_STORAGE_TOPIC
  value: dwh-prod-status
- name: CONNECT_CONFIG_STORAGE_TOPIC
  value: dwh-prod-configs
- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: dwh-prod-offsets
- name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
  value: "10000"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_REST_ADVERTISED_HOST_NAME
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: CONNECT_REST_PORT
  value: "8083"
- name: CONNECT_PLUGIN_PATH
  value: /usr/share/java,/usr/share/confluent-hub-components,/usr/share/landoop-plugins
- name: CONNECT_INTERNAL_KEY_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_INTERNAL_VALUE_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_KEY_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_LOG4J_LOGGERS
  value: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
- name: CLASSPATH
  value: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
```
Any ideas or hints?

iugsix8n1#

Apache Kafka Connect in distributed mode uses KafkaConfigBackingStore as the implementation of ConfigBackingStore. KafkaConfigBackingStore provides persistent storage of the Kafka Connect connector configurations in a Kafka topic.
According to the documentation:
For production systems, the replication factor of this topic should always be at least 3, but not larger than the number of Kafka brokers in the cluster.
I'm not sure, but this scenario may be causing your problem: for some reason (e.g. k8s behaviour) the broker holding the single replica of the config data became unavailable, so Kafka Connect could not fetch the current config state.
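One way to check this hypothesis is to describe the internal config topic and look at its replication factor and in-sync replicas, for example with the Java AdminClient (a sketch; the topic name is taken from your config, the bootstrap address is a placeholder):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeConfigTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                .describeTopics(Collections.singletonList("dwh-prod-configs"))
                .all().get()
                .get("dwh-prod-configs");

            // With replication factor 1, losing the broker that holds the single
            // replica makes the topic unreadable, so the worker cannot catch up.
            desc.partitions().forEach(p -> System.out.printf(
                "partition=%d leader=%s replicas=%d isr=%d%n",
                p.partition(), p.leader(), p.replicas().size(), p.isr().size()));
        }
    }
}
```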
Try increasing the replication factor of config.storage.topic (from 1 to 3). Also check the replication factors of the other internal topics (offset.storage.replication.factor, status.storage.replication.factor).
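Note that the CONNECT_*_STORAGE_REPLICATION_FACTOR settings are only applied when Connect creates the internal topics; for a topic that already exists, the replication factor has to be raised via a partition reassignment (e.g. kafka-reassign-partitions). If you rebuild the topics, they can also be created up front with the desired factor, for example (a sketch; the broker address is a placeholder, the topic name comes from your config):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateConfigTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // The Connect config topic must be a single-partition, compacted topic;
            // here it is created with replication factor 3 instead of 1.
            NewTopic configTopic = new NewTopic("dwh-prod-configs", 1, (short) 3)
                .configs(Collections.singletonMap("cleanup.policy", "compact"));

            admin.createTopics(Collections.singletonList(configTopic)).all().get();
        }
    }
}
```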
