合流的kafka文档说,消费者类的定义如下:
Class Consumer<TKey, TValue>
上面的consumer类实现了高级apachekafka consumer(具有键和值反序列化)。
我知道tkey和tvalue是用来反序列化密钥的,它是从生产者发送进来的。例如,类似
从制作人那里寄来钥匙
var deliveryReport = producer.ProduceAsync(topicName, key, val);
在使用者端接收字符串键
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(topics);
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
while (!cancelled)
{
Message<Ignore, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
因为我们传递的是一个密钥,所以使用者被初始化为
Consumer<Ignore, string>
消息初始化为
Message<Ignore, String>
毕竟,我的问题是,键的反序列化到底意味着什么?为什么我们要这么做?另外,为什么我们需要传入一个键值对忽略字符串来执行反序列化?
1条答案
按热度按时间qij5mzcb1#
为什么我们需要传入一个键值对忽略字符串来执行反序列化?
你不需要通过那些特定的设置。你需要匹配制作人的设置。或者,如果您不确定,您可以为key和value提供byte array对象。
如果生产者没有发送一个密钥,比如null,那么就没有什么可以反序列化的。我想这就是忽略类的作用。注意,您没有提供键反序列化器类,但是为值提供了
所有kafka消息都只以字节形式包含键、值对。生产者使用序列化程序,作为使用者,您需要反序列化。理想情况下,您可以将消息反序列化为实际对象,例如字符串、json对象或avro、protobuf等等。
默认情况下,键决定您将从哪个主题分区中使用消息。空键将平均分布在整个主题中。否则,producer应用程序可以定义自己的分区器,并在逻辑决定的任何位置发送数据