flink:richsink函数的实现是不可序列化的

iqjalb3h  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(299)

我正在尝试将流下沉到s3 bucket并实现 Encoder 接口。

case class Info(vecId: Long, bkCode: String, state: String)

class S3Encoder extends Encoder[Info] {

    private val gson = new Gson()

    override def encode(element: Info, stream: OutputStream): Unit = {

      val json = new util.HashMap[String, Any]()

      json.put("vecId", element.vecId)
      json.put("bk", element.bkCode)
      json.put("state", element.state)

//      println(gson.toJson(json))
      stream.write(gson.toJson(json).getBytes("UTF-8"))
      stream.write('\n')

    }

我把Flume加在

val sink: StreamingFileSink[Info] = StreamingFileSink
            .forRowFormat(new Path(s3_path), new S3Encoder)
            .build()

但遇到了错误 non-serializable ,有人能说明这一点吗?非常感谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题