java—为多个主题创建一个kafka使用者

lf5gs5x2  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(347)

我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我传输订阅中主题列表的参数,如下所示:

private Consumer createConsumer() {
    Properties props = getConsumerProps();
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ArrayList<String> topicMISL = new ArrayList<>();
    for (String s:Connect2Redshift.kafkaTopics) {
        topicMISL.add(systemID + "." + s);
    }
    consumer.subscribe(topicMISL);
    return consumer;
}

private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
        try {  Long start = System.currentTimeMillis();
            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
            }
   }

之后,我想每隔3秒将kafka的记录轮询到流中并对其进行处理,但我想知道这个消费者的内部是什么-如何轮询来自不同主题的记录-首先轮询一个主题,然后轮询另一个主题,或者并行轮询。是不是一个消息量大的主题会一直被处理,而另一个消息量小的主题会一直等待?

tuwxkamq

tuwxkamq1#

一般来说,这取决于你的主题设置。kafka通过每个主题使用多个分区来扩展。
如果在一个主题上有3个分区,Kafka可以并行地读取它们
对于多个主题也是如此,阅读可以并行进行
如果您有一个比其他分区接收更多消息的分区,那么您可能会遇到这个特定分区的使用者延迟的情况。调整批量大小和使用者设置可以帮助他们,也可以压缩消息。理想情况下,确保负载均匀分布可以避免这种情况。
看看这篇博客文章,它让我很好地理解了它的内部结构:https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

cwtwac6a

cwtwac6a2#

ConsumerRecords<String, String> records = consumer.poll(long value);
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {

        }

    }

还需要通过查找offset并使用consumer.commitsync进行commit for offset

相关问题