我有一个Kafka环境,有2个经纪人和1个Zookeeper。
当我试图向kafka生成消息时,如果我停止broker 1(它是leader 1),客户机将停止生成消息并给出以下错误,尽管broker 2被选为主题和部分的新leader。
org.apache.kafka.common.errors.timeoutexception:在60000毫秒后更新元数据失败。
10分钟后,由于broker2是新的领导者,我希望生产者将数据发送给broker2,但它继续失败,给出了上述异常。lastrefreshms和lastsuccessfullrefreshms仍然相同,尽管生产者的metadataexpirems是300000。
我在producer端使用kafka新producer实现。
看起来,当producer启动时,它绑定到一个代理,如果该代理宕机,它甚至不会尝试连接到集群中的另一个代理。
但我的期望是,如果一个代理宕机,它应该直接检查另一个可用代理的元数据,并向它们发送数据。
顺便说一句,我的主题是4分区,复制因子是2。提供这些信息以防有意义。
配置参数。
{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}
用例:
1-启动br1和br2生成数据(引线为br1)
2-停止br2生成数据(精细)
3-停止br1(这意味着此时集群中没有活动的工作代理),然后启动br2并生成数据(虽然leader是br2,但失败)
4-启动br1生成数据(leader仍然是br2,但数据生成很精细)
5-停止br2(现在br1是leader)
6-停止br1(br1仍然领先)
7-启动br1生成数据(消息再次生成)
如果生产商将最新的成功数据发送给br1,然后所有经纪人都倒下,生产商希望br1会重新站起来,尽管br2已经站起来,成为新的领导者。这是预期的行为吗?
3条答案
按热度按时间kmb7vmvb1#
你需要增加重试次数。在您的情况下,需要将其设置为>=5。
这是让你的制作人知道你的集群有了新的领导者的唯一方法。
除此之外,请确保所有代理都有分区的副本。否则你就找不到新的领导了。
eagi6jfj2#
我花了几个小时才弄明白Kafka在我这种情况下的行为。可能这是一个错误,也可能这需要这样做的原因是隐藏在引擎盖,但实际上,如果我要这样做,我不会这样做:)
当所有代理都失败时,如果您只能启动一个代理,那么必须是最后一个失败的代理才能成功生成消息。
假设你有5个经纪人;br1、br2、br3、br4和br5。如果一切都失败了,如果最后一个死去的代理是br3(它是最后一个领头羊),尽管您启动了所有代理br1、br2、br4和br5,但除非您启动br3,否则就没有任何意义。
kzipqqlq3#
在最新的kafka版本中,当一个代理关闭时,它有一个由生产者使用的leader分区。生产者将重试,直到捕获可重试的异常,然后生产者需要更新元数据。可以从leastloadnode获取新的元数据。所以新的领导将会被更新,制片人可以在那里写作。