嗨,我是在合流Kafka工作。我有一个消费者返回consumeresult。下面是我对消费者的实现。
public ConsumeResult<string, GenericRecord> Consume(string topic)
{
consumer.Subscribe(topic);
ConsumeResult<string, GenericRecord> result;
try
{
result = consumer.Consume();
return result;
}
catch (Exception e)
{
this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
return null;
}
}
我称这种方法为
public ConsumeResult<string, GenericRecord> SubscribeAsync()
{
return this.consumerClient.Consume(productEventTopicName);
}
最后,我从控制器打电话给
var productevents=this.eventdispatcher.subscribeasync();
现在我要处理这个结果。我知道我可以得到一个领域作为
response.Message.Value.TryGetValue("Version", out object result);
我想一次得到所有的领域。我的困惑是,我是否需要根据从消费者那里收到的数据去选择所需的模型,或者我可以使用的现有模型?如果我使用现有的模型,那么如果明天新的领域出现在消费者的结果,那么我们的模型将如何处理它?有人能帮我理解这个吗?任何帮助都将不胜感激。谢谢。
1条答案
按热度按时间o7jaxewo1#
使用genericrecord时,反序列化程序将在生产者发送新的模式信息时与模式注册表联系,以提取新的模式信息。
但是,为了解析新字段,必须停止使用者并显式添加代码来相应地处理这些属性。这个问题对Kafka来说不是很具体