我试图处理与MassTransit,我有一些服务器与StableDeficiency,他们接受任务,在队列中生成图像
但事实是,不同的服务器有不同的模型集,我不想为每个模型创建大量队列
我的GPU服务器如何从队列中过滤任务,并只处理它们可以执行的任务?
我的PublisherServer
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_results_queue", e => {
e.Consumer(() => new ResultConsumer());
});
});
await busControl.StartAsync();
Console.WriteLine("Publisher is running...");
await busControl.Publish<CalculationTask>(new {
TaskId = NewId.NextGuid().ToString(),
Data = "Important data for calculation"
}, context => {
context.Headers.Set("model", "model_comic");
});
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}
字符串
我的GPU工人
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_task_queue", e => {
e.Bind("headers_exchange", x => {
x.ExchangeType = "headers";
x.SetExchangeArgument("x-match", "any");
x.SetExchangeArgument("model", "model_real");
x.SetExchangeArgument("model", "model_anime");
});
e.Consumer(() => new CalculationTaskConsumer());
});
});
await busControl.StartAsync();
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}
型
我的员工不应该接收信息,但他还是这样做了。
1条答案
按热度按时间wpx232ag1#
字符串
默认的拓扑配置可能是原因所在,但您可以通过查看RabbitMQ管理控制台并查看附加绑定来了解这一点。