Param message.send.max.retries has no effect in Kafka producer

Posted by 7vhp5slm on 2021-06-08 in Kafka

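I have a project built with Scala and sbt.
It contains a Kafka producer.
If Kafka cannot be reached, I want the producer to retry sending the message.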

package com.example

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal

object producer extends App {

  private val decider: Supervision.Decider = {
    case NonFatal(ex) =>
      println("Non fatal exception in flow. Skip message and resuming flow.",
              ex)
      Supervision.Restart

    case ex: Throwable =>
      println("Other exception in flow. Stopping flow.", ex)
      Supervision.Stop
  }

  implicit val system = ActorSystem("QuickStart")
  private val strategy =
    ActorMaterializerSettings(system).withSupervisionStrategy(decider)
  implicit val materializer = ActorMaterializer(strategy)
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("10.20.10.193:9092")
      .withProperty("message.send.max.retries", "3")
      .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
      //.withProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "5000")

  val done =
    Source
      .single("11")
      .map(value => new ProducerRecord[String, String]("example", value))
      .runWith(Producer.plainSink(producerSettings))

  Await.result(done, 1000.seconds)
}

I defined a property:

.withProperty("message.send.max.retries", "3")

But it has no effect.
When I run the producer against a bad Kafka host, the output is:

[INFO ] - 2018-07-30 23:00:36,951 - suppression - akka.event.slf4j.Slf4jLogger - Slf4jLogger started
(Non fatal exception in flow. Skip message and resuming flow.,org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.)
(Non fatal exception in flow. Skip message and resuming flow.,org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.)

The log shows only two retries, not three.
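
One thing that may be worth checking is the property name itself. As far as I know, `message.send.max.retries` was a setting of the older Scala producer client, while the Java `KafkaProducer` that `ProducerSettings` configures documents its retry count under `retries` (`ProducerConfig.RETRIES_CONFIG`). The sketch below only illustrates that assumption: `RetryConfigSketch` and `producerProps` are hypothetical names, and the values `3` and `1000` are placeholders. It builds a plain `java.util.Properties` so that it depends only on `kafka-clients`.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

// Hypothetical helper, not part of the original project.
object RetryConfigSketch {

  // Builds producer properties using the keys the Java client defines.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // "retries": how many times the client resends a record after a
    // transient send error (placeholder value).
    props.setProperty(ProducerConfig.RETRIES_CONFIG, "3")
    // "retry.backoff.ms": pause between those attempts (placeholder value).
    props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
    // "max.block.ms": same value as in the question's code.
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
    props
  }
}
```

With the `ProducerSettings` from the question, the same key could be passed as `.withProperty(ProducerConfig.RETRIES_CONFIG, "3")`. Whether `retries` also covers the metadata timeout shown in the log is a separate question that this sketch does not settle; the setting is documented for resending records after a transient send error.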
