如何通过命令行运行多个kafka消费者?

inb24sb2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(374)

我正在通过kafka包中已经提供的shell脚本测试kafka的性能。我创建了一个包含10个分区和泵送数据的主题,如下所示:

./bin/kafka-producer-perf-test.sh --topic test-topic --num-records 9000000 --record-size 300 --throughput 250000 --producer-props bootstrap.servers=110.17.14.302:9092 acks=1 max.in.flight.requests.per.connection=1 batch.size=5000

现在,我想使用我从多个消费者而不仅仅是单个消费者那里抽取的数据,如上图所示。所以我开始使用 kafka-consumer-perf-test.sh . 我就是这么做的:

./bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test-topic --group test1

有没有什么方法可以让我们通过命令行在一个使用者组中运行多个kafka使用者,并且每个使用者使用 kafka-consumer-perf-test.sh ? 我在和Kafka合作 0.10.1.0 我看到了这篇文章,但它没有说明在哪里配置我们要运行多少消费者,以及他们将在哪个分区上工作?
更新:
这就是我看到的错误:

./bin/kafka-consumer-perf-test.sh --zookeeper 110.27.14.10:2181 --messages 50 --topic test-topic --threads 1

[2017-01-11 22:34:09,785] WARN [ConsumerFetcherThread-perf-consumer-14195_kafka-cluster-3098529006-zeidk-1484174043509-46a51434-2-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@54fb48b6 (kafka.consumer.ConsumerFetcherThread)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:109)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
4xrmg8kj

4xrmg8kj1#

只需运行相同的命令(即。, ./bin/kafka-consumer-perf-test.sh )在不同的控制台中多次。
关于分区分配:Kafka会自动为你这样做。如果你使用消费群体。
如果要手动分配分区,则不能使用使用者组。为此,不能使用kafka-consumer-perf-test.sh,但需要自己编写。
阅读javadoc:https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html

相关问题