我正在尝试将流下沉到s3 bucket并实现 Encoder
接口。
case class Info(vecId: Long, bkCode: String, state: String)
class S3Encoder extends Encoder[Info] {
private val gson = new Gson()
override def encode(element: Info, stream: OutputStream): Unit = {
val json = new util.HashMap[String, Any]()
json.put("vecId", element.vecId)
json.put("bk", element.bkCode)
json.put("state", element.state)
// println(gson.toJson(json))
stream.write(gson.toJson(json).getBytes("UTF-8"))
stream.write('\n')
}
我把Flume加在
val sink: StreamingFileSink[Info] = StreamingFileSink
.forRowFormat(new Path(s3_path), new S3Encoder)
.build()
但遇到了错误 non-serializable
,有人能说明这一点吗?非常感谢!
暂无答案!
目前还没有任何答案,快来回答吧!