这个问题在这里已经有答案了:
在spark sql中自动优雅地展平Dataframe(15个答案)
三个月前关门了。
我创建了以下case类:
case class Data(ads:Option[Ads])
case class Ads(subject: Option[String]
, body:Option[String]
, price:Option[Int]
, location:Option[Location]
, attribut:Option[Seq[Attribut]]
)
case class Location(city:Option[String]
, zipcode:Option[String])
case class Attribut(key_label:Option[String]
, value_label:Option[String]
)
我用play框架解析json格式(html的一部分)。
我终于得到一个对象
JsSuccess(Ads(Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label"))
我想用以下方式将其保存在csv文件中:
Subject Body Price City Zipcode Key_Label Value_Label
Play Playing games 532 Geneve 95 GEN Gen2
我将对象转换为 Ads(Some("Subject"), Some("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label")
并将此列表转换为Dataframe。
但是我只有一个列值,它包含对象的所有元素。
Value
(Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label")
有人有主意吗?我不太明白如何将scala对象与dataset和dataframe链接起来。谢谢你的帮助。
1条答案
按热度按时间wvmv3b1j1#
注解是有帮助的,但是泛型展平函数可能不会按所需顺序输出列,和/或处理将数组元素放入它们自己的单独列中的问题。
假设json文件包含以下行:
如果文件相当一致,并且您已经将spark作为一个依赖项,那么您可能不需要使用单独的库来解析json。
你需要使用
explode
函数来处理“attribute”列是列表这一事实。使用explode_outer
函数,如果列表可能为空,但您希望保留其他列的值。+-------+---------------+-------+-------+---------+-----------+
|Subject| Body| City|Zipcode|key_label|value_label|
+-------+---------------+-------+-------+---------+-----------+
| abc|doing something|Houston| 39014| a| b|
+-------+---------------+-------+-------+---------+-----------+
resultDF.repartition(1).write.option("header", "true").csv("/tmp/my-output-dir/")