无法使用Kafka主题的信息?

ttcibm8c  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(323)

我看了几个关于confluent.kafka客户机的例子(https://github.com/confluentinc/confluent-kafka-dotnet/),虽然我可以成功地让制作人把一条消息推到Kafka,但我无法把任何消息拉回到消费者那里。
通过ui,我可以看到主题被创建,消息进入这个主题(当前有10个分区和3条消息),但是我的使用者总是报告“分区结束”,没有任何消息的消耗(3条仍然在主题上,“onmessage”从不触发)。
但是,使用者肯定正在访问主题,并且可以在其中一个分区上看到3条消息:
分区结束:dotnet测试主题[6]@3
它只是不使用消息和触发器onmessage()。有什么想法吗?

var conf = new Dictionary<string, object> 
{ 
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", "mykafkacluster:9094" },
    { "sasl.mechanisms", "SCRAM-SHA-256" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.username", "myuser" },
    { "sasl.password", "mypass" }
};

using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
    producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
            .ContinueWith(result => 
            {
                var msg = result.Result;
                if (msg.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
                }
                else 
                {
                    Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
                }
            });

    producer.Flush(TimeSpan.FromSeconds(10));
}

using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{ 
    consumer.Subscribe("dotnet-test-topic");

    consumer.OnConsumeError += (_, err)
        => Console.WriteLine($"consume error: {err.Error.Reason}");

    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"consumed: {msg.Value}");

    consumer.OnPartitionEOF += (_, tpo)
        => Console.WriteLine($"end of partition: {tpo}");

    while (true)
    {
        consumer.Poll(TimeSpan.FromMilliseconds(100));
    }  
}
qgzx9mmu

qgzx9mmu1#

如果没有提供以下配置,onmessage事件似乎不会触发:

{ "auto.offset.reset", "smallest" }

加上这个,我就可以阅读关于这个主题的信息了。

相关问题