为什么JobConsumer未被命中/运行?

dnph8jn4  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(142)

我正在尝试新的MassTransitIJobConsumer实现,尽管我已经尝试按照文档进行操作,但我编写的JobConsumer从未运行/命中。
我有:

  • 我创建了JobConsumer,它有一个run方法,可以运行我需要它运行的代码
public class CalculationStartRunJobConsumer : IJobConsumer<ICalculationStartRun> 
  {
      private readonly ICalculationRunQueue runQueue;

      public CalculationStartRunJobConsumer(ICalculationRunQueue runQueue)
      {
          this.runQueue = runQueue;
      }

      public Task Run(JobContext<ICalculationStartRun> context)
      {
          return Task.Run(
              () =>
              {
                  var longRunningJob = new LongRunningJob<ICalculationStartRun>
                  {
                      Job = context.Job,
                      CancellationToken = context.CancellationToken,
                      JobId = context.JobId,
                  };

                  runQueue.StartSpecial(longRunningJob);
              },
              context.CancellationToken);
      } 
  }
  • 我已注册该消费者尝试使用ConnectReceiveEndpointAddConsumer
  • 已按文档中所示配置ServiceInstance
services.AddMassTransit(busRegistrationConfigurator =>
      {
          // TODO: Get rid of this ugly if statement.
          if (consumerTypes != null)
          {
              foreach (var consumerType in consumerTypes)
              {
                  busRegistrationConfigurator.AddConsumer(consumerType);
              }
          }

          if(requestClientType != null)
          {
              busRegistrationConfigurator.AddRequestClient(requestClientType);
          }

          busRegistrationConfigurator.UsingRabbitMq((context, cfg) =>
          {
              cfg.UseNewtonsoftJsonSerializer();
              cfg.UseNewtonsoftJsonDeserializer();
              cfg.ConfigureNewtonsoftJsonSerializer(settings =>
              {
                  // The serializer by default omits fields that are set to their default value, but this causes unintended effects
                  settings.NullValueHandling = NullValueHandling.Include;
                  settings.DefaultValueHandling = DefaultValueHandling.Include;
                  return settings;
              });

              cfg.Host(
                  messagingHostInfo.HostAddress,
                  hostConfigurator =>
                  {
                      hostConfigurator.Username(messagingHostInfo.UserName);
                      hostConfigurator.Password(messagingHostInfo.Password);
                  });

              cfg.ServiceInstance(instance =>
              {
                  instance.ConfigureJobServiceEndpoints(serviceCfg =>
                  {
                      serviceCfg.FinalizeCompleted = true;
                  });

                  instance.ConfigureEndpoints(context);
              });
          });
      });
  • 看到作业队列确实出现在RabbitMQ队列中
  • 当我调用.Send向该队列发送消息时,它不会激活JobConsumer上的Run方法。
public async Task Send<T>(string queueName, T message) where T : class
  {
      var endpointUri = GetEndpointUri(messagingHostInfo.HostAddress, queueName);
      var sendEndpoint = await bus.GetSendEndpoint(endpointUri);

      await sendEndpoint.Send(message);
  }

有人能帮忙吗?

软件

  • x1m11 n1个8.0.2英寸
  • x1米12英寸1英寸8.0.2英寸
  • x1米13英寸1x 8.0.2英寸
  • .NET6
  • 将内存中用于JobConsumer
mdfafbf1

mdfafbf11#

缺少为长时间运行的作业设置任何类型的资料档案库。我们需要执行以下操作之一:

  • 明确指定它正在使用InMemory(在文档中缺失)
  • 使用EF Core等设置 Saga 存储库。

根据MassTransit的建议,我们选择了通过实施数据库并使用EF Core与它们交互来设置 Saga 存储库。

相关问题