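I have Kafka Connect (version 2.1.1-cp1) working with Kafka (2.0.1-cp4), running about 70 connectors with a fairly heavy workload. Occasionally (every 2-3 weeks), some nodes suddenly start emitting the following logs and then simply stop working: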
INFO [Worker clientId=connect-1, groupId=dwh-prod] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Worker clientId=connect-1, groupId=dwh-prod] Successfully joined group with generation 2288 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-8532b028-281c-4aca-8440-c4c999812158', leaderUrl='http://10.36.3.136:8083/', offset=2811, connectorIds=[<redacted>]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 is behind group assignment 2811, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 does not match group assignment 2811. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
I can see there may be a problem with fetching the config state (offset) from the ConfigBackingStore, but I don't know why: https://github.com/apache/kafka/blob/2.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L823
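It also looks like it starts when a new node is spawned (the cluster is hosted in Kubernetes with the vertical pod autoscaler), and it then continues for hours, indefinitely.
The deployment runs on k8s using the confluentinc/cp-kafka-connect image: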
```
- name: CONNECT_LOG4J_ROOT_LOGLEVEL
  value: INFO
- name: CONNECT_BOOTSTRAP_SERVERS
  value: "***:9092"
- name: CONNECT_ZOOKEEPER_CONNECT
  value: "***:2181"
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_GROUP_ID
  value: dwh-prod
- name: CONNECT_STATUS_STORAGE_TOPIC
  value: dwh-prod-status
- name: CONNECT_CONFIG_STORAGE_TOPIC
  value: dwh-prod-configs
- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: dwh-prod-offsets
- name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
  value: "10000"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_REST_ADVERTISED_HOST_NAME
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: CONNECT_REST_PORT
  value: "8083"
- name: CONNECT_PLUGIN_PATH
  value: /usr/share/java,/usr/share/confluent-hub-components,/usr/share/landoop-plugins
- name: CONNECT_INTERNAL_KEY_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_INTERNAL_VALUE_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_KEY_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_LOG4J_LOGGERS
  value: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
- name: CLASSPATH
  value: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
```
Any ideas or hints?
1 Answer
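In distributed mode, Apache Kafka Connect uses KafkaConfigBackingStore as its implementation of ConfigBackingStore. KafkaConfigBackingStore provides persistent storage of Kafka Connect connector configurations in a Kafka topic. According to the documentation: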
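For a production system, the replication factor of this topic should always be at least 3, but cannot be larger than the number of Kafka brokers in the cluster.
I'm not sure, but the following scenario could be causing your problem: for some reason (such as k8s behavior), the broker holding the single replica of the config data becomes unavailable, so Kafka Connect cannot fetch the current config state.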
Try increasing the replication factor of config.storage.topic (from 1 to 3). Also check the replication factor of the other internal topics (offset.storage.replication.factor, status.storage.replication.factor).
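As a rough sketch of that change in the env list from the question, the three replication settings might look like the fragment below. It assumes the cluster has at least three brokers, which the question does not state. Note that these settings are only used when Kafka Connect creates its internal topics. The existing dwh-prod-configs, dwh-prod-offsets and dwh-prod-status topics would keep their single replica until their replication is increased on the broker side, for example with the kafka-reassign-partitions tool.

```
# Assumption: the Kafka cluster has at least 3 brokers.
# These values only take effect when Connect creates the topics;
# topics that already exist are not altered by them.
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "3"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "3"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "3"
```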