我想了解如何从生产者写入多个分区?我做对了吗?是不是很容易做决定?我的主题有两个部分。我想读相同的消息类型等于消费者
如何在消费者中接收相同的消息,而不需要在生产者和消费者中指定分区号?
谢谢制片人
@Service
public class Producer {
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic.foo}")
private String topic;
public void send(String message){
LOG.info("sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic,0, "1", message);
kafkaTemplate.send(topic, 1, "1", message);
}
}
字符串
消费者1
@Service
public class Consumer {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(
groupId = "reflectoring-group-3",
topicPartitions = @TopicPartition(
topic = "${app.topic.foo}",
partitionOffsets = { @PartitionOffset(
partition = "0",
initialOffset = "0") }))
void commonListenerForMultipleTopics(String message) {
LOG.info("MultipleTopicListener - {}", message);
}
}
型
消费者2
@Service
public class Consumer2 {
private static final Logger LOG = LoggerFactory.getLogger(Consumer2.class);
@KafkaListener(
groupId = "reflectoring-group-3",
topicPartitions = @TopicPartition(
topic = "${app.topic.foo}",
partitionOffsets = { @PartitionOffset(
partition = "1",
initialOffset = "0") }))
void commonListenerForMultipleTopics(String message) {
LOG.info("MultipleTopicListener - {}", message);
}
}
型
1条答案
按热度按时间z9ju0rcb1#
我做对了吗?
你不必要地重复了主题中的信息,但确实如此。例如,您可以使用
kafka-console-consumer --partition
CLI进行检查。在消费者中接收相同的消息,而不指定分区号
例如,如果您设置了唯一的groupId,那么您将从所有分区获取事件。或者,如果删除
parititonOffsets
并使用Message
或ConsumerRecord
参数而不是String,则仍需要手动过滤message.partition() == X