我在kafka中创建了一个带有9个分区的主题,将其恰当地命名为“test”,并使用 Confluent.Kafka
客户库:生产者和消费者。我只是从文档中修改了一些示例。
我正在运行两个consumer应用程序示例和一个producer示例。我不认为在这里粘贴消费代码有什么意义,它只是一个很小的“获取消息,在屏幕上打印”应用程序,但是,它也会打印消息来自的分区的编号。
这是producer应用程序:
static async Task Main(string[] args)
{
var random = new Random();
var config = new ProducerConfig {
BootstrapServers = "10.0.0.5:9092",
Partitioner = Partitioner.ConsistentRandom
};
int counter = 0;
while (true)
{
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
p.BeginProduce(
"test",
new Message<string, string>
{
//Key = random.Next().ToString(),
Value = $"test {++counter}"
});
if (counter % 10 == 0)
p.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
问题:如果 Key
如果没有设置消息的属性,则所有消息都会被发送到7号分区,这意味着我的一个使用者示例处于空闲状态。我不得不手动随机分配密钥,以便在分区之间分配它们(参见注解掉的行)(从文档中复制的原始代码 Null
作为密钥的类型,这也会将所有消息发送到第7个分区。)
为什么?根据 ProducerConfig.Partitioner
财产,财产 consistent_random
如果未指定密钥,则选项应确保随机分布。我试着用 Partioner.Random
选项,该选项应使用随机分布,而不考虑键,但这没有帮助。
这是预期的行为,是我做错了什么,还是我遇到了一个错误?
我使用的是confluent.kafka nuget的1.0.0-rc2版本。
分区器配置的完整文档:
// Summary:
// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key
// (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32
// hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java
// Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
// `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are
// randomly partitioned. This is functionally equivalent to the default partitioner
// in the Java Producer.). default: consistent_random importance: high
暂无答案!
目前还没有任何答案,快来回答吧!