Converting a JavaRDD<Tuple2<Object, long[]>> to a Spark Dataset<Row> in Java

2vuwiymt posted on 2021-06-26 in Java

In Java (not Scala!) with Spark 3.0.1, I have a JavaRDD instance neighborIdsRDD whose type is JavaRDD<Tuple2<Object, long[]>>.
The part of my code that produces the JavaRDD is the following:

GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();

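I had to use toJavaRDD() because collectNeighborIds returns an org.apache.spark.graphx.VertexRDD<long[]> object (VertexRDD doc).
However, the rest of my application needs a Spark Dataset<Row> built from the collectNeighborIds result.
What are the options, and what is the best way, to convert a JavaRDD<Tuple2<Object, long[]>> into a Dataset<Row>?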

Update based on the comments:

I adjusted the code as suggested in the comments:

GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
System.out.println("VertexRDD neighborIdsRDD is:");
for (int i = 0; i < neighborIdsRDD.collect().size(); i++) {
    System.out.println(
            ((Tuple2<Object, long[]>) neighborIdsRDD.collect().get(i))._1() + " -- " +
                    Arrays.toString(((Tuple2<Object, long[]>) neighborIdsRDD.collect().get(i))._2())
    );
}

Dataset<Row> dr = spark_session.createDataFrame(neighborIdsRDD.rdd(), Tuple2.class);
System.out.println("converted Dataset<Row> is:");
dr.show();

But I get an empty Dataset, as shown below:

VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2, 3]
5 -- [3, 2]
2 -- [1, 3, 5]
3 -- [1, 2, 5, 4]
converted Dataset<Row> is:
++
||
++
||
||
||
||
||
++
6rvt4ljy #1

I ran into the same situation, but fortunately I found a way to get the DataFrame.
The solution is marked in the code below as steps [1], [2] and [3].

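import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.graphx.EdgeDirection;
import org.apache.spark.graphx.GraphOps;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

// graph, stringTag and spark_session are defined elsewhere, as in the question
GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();

// Collect once on the driver instead of re-running the job on every loop iteration
System.out.println("VertexRDD neighborIdsRDD is:");
for (Tuple2<Object, long[]> tuple : neighborIdsRDD.collect()) {
    System.out.println(tuple._1() + " -- " + Arrays.toString(tuple._2()));
}

// [1] Define the schema: one long column and one array-of-long column
StructType graphStruct = new StructType(new StructField[]{
        new StructField("father", DataTypes.LongType, false, Metadata.empty()),
        new StructField("children", DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty())
});

// [2] Build a JavaRDD<Row> from the JavaRDD<Tuple2<Object, long[]>>
JavaRDD<Row> dr = neighborIdsRDD.map(tupla -> RowFactory.create(tupla._1(), tupla._2()));

// [3] Finally build the required Dataset<Row> (DataFrame) using the explicit schema
Dataset<Row> dsr = spark_session.createDataFrame(dr.rdd(), graphStruct);

System.out.println("DATASET IS:");
dsr.show();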

Printed output:

VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2, 3]
5 -- [3, 2]
2 -- [1, 3, 5]
3 -- [1, 2, 5, 4]
DATASET IS:
+------+------------+
|father|    children|
+------+------------+
|     4|         [3]|
|     1|      [2, 3]|
|     5|      [3, 2]|
|     2|   [1, 3, 5]|
|     3|[1, 2, 5, 4]|
+------+------------+
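
A likely explanation for the empty Dataset in the question: createDataFrame(rdd, beanClass) applies a schema to an RDD of JavaBeans, so the columns come from the getXxx() properties of the class, and scala.Tuple2 exposes _1() and _2() instead of getters. The sketch below uses only the JDK to illustrate that idea. TupleLike and NeighborBean are hypothetical stand-ins that are not part of Spark or of the code above, and the check is only an approximation of what Spark does internally.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical stand-in for scala.Tuple2: the accessors are named _1() and _2(),
    // which does not follow the getXxx() JavaBean naming convention.
    public static final class TupleLike {
        private final Object first;
        private final long[] second;

        public TupleLike(Object first, long[] second) {
            this.first = first;
            this.second = second;
        }

        public Object _1() { return first; }
        public long[] _2() { return second; }
    }

    // Hypothetical JavaBean holding the same data, shown only for contrast.
    public static final class NeighborBean {
        private long father;
        private long[] children;

        public long getFather() { return father; }
        public void setFather(long father) { this.father = father; }
        public long[] getChildren() { return children; }
        public void setChildren(long[] children) { this.children = children; }
    }

    // Returns the names of the readable JavaBean properties of a class,
    // skipping the implicit "class" property that comes from Object.getClass().
    public static List<String> readableProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // No getters on the tuple-like class, so no properties are found
        System.out.println("TupleLike:    " + readableProperties(TupleLike.class));
        // The bean exposes "father" and "children" through its getters
        System.out.println("NeighborBean: " + readableProperties(NeighborBean.class));
    }
}
```

With no bean properties to map, a bean-based schema has zero columns, which would match the empty table printed in the question. Supplying an explicit StructType, as in steps [1] to [3], avoids depending on bean introspection altogether.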
