后台线程和任务

ozxc1zmp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(439)

我正在试图找到从专用后台线程运行任务的最佳方法。
使用上下文是从kafka主题消费,并引发一个异步事件处理程序来处理consumeresult<tkey,tvalue>示例。
Kafka消费者 consumer (下面的示例)阻塞线程,直到消息被使用或它传递的cancellationtoken被取消。

consumeThread = new Thread(Consume)
{
    Name = "Kafka Consumer Thread",
    IsBackground = true,
};

这是我提出的consume方法的实现,它由上面的专用线程启动:

private void Consume(object _)
{
    try
    {
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(cancellationTokenSource.Token);

            var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
                consumer, consumeResult, cancellationTokenSource.Token);

            _ = Task.Run(async () =>
            {
                if (onConsumeResultReceived is null) continue;

                var handlerInstances = onConsumeResultReceived.GetInvocationList();
                foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
                {
                    if (cancellationTokenSource.IsCancellationRequested) return;                        
                    await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);                            
                }

            }, cancellationTokenSource.Token);
        }
    }
    catch (OperationCanceledException)
    {

    }
    catch (ThreadInterruptedException)
    {

    }
    catch (ThreadAbortException)
    {
        // Aborting a thread is not implemented in .NET Core.
    }
}

我不确定这是推荐的从专用线程运行任务的方法,因此任何建议都将非常感谢。

zfciruhq

zfciruhq1#

我不明白你为什么需要一个专门的线程。当前的代码启动一个线程,然后该线程阻塞以供使用,然后引发线程池线程上的事件处理程序。
这个 _ = Task.Run 这个习惯用法是一个“fire and forget”,它很危险,因为它会默默地吞下事件引发代码或事件处理程序中的任何异常。
我建议更换 ThreadTask.Run ,并直接提升事件处理程序:

consumeTask = Task.Run(ConsumeAsync);

private async Task ConsumeAsync()
{
  while (true)
  {
    var consumeResult = consumer.Consume(cancellationTokenSource.Token);
    var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
        consumer, consumeResult, cancellationTokenSource.Token);

    if (onConsumeResultReceived is null) continue;

    var handlerInstances = onConsumeResultReceived.GetInvocationList();
    foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
    {
      if (cancellationTokenSource.IsCancellationRequested) return;
      await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
    }
  }
}

相关问题