scala—在失败时优雅地重新启动React式kafka消费流

uidvcgyl  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(523)

重新启动/完成/停止流时出现问题旧使用者未死亡/关闭:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

描述我正在构建一个服务,该服务接收来自kafka主题的消息,并通过http请求将消息发送到外部服务。
与外部服务的连接可能会断开,我的服务需要重试该请求。
此外,如果流中存在错误,则需要重新启动整个流。
最后,有时我不需要流和它对应的kafka消费者,我想关闭整个流
所以我有一条小溪:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run

http请求已发送 sourceFunction 我遵循了新文档中新的kafka消费者重启说明

RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
                  streamComplete
                    .flatMap { _ =>
                      consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
                    }
                    .recoverWith {
                      case _ =>
                        consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
                    }
             }
            .flatMapConcat(sourceFunction)
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run

在复杂的akka流中,有一个问题讨论了这个非终止消费者,但是还没有解决方案。
有没有一个解决办法,迫使Kafka消费者终止

sc4hvdpw

sc4hvdpw1#

如何将消费者 Package 到actor中并注册killswitch,请参见:https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-流处理
然后在actor poststop方法中可以终止流。通过用backoffsupervisor Package 演员,你得到了指数退避。
演员示例:https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/kafkasubscriberactor.scala#l27

相关问题