如何处理Kafka消费者中的多个记录?

ykejflvf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(603)

嗨,我在合流Kafka消费者工作。我的经纪人有多个记录。我现在要处理所有的记录。下面是我对消费者的实现。

public ConsumeResult<string, GenericRecord> Consume(string topic)
    {
      ConsumeResult<string, GenericRecord> result;
      try
      {
        result = consumer.Consume();
        Commit(result);
        return result;
      }
      catch (Exception e)
      {
        this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
        return null;
      }
    }


如果代理中有多个记录,那么我将使用genericrecord获得一个事件/消息。如果有多个记录,那么如何有效地处理消费者?任何帮助都将不胜感激。谢谢

w6mmgewl

w6mmgewl1#

你只要循环一下就行了。参见示例
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/avrogeneric/program.cs

while (true)
{
    try
    {
        var consumeResult = consumer.Consume(cts.Token);

        Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}

相关问题