The same code produces different results in different application types

kd3sttzy, posted on 2021-06-06 in Kafka

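I am currently experimenting with a producer-consumer example against a Kafka/ZooKeeper setup running in a Kubernetes cluster. My producer is a REST API that publishes a string to a Kafka topic in response to a POST request,
and my consumer is a console app that keeps listening for new messages on a given topic inside a loop.
The producer pushes messages to the topic without trouble, but the console consumer cannot consume them. Only when I turn the same consumer code into a GET request handler am I able to retrieve messages from the topic.
In the console app I get an error message stating that it is unable to connect.
I don't understand what the application type has to do with this. Why does it work in one case and not in the other?
Consumer console app output: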

Hello World!
Consumer created!
Assigned to a Topic!
Ready to Consume!
Listen!
%3|1578587823.362|FAIL|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587826.949|ERROR|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587824.343|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
%3|1578587826.956|ERROR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
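
One way to narrow this down is to probe both addresses from the machine that runs the console app. The log above shows the client trying 10.1.1.139:9092 even though the configured bootstrap server is 192.168.68.68:31000; a Kafka client uses the bootstrap address only for its initial metadata request and then connects to the addresses the brokers advertise. The sketch below is a hypothetical diagnostic and not part of the original code: `BrokerProbe`, `ParseEndpoint`, and `CanConnectAsync` are names introduced here for illustration, and the 5-second timeout is an arbitrary choice.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class BrokerProbe
{
    // Splits "host:port" into its parts; throws FormatException on malformed input.
    public static (string Host, int Port) ParseEndpoint(string endpoint)
    {
        int idx = endpoint.LastIndexOf(':');
        if (idx <= 0 || idx == endpoint.Length - 1)
            throw new FormatException($"Expected host:port, got '{endpoint}'.");
        return (endpoint.Substring(0, idx), int.Parse(endpoint.Substring(idx + 1)));
    }

    // Returns true if a TCP connection can be opened within the timeout.
    public static async Task<bool> CanConnectAsync(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                Task connect = client.ConnectAsync(host, port);
                Task finished = await Task.WhenAny(connect, Task.Delay(timeout));
                if (finished != connect) return false; // timed out
                await connect;                         // rethrows a connection error, if any
                return client.Connected;
            }
            catch (SocketException)
            {
                return false; // refused or unreachable
            }
        }
    }

    public static async Task Main()
    {
        // Both addresses are taken from the question: the configured bootstrap
        // server and the address that appears in the rdkafka error log.
        string[] endpoints = { "192.168.68.68:31000", "10.1.1.139:9092" };
        foreach (string ep in endpoints)
        {
            var (host, port) = ParseEndpoint(ep);
            bool ok = await CanConnectAsync(host, port, TimeSpan.FromSeconds(5));
            Console.WriteLine($"{ep}: {(ok ? "reachable" : "NOT reachable")}");
        }
    }
}
```

If the first address is reachable and the second is not from the console app's machine, while both are reachable from wherever the REST API runs, that would be consistent with the broker advertising a cluster-internal address. This is an assumption to verify, not a confirmed diagnosis.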

Consumer console app code:

Console.WriteLine("Hello World!");
var config = new ConsumerConfig
{
    BootstrapServers = "192.168.68.68:31000",
    // the group.id property must be specified when creating a consumer, even 
    // if you do not intend to use any consumer group functionality.
    GroupId = "ThisisaGroup",
    // partition offsets can be committed to a group even by consumers not
    // subscribed to the group. auto commit is enabled here, so offsets
    // are committed to this group automatically.
    EnableAutoCommit = true
};

using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
    Console.WriteLine("Consumer created!");
    string topicId = "test";
    c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
    Console.WriteLine("Assigned to a Topic!");
    CancellationTokenSource cts = new CancellationTokenSource();

    try
    {
        Console.WriteLine($"Ready to Consume!");
        while (!cts.IsCancellationRequested)
        {
            Console.WriteLine($"Listen!");
            try
            {
                var cr = c.Consume(cts.Token);

                if (cr.IsPartitionEOF)
                {
                    Console.WriteLine(
                        $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.");

                    continue;
                }

                Console.WriteLine($"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e}");
            }
        }
    }
    catch (OperationCanceledException e)
    {
        // Cancellation was requested: close the consumer so it leaves the group
        // cleanly and final offsets are committed.
        Console.WriteLine($"Consumption cancelled: {e.Message}");
        c.Close();
    }
}

Producer POST request:

public string PostAsync([FromBody] JObject value)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.68.68:31000",
                ClientId = Dns.GetHostName()
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                try
                {
                    producer.Produce("test", new Message<string, string> { Key = "1", Value = value.ToString() });
                    producer.Flush();
                    return "ok!" + producer.Name;
                }
                // The exception type must match the producer's key/value types.
                catch (ProduceException<string, string> e)
                {
                    return e.ToString();
                }
            }
         }

GET request:

public string Get()
        {
            Console.WriteLine("Hello World!");
            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.68.68:31000",
                // the group.id property must be specified when creating a consumer, even 
                // if you do not intend to use any consumer group functionality.
                GroupId = "ThisisaGroup",
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. auto commit is enabled here, so offsets
                // are committed to this group automatically.
                EnableAutoCommit = true,
                Debug = "broker"
            };

            using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                Console.WriteLine("Consumer created!");
                string topicId = "test";
                c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
                Console.WriteLine("Assigned to a Topic!");
                CancellationTokenSource cts = new CancellationTokenSource();

                try
                {
                    Console.WriteLine($"Ready to Consume!");
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine($"Listen!");
                        try
                        {
                            var cr = c.Consume(cts.Token);

                            if (cr.IsPartitionEOF)
                            {
                                return
                                    $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.";
                            }

                            return $"Consumed message: { cr.Message.Value} its position {cr.TopicPartitionOffset}";
                        }
                        catch (ConsumeException e)
                        {
                            return $"Error occurred: {e}";
                        }
                    }
                }
                catch (OperationCanceledException e)
                {
                    // Cancellation was requested: close the consumer so it leaves the group
                    // cleanly and final offsets are committed.
                    Console.WriteLine($"Consumption cancelled: {e.Message}");
                    c.Close();
                }
                return "Some";
            }

        }
