在Flink中序列化circe.Json

guykilcj  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(115)

Scala Flink has problems serializing json in circe.Json format.

I am using lib flink-adt to derive TypeInformation that contains the serializer.

implicit val jsonInfo: TypeInformation[Json] =
    deriveTypeInformation[Json]

Here is the error message:

magnolia: child class BiggerDecimalJsonNumber of class JsonNumber is neither final nor a case class

I also tried this.

implicit val jsonInfo: TypeInformation[Json] =
    TypeInformation.of(classOf[Json])

It compiles but crashes when I run unit test.

uqzxnwby

uqzxnwby1#

I got the serializer to work by adding new ExecutionConfig()
This code worked:

import io.circe._
import io.circe.syntax._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig

    implicit val jsonTypeInformation: TypeInformation[Json] =
      TypeInformation.of(classOf[Json])
    val typeInformation = implicitly[TypeInformation[Json]]
    val json = """[{"level":"up","num":1.23,"valid":true}]""".asJson
    val ser = typeInformation.createSerializer(new ExecutionConfig())
    roundtrip(ser, json)

相关问题