假设kafka主题(我的主题)有8个分区,我有一个侦听器组(我的主题组),由8到10个不同的进程在不同的机器上运行。某个特定分区(my-topic-2)没有被任何侦听器使用。
下面是kafaproducer(jar-1)的代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
KafkaProducer producer ;
.....
producer.send(new ProducerRecord('my-topic', student_id % 8, null, payload));
下面是spring中kafa侦听器配置的代码(jar-2)
@EnableKafka
@Configuration
public class SpringBootKafka {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
String servers ;
.....
props.put("bootstrap.servers", servers);
props.put("group.id", "my-topic-group");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
props.put("auto.offset.reset", "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Kafka听众(jar-2)
@Component
public class EventsReceiver {
@KafkaListener(topics = "my-topic")
public void receive(ConsumerRecord<String, String> consumerRecord) {
String message = consumerRecord.value();
}
}
最初,jar-2部署在一台机器(docker)中,慢慢地,我们将pod的数量增加到10个。没有人在听我的主题分区2。不止一个人在听my-topic-partition-7。所以我错过了一些Kafka事件的听众。
2条答案
按热度按时间l2osamch1#
看看日志;你应该看到这样的例子:
v2g6jxz62#
我发现这个问题,它运行在两个不同的数据中心,都指向同一个Kafka集群。分区2被分配给第二个数据中心。