Scala: how do I convert a flat DataFrame into a struct DataFrame and create a view?

Posted by epggiuax on 2023-03-12 in Scala

This is my table structure:

customer_item(struct)
   customer_name(string)
   customer_detail(struct)
       bill_to_city(string)
       bill_to_country(string)
       bill_to_state(string)
       contact_phone(string)
       country_name(string)
       partner_name(string)

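This is the DataFrame I get. I want to use this df to create a view that I can use later, but the structure I get here is flat. I need to convert it into the structure above; otherwise the view I create later will not work.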

+-------------+------------+---------------+-------------+-------------+-------------+------------+
|customer_name|bill_to_city|bill_to_country|bill_to_state|contact_phone|country_name |partner_name|
+-------------+------------+---------------+-------------+-------------+-------------+------------+
|abc          |Arlington   |US             |Texas        |123.456.7890 |United States|name        |
+-------------+------------+---------------+-------------+-------------+-------------+------------+

This is my code. It fails with the error:
Invalid call to qualifier on unresolved object, tree: 'customer_item
How can I fix this?

val selectedDf = df.select(col("customer_detail")).select("customer_detail.*")

selectedDf.show(false)

val tempViewName = "tempStageTransView"

selectedDf.createOrReplaceTempView(tempViewName)
executeInsertIntoTable(spark).show()

private def executeInsertIntoTable(sparkSession: SparkSession) = {
    sparkSession.sql(raw"""
        INSERT OVERWRITE TABLE ${myTable}
        SELECT
            null,
            (customer detail here)
            null,
            null,
            null,
            null
        FROM tempStageTransView
    """)
}
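If the view is kept flat, the `(customer detail here)` slot could instead be filled in the SQL itself. Spark SQL's built-in `named_struct('field', value, ...)` builds a struct with explicit field names, and nesting two calls reproduces `customer_item.customer_detail`. The sketch below is a minimal illustration under stated assumptions: a local SparkSession, and hypothetical names `flatDf` and `nestedFromSql`. It only runs the SELECT, not the INSERT into `myTable`, whose other columns are not shown in the question.

```scala
import org.apache.spark.sql.SparkSession

// Local session for the sketch; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("named-struct-sketch").getOrCreate()
import spark.implicits._

// Hypothetical flat input mirroring the question's sample row.
val flatDf = Seq(
  ("abc", "Arlington", "US", "Texas", "123.456.7890", "United States", "name")
).toDF("customer_name", "bill_to_city", "bill_to_country", "bill_to_state",
  "contact_phone", "country_name", "partner_name")

// The view keeps the flat shape, as in the question.
flatDf.createOrReplaceTempView("tempStageTransView")

// named_struct builds a struct in SQL; the inner call becomes customer_detail,
// the outer call becomes customer_item.
val nestedFromSql = spark.sql(
  """SELECT named_struct(
    |         'customer_name', customer_name,
    |         'customer_detail', named_struct(
    |           'bill_to_city', bill_to_city,
    |           'bill_to_country', bill_to_country,
    |           'bill_to_state', bill_to_state,
    |           'contact_phone', contact_phone,
    |           'country_name', country_name,
    |           'partner_name', partner_name)) AS customer_item
    |FROM tempStageTransView""".stripMargin)

nestedFromSql.printSchema()
```

The printed schema should have the same two-level shape as the target table, so this expression (adapted to the real column list) is one candidate for the placeholder in the INSERT statement.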
Answer by cedebl8k

You can create structs with the struct function from org.apache.spark.sql.functions.
For example (note that struct is used twice, because your schema has two nested structs):

import org.apache.spark.sql.functions.{col, struct}
import spark.implicits._ // enables .toDF; spark-shell imports this automatically

// One-row flat DataFrame matching the question's sample
val df = Seq(
  ("abc","Arlington","US","Texas","123.456.7890","United States","name")
).toDF("customer_name","bill_to_city","bill_to_country","bill_to_state","contact_phone","country_name","partner_name")

// Outer struct becomes customer_item; inner struct becomes customer_detail
val output = df.select(
  struct(
    col("customer_name"),
    struct(
      "bill_to_city",
      "bill_to_country",
      "bill_to_state",
      "contact_phone",
      "country_name",
      "partner_name").as("customer_detail")
  ).as("customer_item")
)

scala> output.printSchema
root
 |-- customer_item: struct (nullable = false)
 |    |-- customer_name: string (nullable = true)
 |    |-- customer_detail: struct (nullable = false)
 |    |    |-- bill_to_city: string (nullable = true)
 |    |    |-- bill_to_country: string (nullable = true)
 |    |    |-- bill_to_state: string (nullable = true)
 |    |    |-- contact_phone: string (nullable = true)
 |    |    |-- country_name: string (nullable = true)
 |    |    |-- partner_name: string (nullable = true)
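For the "create a view" part of the question, the nested DataFrame (not the flat one) is what would be registered. After that, later SQL can reach nested fields with dotted paths. A minimal sketch, assuming a local SparkSession; the view name `tempStageTransView` comes from the question, and `fromView` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Local session for the sketch; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("nested-view-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  ("abc", "Arlington", "US", "Texas", "123.456.7890", "United States", "name")
).toDF("customer_name", "bill_to_city", "bill_to_country", "bill_to_state",
  "contact_phone", "country_name", "partner_name")

// Same two-level struct as in the answer.
val output = df.select(
  struct(
    col("customer_name"),
    struct("bill_to_city", "bill_to_country", "bill_to_state",
      "contact_phone", "country_name", "partner_name").as("customer_detail")
  ).as("customer_item")
)

// Register the nested DataFrame under the question's view name.
output.createOrReplaceTempView("tempStageTransView")

// Dotted paths select fields inside the structs.
val fromView = spark.sql(
  """SELECT customer_item.customer_name AS customer_name,
    |       customer_item.customer_detail.bill_to_city AS bill_to_city
    |FROM tempStageTransView""".stripMargin)

fromView.show(false)
```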

The .as method renames a column, which is how the struct columns get the names customer_detail and customer_item.
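Because .as fixes those names, they can be used as path segments. For example, star-expanding `customer_item.customer_detail` (the same `select("customer_detail.*")` form the question tried, but starting from the outer column, which only exists after the conversion) returns the flat columns again. A minimal round-trip sketch, assuming a local SparkSession; `flatAgain` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Local session for the sketch; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("struct-roundtrip-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  ("abc", "Arlington", "US", "Texas", "123.456.7890", "United States", "name")
).toDF("customer_name", "bill_to_city", "bill_to_country", "bill_to_state",
  "contact_phone", "country_name", "partner_name")

val output = df.select(
  struct(
    col("customer_name"),
    struct("bill_to_city", "bill_to_country", "bill_to_state",
      "contact_phone", "country_name", "partner_name").as("customer_detail")
  ).as("customer_item")
)

// The names set with .as become path segments; ".*" expands every field of the inner struct.
val flatAgain = output.select("customer_item.customer_name", "customer_item.customer_detail.*")

flatAgain.show(false)
```

Unpacking the struct this way yields the same columns, in the same order, as the original flat df.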
