.net核心kafka produceasync未完成

d5vmydt9  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(482)

我想做一个概念证明,简单的Kafka生产者和消费者。我使用的平台是.net core 2.0,由visual studio 2017构建。我使用的nuget包是confluent.kafka
我通过研究发现了一些最成功的代码,但现在在实现.produceasync()方法时似乎遇到了问题。不会出现错误代码,程序似乎会继续运行,但不会执行该方法。
这是我的制作人:

class Producer
    {
        static void Main(string[] args)
        {
            Console.WriteLine("PRODUCER!!!");

            // The Kafka endpoint address
            string kafkaEndpoint = "127.0.0.1:9092";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the producer configuration
            var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

            // Create the producer
            using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
            {
                Console.WriteLine("Producer Created!");
                // Send 10 messages to the topic
                for (int i = 0; i < 10; i++)
                {
                    var message = $"Event {i}";
                    Console.WriteLine($"Begin Event {i}");
                    var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                    Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
                    Console.WriteLine("Press any key to continue...");
                    Console.ReadLine();
                }
            }
        }
    }

这是我的消费者:

class Consumer
    {
        static void Main(string[] args)
        {
            Console.WriteLine("CONSUMER!!!");

            // The Kafka endpoint address
            string kafkaEndpoint = "127.0.0.1:9092";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the consumer configuration
            var consumerConfig = new Dictionary<string, object>
            {
                { "group.id", "myconsumer" },
                { "bootstrap.servers", kafkaEndpoint },
            };

            // Create the consumer
            using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Subscribe to the OnMessage event
                consumer.OnMessage += (obj, msg) =>
                {
                    Console.WriteLine($"Received: {msg.Value}");
                };

                // Subscribe to the Kafka topic
                consumer.Subscribe(new List<string>() { kafkaTopic });

                // Handle Cancel Keypress 
                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                Console.WriteLine("Ctrl-C to exit.");

                // Poll for messages
                while (!cancelled)
                {
                    consumer.Poll(3000);
                    //consumer.Poll();
                }
            }
        }
    }

在我运行代码时,使用者处于空闲状态,因为似乎没有生成任何消息,但下面是我对生产者的控制台输出:

PRODUCER!!!
Producer Created!
Begin Event 0

(就这样。控制台此时也处于空闲状态。)
问题是:如何解决这个问题,使消息实际上是生成的,并使消费者能够接收它们?

gwo2fgha

gwo2fgha1#

给出: producer.Flush(TimeSpan.FromSeconds(10); 试试看。这里也有很多很好的例子:https://github.com/confluentinc/confluent-kafka-dotnet

jchrr9hc

jchrr9hc2#

您可以通过将producer中的行更改为:

var result = producer.ProduceAsync(kafkaTopic, null, message).Result;

检查结果内容,找出哪里出了问题。

相关问题