如何从sequencefile创建sparkDataframe

czq61nw1  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(295)

我正在使用spark 1.5。我想创建一个 dataframe 从hdfs中的文件。hdfs文件包含 json 按顺序输入文件格式包含大量字段的数据。
有没有一种方法可以在java中优雅地实现这一点?我事先不知道json的结构/字段。
我可以从序列文件中获取rdd输入,如下所示:

JavaPairRDD<LongWritable,BytesWritable> inputRDD = jsc.sequenceFile("s3n://key_id:secret_key@file/path", LongWritable.class, BytesWritable.class);
JavaRDD<String> events = inputRDD.map(
    new Function<Tuple2<LongWritable,BytesWritable>, String>() {
        public String call(Tuple2<LongWritable,BytesWritable> tuple) {
            return Text.decode(tuple._2.getBytes());
        }
    }
);

如何从rdd创建Dataframe?

4ioopgfo

4ioopgfo1#

我对序列文件中的json数据执行了以下操作:

JavaRDD<String> events = inputRDD.map(
    new Function<Tuple2<LongWritable,BytesWritable>, String>() {
        public String call(Tuple2<LongWritable,BytesWritable> tuple) throws JSONException, UnsupportedEncodingException {
            String valueAsString = new String(tuple._2.getBytes(), "UTF-8");
            JSONObject data = new JSONObject(valueAsString);
            JSONObject payload = new JSONObject(data.getString("payload"));
            String atlas_ts = "";
            return payload.toString();
        }
    }
    );

相关问题