我有一个具体的用例,我每天都要重新创建一个Kafka集群。因此,bootstrap.servers
地址(我已经配置了,并且不会更改)每天都会通过CNAME DNS记录指向不同的代理地址。(例如,星期一b1.mycluster.internal
-> b1.mskdev.q1w2e3.c3.aws.com
,然后星期二b1.mskdev.a4s5d6.c7.aws.com
)
我注意到,一旦集群离线,我正在使用的Kafka消费者应用程序将尝试重新连接到已解析的CNAME,而不是bootstrap.servers
中配置的地址。为了重用上面的示例,消费者一直尝试重新建立到b1.mskdev.q1w2e3.c3.aws.com
的连接,但我实际上需要的是它重新连接到新的代理b1.mskdev.a4s5d6.c7.aws.com
,这是初始配置的bootstrap.servers=b1.mycluster.internal
的新CNAME记录。
我正在使用常用的kafka-client Java库,并使用所描述的引导服务器集初始化KafkaConsumer
。一旦集群离线,日志中就会填充与引导服务器后面的CNAME连接有关的错误,而不是引导服务器本身。使用默认的reconnect属性,消费者尝试无限期地重新连接到过期的代理地址,因此唯一的解决方案是重新启动消费者,以便通过引导服务器地址解析新的CNAME地址。
1条答案
按热度按时间fnx2tebb1#
Kafka客户端只在启动时使用
bootstrap.servers
,在此之后,它们会发现所有的代理,并根据获取的元数据缓存这些信息。由于Kafka不知道您的CNAME,它将返回内部DNS名称。为了控制返回的元数据,即返回CNAME而不是解析的名称。您可以设置
advertised.listeners
,这将导致元数据API返回侦听器地址作为您的CNAME,而不是内部地址。看到
请注意,通过错误地设置
advertised.listeners
很容易破坏Kafka。您应该只为客户端侦听器(CLIENT
或CLIENT_SECURE
)设置此设置,而不要触及REPLICATION
等,否则可能会破坏内部复制。我建议你在静态配置中设置这个,这样它就可以很容易地回滚或更新。
虽然这可以通过动态配置来完成,但它仍然需要代理重新启动才能生效。如果你在动态中破坏它,你需要直接从ZK中删除它。
如果操作正确,使用MSK-P是安全的,请参阅他们的Goldman Sachs博客(并非所有相关,只是广告.listeners部分)https://aws.amazon.com/blogs/big-data/how-goldman-sachs-builds-cross-account-connectivity-to-their-amazon-msk-clusters-with-aws-privatelink/