Kafka Connect distributed mode: group coordinator unavailable

50few1ms · posted 2021-06-07 in Kafka

I have been trying for two weeks now. I run the Kafka cluster on machines separate from the Connect node, and I cannot get Connect running properly. I can read from and write to Kafka without any problem, and Zookeeper appears to be working fine.
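
For reference, a minimal read/write smoke test against the brokers might look like this (test-topic is a hypothetical topic name):

# produce a few messages, then read them back:
bin/kafka-console-producer.sh --broker-list 172.25.1.2:9092 --topic test-topic
bin/kafka-console-consumer.sh --bootstrap-server 172.25.1.2:9092 --topic test-topic --from-beginning
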
I start Connect with:

$ bin/connect-distributed connect-distributed.properties

Connect keeps looping on this error:

[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
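
As an aside, the consumer client reports the coordinator under a synthetic node id of Integer.MAX_VALUE minus the broker id, so the id 2147483645 in these lines should correspond to broker.id=2 (172.25.1.2):

echo $(( 2147483647 - 2147483645 ))   # -> 2, i.e. broker.id=2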

Here is what my connect-distributed.properties looks like:

bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=172.25.1.5
rest.port=8083

heartbeat.interval.ms=3000
session.timeout.ms=30000
security.protocol=PLAINTEXT
client.id=c1

plugin.path=/usr/share/java
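
One quick check from the Connect host is whether every broker in bootstrap.servers is reachable on port 9092; a minimal sketch, assuming a stock Kafka install:

bin/kafka-broker-api-versions.sh --bootstrap-server 172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092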

The __consumer_offsets topic looks like this:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets                                       
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 1    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
    Topic: __consumer_offsets       Partition: 4    Leader: 2       Replicas: 2     Isr: 2.... etc
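
Note that several of these partitions list Replicas: 1,2,3 but Isr: 3,2, i.e. broker 1 has dropped out of the ISR. A quick way to list every partition in that state, assuming the same Zookeeper address:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions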

zrfyljdw1#

Add the hostnames of all the Kafka brokers to the /etc/hosts file and try again.
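
For example, on the Connect host the entries might look like this (the hostnames are placeholders; use whatever names the brokers actually advertise):

172.25.1.2   kafka-broker-1
172.25.1.3   kafka-broker-2
172.25.1.4   kafka-broker-3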

wnavrhmk2#

Posting this for anyone else who might find it useful.
I had the same problem... a restart of Kafka solved it for me.
After running:

service kafka status

my logs went back to normal in less than 10 seconds:

2019-11-08 14:30:19.781  INFO [-,,,] 1 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=datasources-ca-contacts] Discovered group coordinator myserver:9092 (id: 2147483647 rack: null)
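
For completeness, the restart itself might look like this on a systemd-managed broker (the service name kafka is an assumption; adjust to the actual installation):

sudo systemctl restart kafka    # or: sudo service kafka restart
sudo systemctl status kafka     # confirm the broker came back up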

xmq68pz93#

I ran into the same problem after writing a connector in Go, and I had to figure it out myself.
When the connector connects to Kafka, it automatically writes to its topics and to __offset_topics. When the connector crashes, it leaves traces of itself as the coordinator in those topics. When a new connector starts up, it finds those records and tries to communicate with that coordinator. The coordinator cannot respond, and the connector cannot work.
You can solve this in two ways: delete all of the topics (connect-configs, connect-offsets, connect-status, __offset_topics) and restart the cluster, or remove the coordinator entry from the topics, which I am currently not sure how to do.
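
A sketch of the first option, assuming delete.topic.enable=true on the brokers and the same Zookeeper address as above (this wipes all stored connector configs and offsets):

/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-offsets
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-configs
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-status
# then restart the brokers and the Connect worker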
