force alpakka kafka consumer show关于反序列化错误的错误消息

pw136qt2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(346)

alpakka-kafka消费者处理记录,直到遇到无法反序列化的记录并在不留下错误消息的情况下静默地终止。如何强制它报告错误消息?

uttx8gqw

uttx8gqw1#

根据文件,
当流失败时,库内部将处理所有底层资源。
(反)序列化
如果读Kafka失败是由其他原因造成的,比如反序列化问题,那么这个阶段将立即失败。如果您预期会出现这种情况,请考虑使用原始字节数组并在随后的Map阶段进行反序列化,在该阶段可以使用监视跳过失败的元素。
建议使用字节数组作为值对消息进行(反)序列化,并在akka流中的map操作中执行(反)序列化,而不是直接在kafka(反)序列化程序中实现。
spray json示例(此示例使用resuming对无法正确解析并忽略错误元素的数据进行响应):

import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

    val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
      new akka.japi.function.Function[Throwable, Supervision.Directive] {
        override def apply(t: Throwable): Supervision.Directive = t match {
          case _: spray.json.JsonParser.ParsingException => Supervision.Resume
          case _ => Supervision.stop
        }
      }
    }

    val consumer = Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      .map { consumerRecord =>
        val value = consumerRecord.value()
        val sampleData = value.parseJson.convertTo[SampleData]
        sampleData
      }
      .withAttributes(resumeOnParsingException)
      .toMat(Sink.seq)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

相关问题