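I am currently working with a producer-consumer example, using a Kafka/ZooKeeper setup running in a Kubernetes cluster. My producer is a REST API that posts a string to a topic in Kafka via a POST request,
and my consumer is a console app that listens for new messages on a given topic in a while(true) loop.
My producer seems to push messages to the topic without any problems, but my console consumer does not seem to be able to consume them. Messages can only be retrieved from the topic when the consumer code runs as a GET request.
In the console app I get an error message stating that, for some reason, it cannot connect.
I am not sure I understand what is going on in this type of application. Why does it work in one case and not in the other?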
Consumer console application output:
Hello World!
Consumer created!
Assigned to a Topic!
Ready to Consume!
Listen!
%3|1578587823.362|FAIL|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587826.949|ERROR|rdkafka#consumer-1| [thrd:10.1.1.139:9092/0]: 10.1.1.139:9092/0: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21014ms in state CONNECT)
%3|1578587824.343|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
%3|1578587826.956|ERROR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.1.1.139:9092 failed: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond... (after 21010ms in state CONNECT)
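One detail in this output: the failing connections go to 10.1.1.139:9092, not to the configured bootstrap address 192.168.68.68:31000. A Kafka client uses the bootstrap address only for the first contact, then connects to whatever addresses the brokers advertise in their metadata. The following is a minimal sketch for printing those advertised addresses from the same machine that runs the console app. It assumes the Confluent.Kafka admin client is available in the project; the class name BrokerMetadataCheck and the 10-second timeout are illustrative choices, not part of the original code.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper program: prints the broker addresses that the cluster
// advertises to clients connecting through the bootstrap address.
class BrokerMetadataCheck
{
    static void Main()
    {
        var config = new AdminClientConfig
        {
            // Same bootstrap address as the producer and consumer in this post.
            BootstrapServers = "192.168.68.68:31000"
        };

        using (var admin = new AdminClientBuilder(config).Build())
        {
            // Request cluster metadata; the timeout value is an arbitrary choice.
            Metadata metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));

            foreach (var broker in metadata.Brokers)
            {
                Console.WriteLine(
                    $"Broker {broker.BrokerId} advertises {broker.Host}:{broker.Port}");
            }
        }
    }
}
```

If the printed host and port are not reachable from the machine running the console app, that would be consistent with the connection failures shown above, although the output alone does not confirm the cause.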
Consumer console application code:
    Console.WriteLine("Hello World!");
    var config = new ConsumerConfig
    {
        BootstrapServers = "192.168.68.68:31000",
        // The group.id property must be specified when creating a consumer, even
        // if you do not intend to use any consumer group functionality.
        GroupId = "ThisisaGroup",
        // Partition offsets can be committed to a group even by consumers not
        // subscribed to the group. Note that auto commit is enabled here, so
        // offsets will be committed for this group.
        EnableAutoCommit = true
    };
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        Console.WriteLine("Consumer created!");
        string topicId = "test";
        // Read partition 0 of the topic from the beginning.
        c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
        Console.WriteLine("Assigned to a Topic!");
        CancellationTokenSource cts = new CancellationTokenSource();
        try
        {
            Console.WriteLine($"Ready to Consume!");
            while (!cts.IsCancellationRequested)
            {
                Console.WriteLine($"Listen!");
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var cr = c.Consume(cts.Token);
                    if (cr.IsPartitionEOF)
                    {
                        Console.WriteLine(
                            $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.");
                        continue;
                    }
                    Console.WriteLine($"Consumed message: {cr.Message.Value} its position {cr.TopicPartitionOffset}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e}");
                }
            }
        }
        catch (OperationCanceledException e)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            Console.WriteLine($"Error occurred: {e.Data}");
            c.Close();
        }
    }
}
Producer POST request:
public string PostAsync([FromBody] JObject value)
{
    var config = new ProducerConfig
    {
        BootstrapServers = "192.168.68.68:31000",
        ClientId = Dns.GetHostName()
    };
    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        try
        {
            producer.Produce("test", new Message<string, string> { Key = "1", Value = value.ToString() });
            // Wait for outstanding produce requests to complete before returning.
            producer.Flush();
            return "ok!" + producer.Name;
        }
        // The type arguments must match the producer's <string, string>;
        // ProduceException<Null, string> would never be caught here.
        catch (ProduceException<string, string> e)
        {
            return e.ToString();
        }
    }
}
GET request:
public string Get()
{
    Console.WriteLine("Hello World!");
    var config = new ConsumerConfig
    {
        BootstrapServers = "192.168.68.68:31000",
        // The group.id property must be specified when creating a consumer, even
        // if you do not intend to use any consumer group functionality.
        GroupId = "ThisisaGroup",
        // Partition offsets can be committed to a group even by consumers not
        // subscribed to the group. Note that auto commit is enabled here, so
        // offsets will be committed for this group.
        EnableAutoCommit = true,
        Debug = "broker"
    };
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        Console.WriteLine("Consumer created!");
        string topicId = "test";
        // Read partition 0 of the topic from the beginning.
        c.Assign(new TopicPartitionOffset(topicId, 0, Offset.Beginning));
        Console.WriteLine("Assigned to a Topic!");
        CancellationTokenSource cts = new CancellationTokenSource();
        try
        {
            Console.WriteLine($"Ready to Consume!");
            while (!cts.IsCancellationRequested)
            {
                Console.WriteLine($"Listen!");
                try
                {
                    // Every branch below returns, so the loop body runs at most once.
                    var cr = c.Consume(cts.Token);
                    if (cr.IsPartitionEOF)
                    {
                        return
                            $"Reached end of topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}.";
                    }
                    return $"Consumed message: {cr.Message.Value} its position {cr.TopicPartitionOffset}";
                }
                catch (ConsumeException e)
                {
                    return $"Error occurred: {e}";
                }
            }
        }
        catch (OperationCanceledException e)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            Console.WriteLine($"Error occurred: {e.Data}");
            c.Close();
        }
        return "Some";
    }
}