alpakka-kafka消费者处理记录,直到遇到无法反序列化的记录并在不留下错误消息的情况下静默地终止。如何强制它报告错误消息?
uttx8gqw1#
根据文件,当流失败时,库内部将处理所有底层资源。(反)序列化如果读Kafka失败是由其他原因造成的,比如反序列化问题,那么这个阶段将立即失败。如果您预期会出现这种情况,请考虑使用原始字节数组并在随后的Map阶段进行反序列化,在该阶段可以使用监视跳过失败的元素。建议使用字节数组作为值对消息进行(反)序列化,并在akka流中的map操作中执行(反)序列化,而不是直接在kafka(反)序列化程序中实现。spray json示例(此示例使用resuming对无法正确解析并忽略错误元素的数据进行响应):
import spray.json._ final case class SampleData(name: String, value: Int) object SampleDataSprayProtocol extends DefaultJsonProtocol { implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData) } import SampleDataSprayProtocol._ val resumeOnParsingException = ActorAttributes.withSupervisionStrategy { new akka.japi.function.Function[Throwable, Supervision.Directive] { override def apply(t: Throwable): Supervision.Directive = t match { case _: spray.json.JsonParser.ParsingException => Supervision.Resume case _ => Supervision.stop } } } val consumer = Consumer .plainSource(consumerSettings, Subscriptions.topics(topic)) .map { consumerRecord => val value = consumerRecord.value() val sampleData = value.parseJson.convertTo[SampleData] sampleData } .withAttributes(resumeOnParsingException) .toMat(Sink.seq)(Keep.both) .mapMaterializedValue(DrainingControl.apply) .run()
1条答案
按热度按时间uttx8gqw1#
根据文件,
当流失败时,库内部将处理所有底层资源。
(反)序列化
如果读Kafka失败是由其他原因造成的,比如反序列化问题,那么这个阶段将立即失败。如果您预期会出现这种情况,请考虑使用原始字节数组并在随后的Map阶段进行反序列化,在该阶段可以使用监视跳过失败的元素。
建议使用字节数组作为值对消息进行(反)序列化,并在akka流中的map操作中执行(反)序列化,而不是直接在kafka(反)序列化程序中实现。
spray json示例(此示例使用resuming对无法正确解析并忽略错误元素的数据进行响应):