scala对象、数据集和Dataframe之间的链接

yhived7q  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(455)

这个问题在这里已经有答案了

在spark sql中自动优雅地展平Dataframe(15个答案)
三个月前关门了。
我创建了以下case类:

case class Data(ads:Option[Ads])

case class Ads(subject: Option[String]
           , body:Option[String]
           , price:Option[Int]
           , location:Option[Location]
           , attribut:Option[Seq[Attribut]]
 )

case class Location(city:Option[String]
                , zipcode:Option[String])

case class Attribut(key_label:Option[String]
                , value_label:Option[String]
)

我用play框架解析json格式(html的一部分)。
我终于得到一个对象

JsSuccess(Ads(Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label"))

我想用以下方式将其保存在csv文件中:

Subject   Body           Price   City  Zipcode  Key_Label  Value_Label
Play      Playing games  532     Geneve 95      GEN        Gen2

我将对象转换为 Ads(Some("Subject"), Some("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label") 并将此列表转换为Dataframe。
但是我只有一个列值,它包含对象的所有元素。

Value
    (Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label")

有人有主意吗?我不太明白如何将scala对象与dataset和dataframe链接起来。谢谢你的帮助。

wvmv3b1j

wvmv3b1j1#

注解是有帮助的,但是泛型展平函数可能不会按所需顺序输出列,和/或处理将数组元素放入它们自己的单独列中的问题。
假设json文件包含以下行:

{"ads": {"subject": "abc", "body": "doing something", "price": 13, "location": {"city": "Houston", "zipcode": 39014}, "attribut": [{"key_label": "a", "value_label": "b"}]}}

如果文件相当一致,并且您已经将spark作为一个依赖项,那么您可能不需要使用单独的库来解析json。
你需要使用 explode 函数来处理“attribute”列是列表这一事实。使用 explode_outer 函数,如果列表可能为空,但您希望保留其他列的值。

import org.apache.spark.sql.functions._
// assuming spark is the Spark Session
val df = spark.read.json("mydata.json")

val df1 = df.select(col("ads.subject").alias("Subject"), col("ads.body").alias("Body"),
          col("ads.location.city").alias("City"), col("ads.location.zipcode").alias("Zipcode"),
          explode(col("ads.attribut")))

val resultDF = df1.select(col("Subject"), col("Body"), col("City"), col("Zipcode"),
               col("col.key_label"), col("col.value_label"))
``` `resultDF.show` 将输出:

+-------+---------------+-------+-------+---------+-----------+
|Subject| Body| City|Zipcode|key_label|value_label|
+-------+---------------+-------+-------+---------+-----------+
| abc|doing something|Houston| 39014| a| b|
+-------+---------------+-------+-------+---------+-----------+

要在指定目录中以单个csv文件的形式输出,请执行以下操作:

resultDF.repartition(1).write.option("header", "true").csv("/tmp/my-output-dir/")

相关问题