我正在通过kafka包中已经提供的shell脚本测试kafka的性能。我创建了一个包含10个分区和泵送数据的主题,如下所示:
./bin/kafka-producer-perf-test.sh --topic test-topic --num-records 9000000 --record-size 300 --throughput 250000 --producer-props bootstrap.servers=110.17.14.302:9092 acks=1 max.in.flight.requests.per.connection=1 batch.size=5000
现在,我想使用我从多个消费者而不仅仅是单个消费者那里抽取的数据,如上图所示。所以我开始使用 kafka-consumer-perf-test.sh
. 我就是这么做的:
./bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test-topic --group test1
有没有什么方法可以让我们通过命令行在一个使用者组中运行多个kafka使用者,并且每个使用者使用 kafka-consumer-perf-test.sh
? 我在和Kafka合作 0.10.1.0
我看到了这篇文章,但它没有说明在哪里配置我们要运行多少消费者,以及他们将在哪个分区上工作?
更新:
这就是我看到的错误:
./bin/kafka-consumer-perf-test.sh --zookeeper 110.27.14.10:2181 --messages 50 --topic test-topic --threads 1
[2017-01-11 22:34:09,785] WARN [ConsumerFetcherThread-perf-consumer-14195_kafka-cluster-3098529006-zeidk-1484174043509-46a51434-2-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@54fb48b6 (kafka.consumer.ConsumerFetcherThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:109)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
1条答案
按热度按时间4xrmg8kj1#
只需运行相同的命令(即。,
./bin/kafka-consumer-perf-test.sh
)在不同的控制台中多次。关于分区分配:Kafka会自动为你这样做。如果你使用消费群体。
如果要手动分配分区,则不能使用使用者组。为此,不能使用kafka-consumer-perf-test.sh,但需要自己编写。
阅读javadoc:https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html