**结束。**此问题需要详细的调试信息。它目前不接受答案。
**想改进这个问题吗?**更新问题,使其成为堆栈溢出的主题。
上个月关门了。
改进这个问题
我需要一个服务,不断轮询来自Kafka主题的消息,将消息交给处理任务,将此任务结束时的结果推送到另一个Kafka主题。我使用.net core 3.1中的backgroundservice作为基本基础结构,同时编写了一个consumerproducer类,该类封装了kafka的内容,具有以下公共方法:
public async Task ReceiveAndReply(ReplyCallback replyCallback)
{
try
{
var msg = consumer.Pull();
var reply = await replyCallback(msg);
producer.PushAsync((message)reply);
}
catch (Exception e)
{
throw new Exception(e.InnerException.Message);
};
}
backgroundservice派生类如下所示:
public class Worker : BackgroundService{
// initialization stuff....
protected ReplyCallback processDelegate = new ReplyCallback(ProcessQuery);
protected static Task<message> ProcessQuery(message source)
{
return new Task<message>>(() =>
{
try
{
var messagePayload= ProcessPulledMessage(source);
var replyMessage = new Message(messagePayload);
return replyMessage;
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
return null;
}
});
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await connector.ReceiveAndReply(processDelegate);
}
catch (Exception ex)
{
throw new Exception(ex.InnerException.Message);
}
}
}
}
代码被简化了,但它描述了这种情况。在运行时,我可以使用来自主题的消息,但是当在processquery中等待任务时,执行退出,应用程序继续运行,但是等待的任务永远不会执行。也许我做错了什么,我不擅长复杂的异步工作,所以任何提示都是非常受欢迎的。谢谢您
1条答案
按热度按时间e37o9pze1#
您正在返回一个未启动的任务。
文件
Task
建造商声明:示例化任务对象并启动任务的最常用方法是调用static task.run(action)或taskfactory.startnew(action)方法,而不是调用此构造函数。