Kafka流:有没有一种方法可以在写入另一个主题时忽略主题分区中的特定偏移量

fivyi3re  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(368)

背景:我在生成toprod主题时使用了错误的avro模式注册表,结果kafka connect因为模式id错误的消息而中断。因此,作为一个恢复计划,我们希望将prod主题中的消息复制到测试主题,然后将好的消息写入hdfs。但是我们面临着某些偏移量的问题,这些偏移量是读取prod主题时架构id错误。是否有方法在写入另一个主题时忽略此类偏移。

Exception in thread "StreamThread-1" 
 org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value 
 for record. topic=xxxx, partition=9, offset=1259032
  Caused by: org.apache.kafka.common.errors.SerializationException: Error 
  retrieving Avro schema for id 600
  Caused by: 

  io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
   Schema not found io.confluent.rest.exceptions.RestNotFoundException: Schema not found
  io.confluent.rest.exceptions.RestNotFoundException: Schema not found

{代码}

lsmepo6l

lsmepo6l1#

您可以将反序列化异常处理程序更改为跳过这些记录,如文档中所述:https://docs.confluent.io/current/streams/faq.html#handling-损坏的记录和反序列化错误
艾丽,你准备好了吗 LogAndContinueExceptionHandler 在config via参数中 default.deserialization.exception.handler .

相关问题