apache-kafka-kafkastream主题/分区

rdlzhqv9  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(365)

我写Kafka消费高容量高速分布式应用程序。我只有一个主题,但传入的消息率非常高。有多个分区服务于更多的使用者将适合这个用例。最好的消费方式是拥有多个流读取器。根据文档或可用的示例,consumerconnector发出的kafkastreams的数量取决于主题的数量。想知道如何获得多个kafkastream读取器[基于分区],以便我可以在每个流中跨一个线程,或者在多个线程中读取同一个kafkastream将执行多个分区的并发读取?
任何见解都是非常感谢的。

1mrurvl1

1mrurvl11#

想和大家分享一下我在邮件列表中的发现:
在主题Map中传递的数字控制一个主题被划分为多少个流。在您的例子中,如果传入1,那么所有10个分区的数据都将被馈送到1个流中。如果传入2,则2个流中的每个流将从5个分区获取数据。如果传入11,其中10个将分别从1个分区获取数据,1个流将一无所获。
通常,您需要在自己的线程中迭代每个流。这是因为如果没有新事件,每个流都可能永远阻塞。
示例代码段:

topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);

for (final KafkaStream stream : streams) {
    ReadTask task = new ReadTask(stream, msgTopic);
    task.addObserver(this.msgObserver);
    tasks.add(task); executor.submit(task);
}

参考文献:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3cca+shyy_z903domnjp7_yyr_ae2srw-x7xpanqkmwap66goqf6w@mail.gmail.com%3e

t40tm48m

t40tm48m2#

/**
 * @param source : source kStream to sink output-topic
 */
private static void pipe(KStream<String, String> source) {
    source.to(Serdes.String(), Serdes.String(), new StreamPartitioner<String, String>() {

        @Override
        public Integer partition(String arg0, String arg1, int arg2) {
            return 0;
        }
    }, "output-topic");
}

上面的代码将在主题名为“output topic”的分区1处写入记录

dba5bblo

dba5bblo3#

建议的方法是拥有一个线程池,这样java就可以为您处理组织,createmessagestreamsbyfilter方法为每个流提供了一个可运行的线程池。例如:

int NUMBER_OF_PARTITIONS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181" );
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);

ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for(final KafkaStream<Message> stream: streams){
    executor.submit(new Runnable() {
        public void run() {
            for (MessageAndMetadata<Message> msgAndMetadata: stream) {
                ByteBuffer buffer = msgAndMetadata.message().payload();
                byte [] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                //Do something with the bytes you just got off Kafka.
            }
        }
    });
}

在这个例子中,我要求6个线程,因为我知道每个主题有3个分区,我在白名单中列出了两个主题。一旦我们有了传入流的句柄,我们就可以迭代它们的内容,即messageandmetadata对象。元数据实际上只是主题名和偏移量。正如您所发现的,如果您请求1个流而不是我的示例6中的流,那么您可以在单个线程中执行,但是如果您需要并行处理,那么最好的方法是启动一个执行器,每个返回的流有一个线程。

相关问题