在Nest Application而不是Nest Microservice中运行NestJS Kafka consumer

jjjwad0x  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(111)

我有一个相当简单的Nest微服务,看起来像这样:

async function bootstrap() {
  const app = await NestFactory.createMicroservice(EventModule, microserviceConfig)
  await app.listen()
}

bootstrap();

我在microserviceConfig中配置了各种Kafka传输和选项。然后,我可以使用一个非常简单的控制器来监听关于某个主题的消息:

@Controller()
export class EventController {
  constructor(private readonly eventService: EventService) {}

  @EventPattern("ITEM_CREATED")
  async handleClientRelationshipCreated(@Ctx() ctx : KafkaContext) {
    console.log(ctx.getTopic())
    this.eventService.handleMessage(ctx.getMessage())
  }
}

这工作得很好,但是我想在Nest应用程序中使用相同的代码:

const app = await NestFactory.create(AppModule, {
  bufferLogs: true,
});

当我将我的Kafka配置添加到我的应用模块时:

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'my-service',
        transport: Transport.KAFKA,
        options: {
          client: {
// ...

应用程序运行,但我没有看到任何类型的Kafka初始化的日志,也没有收到任何消息。我猜Kafka库是围绕微服务构建的,不支持Nest应用程序。
问:
在Nest应用程序中使用Kafka消息的最简单方法是什么?将我的应用程序移植到微服务上并不容易,例如,我们使用了一堆微服务不支持的应用程序中间件。
有没有可能在某个地方运行Kafka消费者,然后在某种后台工作程序中手动启动它?是否有任何现有的库允许在Nest应用程序中使用Kafka消息?谢谢你,谢谢

bn31dyow

bn31dyow1#

我想出了答案。运行hybrid application是这里的解决方案!
我只是跑:

app.connectMicroservice<MicroserviceOptions>(
    {
      transport: Transport.KAFKA,

      options: {
// ...

然后:

await app.startAllMicroservices();

现在,我的普通Nest应用程序和基于Kafka的微服务都在一个进程中运行。

相关问题