我怎样才能让flink hcfs连接器以这样的模式从google云存储中读取 **/*S0.json
,其中文件包含换行分隔的json数据?
这些文件包含如下内容
{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}
在gcs ui中,它如下所示:
根据Flink的模式使用gcs文件进行跟踪
我怎样才能让flink hcfs连接器以这样的模式从google云存储中读取 **/*S0.json
,其中文件包含换行分隔的json数据?
这些文件包含如下内容
{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}
在gcs ui中,它如下所示:
根据Flink的模式使用gcs文件进行跟踪
1条答案
按热度按时间wribegjk1#
以纯文本形式从hcfs读取json文件后,可以将其Map到
JSONObject
使用自定义Map器:import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;
public class StringToJsonObject implements MapFunction<String, JSONObject> {
private static final long serialVersionUID = 4573928723585302447L;
}