Scala: cannot convert a DataFrame column containing an array to String

6za6bjd0 asked on 2023-02-08 in Scala

I have a DataFrame whose results column holds an array of structs (the full schema appears in the error message below).

I want to convert the results column into another DataFrame.
Here is the code I am trying to run:

val JsonString = df.select(col("results")).as[String]
val resultsDF = spark.read.json(JsonString)

But the first line returns the following error:

AnalysisException: Cannot up cast `results` from array<struct<auctions:bigint,bid_price_sum:double,bid_selected_price_sum:double,bids_cancelled:bigint,bids_done:bigint,bids_fail_currency:bigint,bids_fail_parsing:bigint,bids_failed:bigint,bids_filtered_blockrule:bigint,bids_filtered_duration:bigint,bids_filtered_floor_price:bigint,bids_lost:bigint,bids_selected:bigint,bids_timeout:bigint,clicks:bigint,content_owner_id:string,content_owner_name:string,date:bigint,impressions:bigint,intext_inventory:bigint,ivt_blocked:struct<blocked_reason_automated_browsing:bigint,blocked_reason_data_center:bigint,blocked_reason_false_representation:bigint,blocked_reason_irregular_pattern:bigint,blocked_reason_known_crawler:bigint,blocked_reason_manipulated_behavior:bigint,blocked_reason_misleading_uer_interface:bigint,blocked_reason_undisclosed_classification:bigint,blocked_reason_undisclosed_classification_ml:bigint,blocked_reason_undisclosed_use_of_incentives:bigint,ivt_blocked_requests:bigint>,no_bid:bigint,requests:bigint,requests_country:bigint,revenue:double,vtr0:bigint,vtr100:bigint,vtr25:bigint,vtr50:bigint,vtr75:bigint>> to string.
The type path of the target object is:
- root class: "java.lang.String"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object

5gfr0r5j1#

This means that results is not a String but an array of structs.
For example:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object Main extends App {
  val spark = SparkSession.builder
    .master("local")
    .appName("Spark app")
    .getOrCreate()

  import spark.implicits._

  case class MyClass(auctions: Int, bid_price_sum: Double)

  val df: DataFrame =
    Seq(
      ("xxx", "yyy", 1, """[{"auctions":9343, "bid_price_sum":1.062}, {"auctions":1225, "bid_price_sum":0.153}]"""),
      ("xxx1", "yyy1", 2, """{"auctions":1111, "bid_price_sum":0.111}"""),
    )
    .toDF("col1", "col2", "col3", "results")
  df.show()

  // results holds JSON strings here, so the cast to Dataset[String] succeeds
  // and spark.read.json can parse those strings into a new DataFrame
  val JsonString = df.select(col("results")).as[String]
  val resultsDF = spark.read.json(JsonString)
  resultsDF.show()
}

This produces:

+----+----+----+--------------------+
|col1|col2|col3|             results|
+----+----+----+--------------------+
| xxx| yyy|   1|[{"auctions":9343...|
|xxx1|yyy1|   2|{"auctions":1111,...|
+----+----+----+--------------------+

+--------+-------------+
|auctions|bid_price_sum|
+--------+-------------+
|    9343|        1.062|
|    1225|        0.153|
|    1111|        0.111|
+--------+-------------+

// ........................

  val df: DataFrame =
    Seq(
      ("xxx", "yyy", 1, Seq(MyClass(9343, 1.062), MyClass(1225, 0.153))),
      ("xxx1", "yyy1", 2, Seq(MyClass(1111, 0.111))),
    )
    .toDF("col1", "col2", "col3", "results")

// ........................

throws the following exception:

org.apache.spark.sql.AnalysisException: Cannot up cast results
from "ARRAY<STRUCT<auctions: INT, bid_price_sum: DOUBLE>>" to "STRING".
The type path of the target object is:
- root class: "java.lang.String"
You can either add an explicit cast to the input data 
or choose a higher precision type of the field in the target object

You can fix the exception by using

df.select(col("results")).as[Seq[MyClass]]

instead of

df.select(col("results")).as[String]

The same distinction applies when reading from a file:

val df = spark.read.json("src/main/resources/file.json")

the pipeline above produces the correct result when file.json stores results as a JSON string:

{"col1": "xxx", "col2": "yyy", "col3": 1, "results": "[{\"auctions\":9343, \"bid_price_sum\":1.062}, {\"auctions\":1225, \"bid_price_sum\":0.153}]"}
{"col1": "xxx1", "col2": "yyy1", "col3": 2, "results": "{\"auctions\":1111, \"bid_price_sum\":0.111}"}

but throws the same exception when results is a real JSON array:

{"col1": "xxx", "col2": "yyy", "col3": 1, "results": [{"auctions":9343, "bid_price_sum":1.062}, {"auctions":1225, "bid_price_sum":0.153}]}
{"col1": "xxx1", "col2": "yyy1", "col3": 2, "results": [{"auctions":1111, "bid_price_sum":0.111}]}
