向Kafka发送消息到多个分区并监听多个相同的消费者

eyh26e7m  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(157)

我想了解如何从生产者写入多个分区?我做对了吗?是不是很容易做决定?我的主题有两个部分。我想读相同的消息类型等于消费者
如何在消费者中接收相同的消息,而不需要在生产者和消费者中指定分区号?
谢谢制片人

@Service
public class Producer {
   private static final Logger LOG = LoggerFactory.getLogger(Producer.class);

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   @Value("${app.topic.foo}")
   private String topic;

   public void send(String message){
      LOG.info("sending message='{}' to topic='{}'", message, topic);
      kafkaTemplate.send(topic,0, "1",  message);
      kafkaTemplate.send(topic, 1, "1", message);
   }
}

字符串
消费者1

@Service
public class Consumer {

  private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

  @KafkaListener(
          groupId = "reflectoring-group-3",
          topicPartitions = @TopicPartition(
                  topic = "${app.topic.foo}",
                  partitionOffsets = { @PartitionOffset(
                          partition = "0",
                          initialOffset = "0") }))
  void commonListenerForMultipleTopics(String message) {
      LOG.info("MultipleTopicListener - {}", message);
  }
}


消费者2

@Service
public class Consumer2 {
  private static final Logger LOG = LoggerFactory.getLogger(Consumer2.class);

  @KafkaListener(
          groupId = "reflectoring-group-3",
          topicPartitions = @TopicPartition(
                  topic = "${app.topic.foo}",
                  partitionOffsets = { @PartitionOffset(
                          partition = "1",
                          initialOffset = "0") }))
  void commonListenerForMultipleTopics(String message) {
      LOG.info("MultipleTopicListener - {}", message);
  }
}

z9ju0rcb

z9ju0rcb1#

我做对了吗?
你不必要地重复了主题中的信息,但确实如此。例如,您可以使用kafka-console-consumer --partition CLI进行检查。
在消费者中接收相同的消息,而不指定分区号
例如,如果您设置了唯一的groupId,那么您将从所有分区获取事件。或者,如果删除parititonOffsets并使用MessageConsumerRecord参数而不是String,则仍需要手动过滤message.partition() == X

相关问题