如何处理从confluent kafka消费者处收到的结果?

z4bn682m  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(281)

嗨,我是在合流Kafka工作。我有一个消费者返回consumeresult。下面是我对消费者的实现。

public ConsumeResult<string, GenericRecord> Consume(string topic)
    {
      consumer.Subscribe(topic);
      ConsumeResult<string, GenericRecord> result;
      try
      {
        result = consumer.Consume();
        return result;
      }
      catch (Exception e)
      {
        this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
        return null;
      }
    }

我称这种方法为

public ConsumeResult<string, GenericRecord> SubscribeAsync()
    {
      return this.consumerClient.Consume(productEventTopicName);
    }

最后,我从控制器打电话给
var productevents=this.eventdispatcher.subscribeasync();
现在我要处理这个结果。我知道我可以得到一个领域作为

response.Message.Value.TryGetValue("Version", out object result);

我想一次得到所有的领域。我的困惑是,我是否需要根据从消费者那里收到的数据去选择所需的模型,或者我可以使用的现有模型?如果我使用现有的模型,那么如果明天新的领域出现在消费者的结果,那么我们的模型将如何处理它?有人能帮我理解这个吗?任何帮助都将不胜感激。谢谢。

o7jaxewo

o7jaxewo1#

使用genericrecord时,反序列化程序将在生产者发送新的模式信息时与模式注册表联系,以提取新的模式信息。
但是,为了解析新字段,必须停止使用者并显式添加代码来相应地处理这些属性。这个问题对Kafka来说不是很具体

相关问题