We have 3 Kafka (1.0.0) nodes, and a topic with 4 partitions and 3 replicas. The topic normally looks like this:
```
Topic:MissionControlTopic  PartitionCount:4  ReplicationFactor:3  Configs:
    Topic: MissionControlTopic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 2,1,0
    Topic: MissionControlTopic  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 2,1,0
    Topic: MissionControlTopic  Partition: 2  Leader: 2  Replicas: 2,0,1  Isr: 2,1,0
    Topic: MissionControlTopic  Partition: 3  Leader: 0  Replicas: 0,2,1  Isr: 2,1,0
```
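For reference, the listing above is the output of the stock topic-describe tool that ships with Kafka (the ZooKeeper address below is a placeholder for our own):

```shell
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic MissionControlTopic
```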
Every once in a while, node 0 stops responding (that is a problem in itself, but not the one this question is about). When that happens, the other two nodes correctly take over its partitions, and the topic looks like this:
```
Topic:MissionControlTopic  PartitionCount:4  ReplicationFactor:3  Configs:
    Topic: MissionControlTopic  Partition: 0  Leader: 1  Replicas: 0,1,2  Isr: 2,1
    Topic: MissionControlTopic  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 2,1
    Topic: MissionControlTopic  Partition: 2  Leader: 2  Replicas: 2,0,1  Isr: 2,1
    Topic: MissionControlTopic  Partition: 3  Leader: 2  Replicas: 0,2,1  Isr: 2,1
```
At this point, most (but not all) of our producers and consumers can no longer write to or read from Kafka, and keep logging LEADER_NOT_AVAILABLE exceptions (first problem). Once node 0 is restored and the leaders are rebalanced, the applications still keep logging the exception (second problem). Only after the applications themselves are restarted do they reconnect and start working normally again. As you can imagine, restarting every application whenever a Kafka node has a hiccup is highly impractical.
I'm not sure what information would be useful for troubleshooting this. We have scoured the internet but found no indication that anything is obviously wrong with our configuration. I even reproduced the problem locally, and there the applications reconnected correctly as soon as the node came back.
This is the code that writes to Kafka:
```java
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericEventSerializer.class.getCanonicalName());
kafkaProducer = new KafkaProducer<>(properties);

// And at some later point...
kafkaProducer.send(new ProducerRecord<>(TOPIC, event), (metadata, exception) -> {
    if (exception != null)
    {
        LOGGER.error("Failed to write to Kafka", exception);
    }
});
```
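As an aside on the config above: with `retries` set to 0, the producer fails a send the moment it hits a leader error instead of retrying against the newly elected leader. The sketch below shows the standard retry- and metadata-related producer settings (the class name and the numeric values are illustrative assumptions, not our actual configuration or a tested recommendation):

```java
import java.util.Properties;

// Sketch only: the string keys are standard Kafka producer config names;
// the values are illustrative assumptions.
public class FailoverTolerantProducerProps {
    public static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("acks", "all");
        p.put("retries", "5");                 // retry instead of failing immediately (was 0 above)
        p.put("retry.backoff.ms", "500");      // wait between retries while a new leader is elected
        p.put("metadata.max.age.ms", "30000"); // refresh the cached leader map at least this often
        p.put("max.block.ms", "10000");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("retries"));
    }
}
```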
And this is the code that reads from it:
```java
Properties props = new Properties();
props.put("enable.auto.commit", false);
props.put("bootstrap.servers", kafkaHostString);
props.put("group.id", consumerGroupId);
props.put("request.timeout.ms", 15000);
props.put("session.timeout.ms", 10000);
props.put("max.poll.records", 10000);
props.put("batch.size", 6400000);
Consumer<String, GenericEvent> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new GenericEventDeserializer());
consumer.subscribe(Collections.singleton(topic));

// And at some later point ...
records = consumer.poll(pollTimeout);
consumer.commitSync();
```
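One detail I noticed while writing this up: `batch.size` is a producer setting, so the consumer above just ignores it (with a warning). The fetch-side counterparts look like this; again the class name and values are illustrative assumptions, not our production settings:

```java
import java.util.Properties;

// Sketch only: standard Kafka consumer config names; values are
// illustrative assumptions.
public class ConsumerFetchProps {
    public static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("enable.auto.commit", "false");
        p.put("max.poll.records", "10000");
        p.put("fetch.min.bytes", "1");                 // return as soon as any data is available
        p.put("max.partition.fetch.bytes", "1048576"); // per-partition fetch cap
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092", "my-group").getProperty("fetch.min.bytes"));
    }
}
```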
`advertised.host.name`, `advertised.port`, and `advertised.listeners` are all set in `server.properties`.
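For completeness, the relevant `server.properties` lines look roughly like this (host names are placeholders; on Kafka 1.0, `advertised.listeners` supersedes the deprecated `advertised.host.name`/`advertised.port`):

```properties
# Placeholder addresses; each broker advertises its own host name.
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka0.example.com:9092
```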