masstransit-ibusinstance的Kafka制作人尚未注册

hm2xizp9  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(395)

我想用masstransit建立一个Kafka消费者
我有这段代码

var services = new ServiceCollection();
services.AddMassTransit(x =>
{   
    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

var provider = services.BuildServiceProvider();
var producer = provider.GetRequiredService<ITopicProducer<Request>>();

await producer.Produce(new Request()
{
    RequestId = "abc123",
    RequestedAt = DateTime.UtcNow
});

这是一个最简单的例子
但是当我试着运行它时,我得到了这个异常

Unhandled exception. System.InvalidOperationException: No service for type 'MassTransit.Registration.IBusInstance' has been registered.

看看他们网站上的例子,我发现这可能与我没有注册rabbitmq有关

x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

但我没有拉比MQ,我只在这个场景中使用Kafka。
为了向kafka生成消息,是否需要向其他消息代理注册总线?

sq1bmfud

sq1bmfud1#

根据文件:
masstransit v7引入的riders提供了一种从任何来源向公交车传递信息的新方法。乘客随车配置,启动时上车。
要添加附加程序,必须有一个总线示例。如果不需要具有持久传输(如rabbitmq)的总线,则可以使用内存中传输。

var services = new ServiceCollection();
services.AddMassTransit(x =>
{  
    x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));

    x.AddRider(rider =>
    {
        rider.AddProducer<string, Request>("request", m => m.Message.RequestId);

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

公交车需要启动和停止,这也会启动/停止公交车上的任何乘客。你可以通过 IBusControl :

var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();

await busControl.StartAsync(cancellationToken);

或者,如果您使用的是asp.net核心通用主机,请添加masstransit托管服务。

services.AddMassTransitHostedService(); // in MassTransit.AspNetCore

相关问题