我正在尝试新的MassTransit
IJobConsumer
实现,尽管我已经尝试按照文档进行操作,但我编写的JobConsumer
从未运行/命中。
我有:
- 我创建了
JobConsumer
,它有一个run方法,可以运行我需要它运行的代码
public class CalculationStartRunJobConsumer : IJobConsumer<ICalculationStartRun>
{
private readonly ICalculationRunQueue runQueue;
public CalculationStartRunJobConsumer(ICalculationRunQueue runQueue)
{
this.runQueue = runQueue;
}
public Task Run(JobContext<ICalculationStartRun> context)
{
return Task.Run(
() =>
{
var longRunningJob = new LongRunningJob<ICalculationStartRun>
{
Job = context.Job,
CancellationToken = context.CancellationToken,
JobId = context.JobId,
};
runQueue.StartSpecial(longRunningJob);
},
context.CancellationToken);
}
}
- 我已注册该消费者尝试使用
ConnectReceiveEndpoint
和AddConsumer
- 已按文档中所示配置
ServiceInstance
services.AddMassTransit(busRegistrationConfigurator =>
{
// TODO: Get rid of this ugly if statement.
if (consumerTypes != null)
{
foreach (var consumerType in consumerTypes)
{
busRegistrationConfigurator.AddConsumer(consumerType);
}
}
if(requestClientType != null)
{
busRegistrationConfigurator.AddRequestClient(requestClientType);
}
busRegistrationConfigurator.UsingRabbitMq((context, cfg) =>
{
cfg.UseNewtonsoftJsonSerializer();
cfg.UseNewtonsoftJsonDeserializer();
cfg.ConfigureNewtonsoftJsonSerializer(settings =>
{
// The serializer by default omits fields that are set to their default value, but this causes unintended effects
settings.NullValueHandling = NullValueHandling.Include;
settings.DefaultValueHandling = DefaultValueHandling.Include;
return settings;
});
cfg.Host(
messagingHostInfo.HostAddress,
hostConfigurator =>
{
hostConfigurator.Username(messagingHostInfo.UserName);
hostConfigurator.Password(messagingHostInfo.Password);
});
cfg.ServiceInstance(instance =>
{
instance.ConfigureJobServiceEndpoints(serviceCfg =>
{
serviceCfg.FinalizeCompleted = true;
});
instance.ConfigureEndpoints(context);
});
});
});
- 看到作业队列确实出现在
RabbitMQ
队列中 - 当我调用
.Send
向该队列发送消息时,它不会激活JobConsumer
上的Run
方法。
public async Task Send<T>(string queueName, T message) where T : class
{
var endpointUri = GetEndpointUri(messagingHostInfo.HostAddress, queueName);
var sendEndpoint = await bus.GetSendEndpoint(endpointUri);
await sendEndpoint.Send(message);
}
有人能帮忙吗?
软件
- x1m11 n1个8.0.2英寸
- x1米12英寸1英寸8.0.2英寸
- x1米13英寸1x 8.0.2英寸
.NET6
个- 将内存中用于
JobConsumer
1条答案
按热度按时间mdfafbf11#
缺少为长时间运行的作业设置任何类型的资料档案库。我们需要执行以下操作之一:
根据MassTransit的建议,我们选择了通过实施数据库并使用EF Core与它们交互来设置 Saga 存储库。