如何在kafka中使用多个使用者?

qyyhg6bp  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(375)

我是一个学习Kafka的新生,在了解多个消费者方面遇到了一些基本问题,到目前为止,文章、文档等都没有太大帮助。
我试着做的一件事是编写自己的高级Kafka生产者和消费者,并同时运行它们,向一个主题发布100条简单消息,让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入第二个使用者来使用刚刚发布的消息所指向的同一主题时,它没有收到任何消息。
我的理解是,对于每一个主题,你可以有来自不同消费群体的消费者,这些消费者群体中的每一个都会得到某个主题的消息的完整副本。是这样吗?如果没有,我应该怎样设置多个消费者?这是我到目前为止写的消费者类:

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }

    }
}

此外,我注意到,最初我是针对一个只有一个分区的主题“test”测试上述消耗。当我将另一个消费者添加到一个现有的消费者组(比如“testgroup”)时,这触发了kafka重新平衡,它显著地降低了我消费的延迟,以秒为单位。我认为这是一个重新平衡的问题,因为我只有一个分区,但是当我创建了一个新的主题“multiplepartitions”,比如说6个分区时,出现了类似的问题,将更多的使用者添加到同一个使用者组会导致延迟问题。我环顾四周,人们告诉我,我应该使用多线程消费者——有人能解释一下吗?

nwsw7zdq

nwsw7zdq1#

如果希望多个使用者使用相同的消息(如广播),可以使用不同的使用者组生成它们,还可以在使用者配置中将auto.offset.reset设置为最小。如果希望多个使用者并行完成消费(在它们之间分配工作),则应创建number of partitions>=number of consumers。一个分区最多只能由一个使用者进程使用。但是一个使用者可以使用多个分区。

bwntbbo3

bwntbbo32#

我认为您的问题在于auto.offset.reset属性。当一个新的使用者从一个分区中读取数据并且没有以前提交的偏移量时,auto.offset.reset属性用于决定起始偏移量应该是什么。如果将其设置为“最大”(默认值),则从最新(最后)消息开始读取。如果将其设置为“最小”,则会收到第一条可用消息。
所以加上:

properties.put("auto.offset.reset", "smallest");

再试一次。

  • 编辑*

“最小的”和“最大的”不久前被弃用了。你现在应该用“最早的”或“最新的”。有什么问题,查一下文件

rkue9o1l

rkue9o1l3#

在这里的文档中,它说:“如果您提供的线程多于主题上的分区,那么某些线程将永远看不到消息”。你能给你的主题添加分区吗?我的使用者组线程数等于主题中的分区数,每个线程都在接收消息。
以下是我的主题配置:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins   PartitionCount:3    ReplicationFactor:1 Configs:
Topic: recent-wins  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: recent-wins  Partition: 2    Leader: 0   Replicas: 0 Isr: 0

我的消费者:

package com.cie.dispatcher.services;

import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * This will create three threads, assign them to a "group" and listen for  notifications on a topic.
 * Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
 * the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
 * lifecycle manager in dropwizard.
 * <p/>
 * Created by aakture on 6/15/15.
 */
public class KafkaTopicListener implements Managed {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private int threadCount;
private WinNotificationWorkflow winNotificationWorkflow;
private ObjectMapper objectMapper;

@Inject
public KafkaTopicListener(String a_zookeeper,
                          String a_groupId, String a_topic,
                          int threadCount,
                          WinNotificationWorkflow winNotificationWorkflow,
                          ObjectMapper objectMapper) {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig(a_zookeeper, a_groupId));
    this.topic = a_topic;
    this.threadCount = threadCount;
    this.winNotificationWorkflow = winNotificationWorkflow;
    this.objectMapper = objectMapper;
}

/**
 * Creates the config for a connection
 *
 * @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
 * @param groupId   the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
 * @return the config props
 */
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);
}

public void stop() {
    if (consumer != null) consumer.shutdown();
    if (executor != null) executor.shutdown();
    try {
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        }
    } catch (InterruptedException e) {
        LOG.info("Interrupted during shutdown, exiting uncleanly");
    }
    LOG.info("{} shutdown successfully", this.getClass().getName());
}
/**
 * Starts the listener
 */
public void start() {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, new Integer(threadCount));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    executor = Executors.newFixedThreadPool(threadCount);
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ListenerThread(stream, threadNumber));
        threadNumber++;
    }
}

private class ListenerThread implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        try {
            String message = null;
            LOG.info("started listener thread: {}", m_threadNumber);
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                try {
                    message = new String(it.next().message());
                    LOG.info("receive message by " + m_threadNumber + " : " + message);
                    WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
                    winNotificationWorkflow.process(winNotification);
                } catch (Exception ex) {
                    LOG.error("error processing queue for message: " + message, ex);
                }
            }
            LOG.info("Shutting down listener thread: " + m_threadNumber);
        } catch (Exception ex) {
            LOG.error("error:", ex);
        }
    }
  }
}

相关问题