rabbitmq 无法等待任务完成- ASP.NET Web API微服务

eufgjt7s  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(178)

我有一个有趣的问题,而且我是异步编程的新手,所以任何批评都是可以接受的。我有两个API在互相通信。我尝试创建的主要思想是当用户试图完成他的购物车时-购物车API向目录API询问可用数量。由于它是一个消息(RabbitMQ)我想等待这个消息被接收回我的购物车API,处理,然后返回到购物车API中的方法的控制流,以便继续执行支付。我有3个涉及的类:CheckoutShoppingCart -执行付款,向Catalogue API询问可用数量。SessionState -保存产品/登录用户的临时数据(Singleton,注入到2个类中)。EventProcessor -处理传入的RabbitMQ消息。
根据RabbitMQ的官方文档,他们使用一种叫做correlationId的东西。但是我决定创建一个变通方法,我在某种程度上成功了,但是代码由于一个非常意想不到的原因失败了。
下面的第一个方法是通过RabbitMQ向Catalogue API发送消息的方法。

public class CheckoutShoppingCart 
{
   await PublishMessageAvailableQuantities(loggedUserCart.Products);
   await ProductQuantityEvaluation();
}

然后第二个是失败的,这是它的身体:

private async Task ProductQuantityEvaluation()
{
 Task.WhenAny(sessionState.CompleteTask());
}

现在,WhenAny - sessionState.CompleteTask()中的任务是一个伪方法,它是我在消息处理器类中创建的一个标志,用于处理从catalogue API接收到的信息消息。当消息被成功接收时,处理器会执行这个伪方法。这就是为什么我跟踪它的状态--看看它是否被执行。
因此,当有人点击执行购物车的付款时,我希望到达Task。WhenAny,它将等待消息处理器类完成此任务,这意味着接收到有关可用数量的消息。当接收到该消息时,控制流将在执行购物车付款的类中继续。
所有任务都正常。等等(10000)。当这个任务等待时,其他线程处理消息,当它完成时,控制流返回到这里。但是当我使用WhenAny时,当我调试时-由于某种原因,WhenAny(sessionState.completeTask())调用方法本身,并且当我调试sessionState.CompleteTask()方法时,它实际上在事件处理器调用之前被调用,但是从代码的Angular 来看,它只从消息处理器调用。
事件处理器的一种情况:

public class EventProcessor 
{
case AppConstants.eventTypeSendActualProductQuantities:
                    var disapprovedProductInfo = JsonSerializer.Deserialize<PublishedProductModel>(message);
                    sessionState.publishedProductModel = disapprovedProductInfo;
                    sessionState.productQuantityStatus = 2;
                  **await sessionState.CompleteTask();**
                    break;
}

这是一个伪方法,仅用作标志。

public class SessionState 
{
public async Task CompleteTask()
        {
            var tcs = new TaskCompletionSource<bool>();
            tcs.SetResult(true);

        }
}

下面是我检查上述方法是否完成的方法:

private async Task ProductQuantityEvaluation()
{
 Task.WhenAny(sessionState.CompleteTask());
}

因此,代码在Wait(miliseconds)上运行得很好,但是当我想等待另一个方法完成时,就使用WhenAny();它不起作用。我错过了什么?

dvtswwa3

dvtswwa31#

目前,您的CompleteTask()方法只是创建一个任务并完成它:

public async Task CompleteTask()
{
  var tcs = new TaskCompletionSource<bool>();
  tcs.SetResult(true);            
}

所以,实际上,它总是返回一个已经完成的任务。
而且,这段代码并不等待CompleteTask被 * 调用 *;它正在 * 调用 * 它,然后等待它返回的任务:

// Removing the unnecessary Task.WhenAny and adding the missing await:
private async Task ProductQuantityEvaluation()
{
  await sessionState.CompleteTask();
}

// Is the same as this:
private async Task ProductQuantityEvaluation()
{
  var task = sessionState.CompleteTask();
  await task;
}

您实际上要寻找的是一种“异步信号”,在大多数情况下是TaskCompletionSource<T>。您希望在此之前创建它,让控制器操作await TCS任务,然后稍后从通知方法触发它。

public class SessionState 
{
  private readonly TaskCompletionSource<bool> _tcs = new();
  public Task CompleteTask => _tcs.Task;
  public void Complete() => _tcs.TrySetResult(true);
}

等待它这样:

private async Task ProductQuantityEvaluation()
{
  await sessionState.CompleteTask;
}

并像这样触发它:

public class EventProcessor 
{
case AppConstants.eventTypeSendActualProductQuantities:
  var disapprovedProductInfo = JsonSerializer.Deserialize<PublishedProductModel>(message);
  sessionState.publishedProductModel = disapprovedProductInfo;
  sessionState.productQuantityStatus = 2;
  sessionState.Complete();
  break;
}

这样就能让它基本上工作了。
现在,这段代码仍然存在一些问题,您可能需要在下一步修复这些问题。
1.“设置结果,然后发送完成信号”有点奇怪。为什么结果本身不是完成信号的一部分?你有一个TaskCompletionSource<T>,我认为把publishedProductModelproductQuantityStatus作为T的一部分是有意义的。
1.你可能需要不止一个信号。这种模式的一个更正常的用法是使用Dictionary<SomeKey, TaskCompletionSource<T>>。除非你已经在代码的其他地方使用了Dictionary<SomeKey, SessionState>,并且每个SessionState代表一个请求。在这种情况下,每个SessionState使用一个TaskCompletionSource就可以了。

相关问题