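Hi everyone,
I am trying to parse an XML file in Spark, and I am using the explode function to flatten the data. Below are the input schema, the required output schema, and the code.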
Input Schema
root
|-- _no: string (nullable = true)
|-- _sc: double (nullable = true)
|-- _xsi: string (nullable = true)
|-- header: struct (nullable = true)
| |-- con: string (nullable = true)
| |-- co: string (nullable = true)
| |-- cr: date (nullable = true)
| |-- pe: string (nullable = true)
| |-- st: timestamp (nullable = true)
|-- scs: struct (nullable = true)
| |-- _te: string (nullable = true)
| |-- scle: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- cId: long (nullable = true)
| | | |-- eId: long (nullable = true)
| | | |-- ent: array (nullable = true)
| | | | |-- eent: struct (containsNull = true)
| | | | | |-- MSId: string (nullable = true)
| | | | | |-- _date: date (nullable = true)
| | | | | |-- uas: string (nullable = true)
| | | | | |-- tes: struct (nullable = true)
| | | | | | |-- time: string (nullable = true)
| | | | | |-- tv: struct (nullable = true)
| | | | | | |-- LUE: string (nullable = true)
| | | | | | |-- _dour: string (nullable = true)
| | | | | | |-- nete: string (nullable = true)
| | | | | | |-- netSy: string (nullable = true)
| | | | | | |-- parum: struct (nullable = true)
| | | | | | | |-- UE: long (nullable = true)
| | | | | | | |-- Parts: long (nullable = true)
| | | | | | |-- sa: struct (nullable = true)
| | | | | | | |-- VA: boolean (nullable = true)
| | | | | | | |-- ng: string (nullable = true)
| | | | | | |-- stitled: struct (nullable = true)
| | | | | | | |-- LUE: boolean (nullable = true)
| | | | | | | |-- ng: string (nullable = true)
| | | | | | |-- tvering: struct (nullable = true)
| | | | | | | |-- dfLUE: string (nullable = true)
| | | | | | | |-- _body: string (nullable = true)
| | | | | | |-- ubting: struct (nullable = true)
| | | | | | | |-- _LUE: string (nullable = true)
| | | | | | | |-- dy: string (nullable = true)
Required output:
root
|-- _no: string (nullable = true)
|-- _sc: double (nullable = true)
|-- _xsi: string (nullable = true)
|-- header: struct (nullable = true)
|-- con: string (nullable = true)
|-- co: string (nullable = true)
|-- cr: date (nullable = true)
|-- pe: string (nullable = true)
|-- st: timestamp (nullable = true)
|-- scs: struct (nullable = true)
|-- _te: string (nullable = true)
|-- scle: array (nullable = true)
|-- element: struct (containsNull = true)
|-- cId: long (nullable = true)
|-- eId: long (nullable = true)
|-- ent: array (nullable = true)
|-- eent: struct (containsNull = true)
|-- MSId: string (nullable = true)
|-- _date: date (nullable = true)
|-- uas: string (nullable = true)
|-- tes: struct (nullable = true)
|-- time: string (nullable = true)
|-- tv: struct (nullable = true)
|-- LUE: string (nullable = true)
|-- _dour: string (nullable = true)
|-- nete: string (nullable = true)
|-- netSy: string (nullable = true)
|-- parum: struct (nullable = true)
|-- UE: long (nullable = true)
|-- Parts: long (nullable = true)
|-- sa: struct (nullable = true)
|-- VA: boolean (nullable = true)
|-- ng: string (nullable = true)
|-- stitled: struct (nullable = true)
|-- LUE: boolean (nullable = true)
|-- ng: string (nullable = true)
|-- tvering: struct (nullable = true)
|-- dfLUE: string (nullable = true)
|-- _body: string (nullable = true)
|-- ubting: struct (nullable = true)
|-- _LUE: string (nullable = true)
|-- dy: string (nullable = true)
The XML file is 100 MB. When I read it, the DataFrame count is 1, so I believe Spark is reading the entire XML file into a single row.
The code I use for the explode:
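import org.apache.spark.sql.functions.{col, explode}
// Read the XML file; each element matching rowTag becomes one row
val readxml = spark.read.format("xml").option("rowTag", "on")
  .option("inferSchema", "true").load("/path")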
val co = readxml.withColumn("cId", explode(col("scs.scle.cId")))
  .withColumn("eId", explode(col("scs.scle.schedule.eId")))
  .withColumn("exploded_sc", explode(col("scs.scle.ent")))
  .withColumn("uas", explode(col("exploded_sc.uas")))
  .withColumn("ag_dt", explode(col("exploded_sc._date")))
  .withColumn("time", explode(col("exploded_sc.tes.time")))
  .withColumn("MSId", explode(col("exploded_sc.MSId")))
  .withColumn("exploded_tv", explode(col("exploded_sc.tv")))
val finalDF = co.select(col("_sc"), col("header.*"), col("scs._te").as("_te"),
  col("cId"), col("eId"),
  col("MSId"), col("time"), col("exploded_tv._dour").as("_dour"),
  col("exploded_tv.tvering.dfLUE").as("tvra"),
  col("exploded_tv.tvering._body").as("body"),
  col("exploded_tv.parum.UE").as("pnum"),
  col("exploded_tv.parum.Parts").as("npart"),
  col("exploded_tv.ubting._LUE").as("tsting"),
  col("exploded_tv.ubting.dy").as("tsting_body"),
  col("exploded_tv.nete").as("netsrce"),
  col("exploded_tv.netSy").as("nettype"),
  col("exploded_tv.sa.VA").as("sp"),
  col("exploded_tv.sa.ng").as("lg"),
  col("exploded_tv.stitled.dfLUE").as("subled"),
  col("exploded_tv.stitled.ng").as("sud_ng"),
  col("uas"), col("ag_dt"))
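The code above gives me the required output schema, but I cannot view the data in finalDF, and I suspect that is because the data is too large. So, after reading the XML file, I started counting the rows after each explode. That is how I found out that the explode function grows the row count exponentially, because every additional explode duplicates the rows produced by the previous ones. Is there another way to achieve the output above? Can anyone help? Thanks in advance.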
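To make the row-count effect concrete, here is a minimal sketch on made-up in-memory data rather than the real XML. The case classes `Ent`, `Scle` and `Doc`, the object `FlattenSketch` and the sample values are hypothetical simplifications of the schema above, and the sketch assumes spark-sql is on the classpath. It contrasts exploding sibling fields one by one with exploding each array level once and then reading the scalar fields from the exploded struct. It is one possible direction under those assumptions, not a confirmed fix for the real file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical, simplified stand-ins for the nested schema above.
case class Ent(MSId: String, uas: String)
case class Scle(cId: Long, eId: Long, ent: Seq[Ent])
case class Doc(_sc: Double, scle: Seq[Scle])

object FlattenSketch {
  // One row with 3 scle elements holding 2 ent elements each (made-up data).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val scle = (1L to 3L).map { i =>
      Scle(i, i * 10, Seq(Ent(s"m$i-a", "u1"), Ent(s"m$i-b", "u2")))
    }
    Seq(Doc(1.0, scle)).toDF()
  }

  // Sibling fields exploded independently: every cId is paired with every eId.
  def crossed(df: DataFrame): DataFrame =
    df.withColumn("cId", explode(col("scle.cId")))
      .withColumn("eId", explode(col("scle.eId")))

  // One explode per array level; scalar fields are read from the exploded struct.
  def flattened(df: DataFrame): DataFrame =
    df.select(col("_sc"), explode(col("scle")).as("s"))
      .select(col("_sc"), col("s.cId"), col("s.eId"), explode(col("s.ent")).as("e"))
      .select(col("_sc"), col("cId"), col("eId"), col("e.MSId"), col("e.uas"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("flatten-sketch").getOrCreate()
    val df = sample(spark)
    println(crossed(df).count())   // 9: 3 cId values x 3 eId values
    println(flattened(df).count()) // 6: one row per ent element
    spark.stop()
  }
}
```

In this sketch the row count follows the number of innermost elements instead of the product of the array lengths. Note that `explode` drops rows whose array is null or empty; `explode_outer` keeps them if that matters for the real data.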