如何关闭kafka consumerconnector

irlmq6kh  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(438)

我有一个系统,它从kafka主题中提取消息,当由于某些外部资源不可用而无法处理消息时,它会关闭使用者,将消息返回到主题,并等待一段时间,然后再次启动使用者。唯一的问题是,关机不起作用。以下是我在日志中看到的:
2014-09-30 08:24:10918-com.example.kafka.kafkaconsumer[info]-[application akka.actor.workflow-context-8]关闭kafka消费者的主题新问题报告2014-09-30 08:24:10927-clients.kafka.problemreportobserver[info]-[application akka.actor.workflow-context-8]消费者关闭2014-09-30 08:24:11,946-clients.kafka.problemreportobserver[warn]-[application akka.actor.workflow-context-8]将7410-1412090624000发送回队列2014-09-30 08:24:12021-clients.kafka.problemreportobserver[debug]-[kafka akka.actor.kafka-consumer-worker-context-9]来自分区0的消息:key=7410-1412090624000,msg=7410-1412090624000
这里有几个层在工作,但重要的代码是:
KafkaConsumer.scala :

protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig)
def shutdown() = {
  logger.info(s"Shutting down kafka consumer for topic ${config.topic}")
  consumer.shutdown()
}

在观察消息的例程中:

(processor ? ProblemReportRequest(problemReportKey)).map {
  case e: ConnectivityInterruption =>
    val backoff = 10.seconds
    logger.warn(s"Can't connect to essential services, pausing for $backoff", e)
    stop()
    // XXX: Shutdown isn't instantaneous, so returning has to happen after a delay.
    // Unfortunately, there's still a race condition here, plus there's a chance the
    // system will be shut down before the message has been returned.
    system.scheduler.scheduleOnce(100 millis) { returnMessage(message) }
    system.scheduler.scheduleOnce(backoff) { start() }
    false
  case e: Exception => returnMessage(message, e)
  case _ => true
}.recover { case e => returnMessage(message, e) }

停止方法:

def stop() = {
  if (consumerRunning.get()) {
    consumer.shutdown()
    consumerRunning.compareAndSet(true, false)
    logger.info("Consumer shutdown")
  } else {
    logger.info("Consumer is already shutdown")
  }
  !consumerRunning.get()
}

这是虫子,还是我做错了?

lnxxn5zx

lnxxn5zx1#

因为你的 consumer 是一个 def . 它会创建一个新的kafka示例,并在您像这样调用它时关闭该新示例 consumer.shutdown() . 制造 consumerval 相反。

相关问题