我们正在使用sbt、scala、avro、kafka、avro4s和avrohugger sbt插件,在编译期间调用这些插件,从.avsc avro文件定义的类型生成scala case类。
一个特定的kafka主题有一个avro类型,表示为其他两种类型的并集:简单的消息有t1或t2类型(就像两种类型的事件)。
我试图为[t1,t2]实现一个serde(序列化程序加反序列化程序)。
当尝试执行以下操作时:
def serializer(): Serializer[Either[TariffUpserted, TariffCancelled]] =
(topic: String, data: Either[TariffUpserted, TariffCancelled]) => {
data match {
case Left(tariffUpserted) =>
implicitly[Serde[TariffUpserted]]
.serializer()
.serialize(topic, tariffUpserted)
case Right(contractCancelled) =>
implicitly[Serde[TariffCancelled]]
.serializer()
.serialize(topic, contractCancelled)
}
}
我意识到avro标准必须指定某种类型的报头,其中写入了哪种类型(t1或t2)是对剩余有效负载进行编码的类型。
我该如何实现这一点?
暂无答案!
目前还没有任何答案,快来回答吧!