使用assign()时,kafka.net客户端不接收任何消息

xvw2m8pv  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(401)

我试图使我的服务重新阅读Kafka主题从头到尾就开始初始化内部数据结构。我正在使用confluent.net客户端。根据我的理解,以下代码应该订阅我的主题设置偏移到开头:

consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));

但由于某些原因,我没有收到主题中预先存在的消息,也没有收到新消息。我对assign()方法的理解有误吗?有没有一种方法可以使用subscribe()实现所需的结果,而不需要使用kafka cli硬重置偏移量?
这是一个完整的测试客户机,我的输出总是“没有消息…”,尽管topic有消息并且新消息正在到达。

static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
        var topic = "test-topic";
        consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                Console.WriteLine("No messages...");
            else
                Console.WriteLine($"Offset: {result.Offset}");
        }
    }
wbgh16ku

wbgh16ku1#

问题是我用partition.any调用了assign(),以下代码可以工作: consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));

xmjla07d

xmjla07d2#

为什么要使用 Assign ? 以下几点应该对您有用:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    { 
        GroupId = "test-consumer",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
    {
        c.Subscribe("test-topic");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            c.Close();
        }
    }
}

相关问题