我用的是flink的(1.7.2)kafka消费者。如何反序列化几个扩展相同特性的case类?例如
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
trait Foo
case class Boo(name: String) extends Foo
case class Buzz(name: String, age: Int) extends Foo
object Formats extends DefaultJsonProtocol{
implicit val booFormat: RootJsonFormat[Boo] =
jsonFormat1(Boo.apply)
implicit val buzzFormat: RootJsonFormat[Buzz] =
jsonFormat2(Buzz.apply)
}
我用的是Kafka消费者 DeserializationSchema
:
class FooSchema extends DeserializationSchema[Foo]{
@transient lazy val log = LoggerFactory.getLogger(this.getClass)
implicit val typeInfo = createTypeInformation[Foo]
override def deserialize(bytes: Array[Byte]): Foo = {
val foo = new String(bytes, StandardCharsets.UTF_8).parseJson
.convertTo[Foo] //doesn't compile, I need to deserialize to Boo and Buzz
log.debug(s"Received Boo")
foo
}
override def isEndOfStream(t: Foo): Boolean = false
override def getProducedType: TypeInformation[Foo] = createTypeInformation[Foo]
}
任何想法都将不胜感激
1条答案
按热度按时间kmb7vmvb1#
请尝试使用shapeless,它可以自动导出ADT的解码器,如下所示:
别忘了做这个
sealed
. 注意,原始json需要包含type
字段消歧器rawJsonString.parseJson.convertTo[Foo]
比如说工作应输出