为了使用kafka发布消息,我使用类名作为主题:
kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));
消费者订阅他们感兴趣的课程:
for(Object sub:_subscriptions)
topics.add(sub.getClass().getName());
_kafkaConsumer.subscribe(topics);
问题是,只有一个消费者曾经收到订阅的消息。我的理解是kafka将为每个订户分配一个唯一的分区(如果可用)。我目前只有2个订阅服务器,我的kafka server.properties指定了4个分区。似乎所有使用者都在从同一分区读取数据。也许Kafka是一个糟糕的选择服务巴士由于这种明显的限制。任何帮助都将不胜感激!
Kafka消费地产:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
properties.put("group.id", "TestGroup");
properties.put("auto.offset.reset","earliest");
Kafka制片公司:
properties.put("bootstrap.servers",_settings.getEndpoint());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
服务器属性(我唯一更改的是默认属性):
num.partitions=4
注意:我还尝试了以下用户设置:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.commit.interval.ms","1000");
properties.put("enable.auto.commit", "true");
properties.put("group.id", "testGroup");
properties.put("auto.offset.reset","latest");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
2条答案
按热度按时间pes8fvy91#
Kafka默认使用rangeassignor作为分区分配策略,具有以下特点:
范围赋值器以每个主题为基础工作。对于每个主题,我们按数字顺序排列可用分区,按字典顺序排列使用者。然后,我们将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它不能平均分配,那么前几个消费者将有一个额外的分区。例如,假设有两个使用者c0和c1,两个主题t0和t1,每个主题有3个分区,从而得到分区t0p0、t0p1、t0p2、t1p0、t1p1和t1p2。赋值为:c0:[t0p0,t0p1,t1p0,t1p1]c1:[t0p2,t1p2]
如果您想为少量分区提供更均匀的分布,可以通过设置
partition.assignment.strategy
s8vozzvw2#
如果您的所有消费者都有相同的消费群体(
group.id
属性),则组中只有一个使用者将接收消息。如果您希望所有的消费者都能收到消息,那么他们需要有不同的group.id
.要检查哪些使用者绑定到主题的分区,可以使用以下命令