如何暂停Nest.js Kafka消费者

cvxl0en2  于 11个月前  发布在  Apache
关注(0)|答案(1)|浏览(170)

我在Nest.js项目中使用Kafka.js。这是我初始化KafkaClient的方式:

@Module({
...
providers: [{
      provide: 'KAFKA_CLIENT',
      useFactory: async (configService: KafkaClientConfigService) => {

        const kafkaOptions = configService.getKafkaOptions();
        return ClientProxyFactory.create(kafkaOptions);
      },
      inject: [KafkaClientConfigService],
    },
  ]
...
})

字符串
现在我将KafkaClient注入到我的控制器中,我希望使用间隔来消费消息。虽然有一种方法可以使用Kafka.js使用consumer.pause()来实现这一点,但我在KafkaClient中找不到任何引用。
是否有任何选项可以通过暂停或限制消费者来实现这一点?

ldxq2e6h

ldxq2e6h1#

ClientKafka有一个protected consumer属性。在消费者本身,你可以调用pause()resume()
为了使用这些方法,我目前正在用一个自定义类扩展ClientKafka,并将其用作提供程序。

import { ClientKafka, KafkaOptions } from '@nestjs/microservices'

export class KafkaClient extends ClientKafka {
  pause() {
    this.consumer.pause()
    // ...
  }
}

字符串
请确保使用自定义类作为app.module.ts中的提供程序:

@NgModule({
  providers: [
    { provide: ClientKafka, useClass: KafkaClient },
  ]
})


或者使用useFactory来注入依赖项。

相关问题