How to convert a complex Java object to a Spark DataFrame

polhcujo · asked 2021-05-27 · in Spark

I am using Spark with Java; here is my code:

JavaRDD<MyComplexEntity> myObjectJavaRDD = resultJavaRDD.flatMap(result -> result.getMyObjects());

DataFrame df = sqlContext.createDataFrame(myObjectJavaRDD, MyComplexEntity.class);

df.saveAsParquetFile("s3a://mybucket/test.parquet");

MyComplexEntity.java:

public class MyComplexEntity implements Serializable {
     private Identifier identifier;
     private boolean isSwitch1True;
     private String note;
     private java.util.ArrayList<Identifier> secodaryIds;
     ......
}

Identifier.java:

public class Identifier implements Serializable {
     private int id;
     private String uuid;
     ......
}

The problem is that step 2 fails when I create the DataFrame from myObjectJavaRDD. How can I convert a list of complex Java objects into a DataFrame? Thanks.


izj3ouym1#

Anyhow, could you convert it to Scala?
Scala supports case classes, which fit this scenario well.
The challenge for you is that you have a Seq/Array of an inner case class, i.e. private java.util.ArrayList<Identifier> secodaryIds;, so you can use the approach below.

// inner case class Identifier
case class Identifier(Id : Integer , uuid : String)
val innerVal = Seq(Identifier(1,"gsgsg"),Identifier(2,"dvggwgwg"))

// Outer case class MyComplexEntity
case class MyComplexEntity(notes : String, identifierArray : Seq[Identifier])
val outerVal = MyComplexEntity("Hello", innerVal)

Please note =>
outerVal is a MyComplexEntity holding a list of Identifier objects, as the REPL shows: outerVal: MyComplexEntity = MyComplexEntity(Hello,List(Identifier(1,gsgsg), Identifier(2,dvggwgwg))). Now, the actual Spark approach is to use a Dataset.
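
As a side note, since outerVal is already a case class instance, it can be turned into a (one-row) Dataset directly; this is only a minimal sketch, assuming the active SparkSession is available as spark:

import spark.implicits._
// outerVal (defined above) is a case class instance, so Spark can derive its encoder
val directDs = Seq(outerVal).toDS()
// directDs: org.apache.spark.sql.Dataset[MyComplexEntity]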

import spark.implicits._
// Convert Our Input Data in Same Structure as your MyComplexEntity
// Only Trick is To 'Reflect' A Seq[(Int,String)] => Seq[Identifier]
// Hence we have to do 2 Mapping once for Outer Case class (MyComplexEntity) And Once For Inner Seq of Identifier
// If We Just Take this Input Data and Convert To DataSet ( without any Schema Inference)
// This is How It looks 

val inputData = Seq(("Some DAY",Seq((210,"wert67"),(310,"bill123"))),
                    ("I WILL BE", Seq((420,"henry678"),(1000,"baba123"))),
                    ("Saturday Night",Seq((1000,"Roger123"),(2000,"God345")))
                    )

val unMappedDs = inputData.toDS

which gives us =>

// See how it is inferred
// unMappedDs: org.apache.spark.sql.Dataset[(String, Seq[(Int, String)])] = [_1: string, _2: array<struct<_1:int,_2:string>>]

But if we map it "correctly" =>
i.e. the second element is a Seq[(Int,String)] and we map it into Seq[Identifier] via x._2.map(y => Identifier(y._1,y._2)), as shown below:

val resultDs = inputData.toDS.map(x =>MyComplexEntity(x._1,x._2.map(y => Identifier(y._1,y._2))))
resultDs.show(20,false)

we get a Dataset like => resultDs: org.apache.spark.sql.Dataset[MyComplexEntity] = [notes: string, identifierArray: array<struct<Id:int,uuid:string>>] with data as follows:

+--------------+--------------------------------+
|notes         |identifierArray                 |
+--------------+--------------------------------+
|Some DAY      |[[210,wert67], [310,bill123]]   |
|I WILL BE     |[[420,henry678], [1000,baba123]]|
|Saturday Night|[[1000,Roger123], [2000,God345]]|
+--------------+--------------------------------+
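
To complete the original goal of writing the result out as Parquet (mirroring the saveAsParquetFile call in the question), a minimal sketch; the S3 path is simply the one from the question, reused as a placeholder:

// Write the typed Dataset out as Parquet files
resultDs.write.mode("overwrite").parquet("s3a://mybucket/test.parquet")

// If the data starts life as an RDD of case class instances rather than a local Seq,
// the same spark.implicits._ import lets you convert it directly, e.g.:
// val resultDs = myObjectRdd.toDS()   // myObjectRdd: RDD[MyComplexEntity] (hypothetical name)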

With Scala it is easy. Thanks.
