scala 无法找到使用from_json到Dataset的列

n9vozmp4  于 2023-04-06  发布在  Scala
关注(0)|答案(1)|浏览(145)
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.Encoders

import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values}
import org.apache.spark.sql.types.{MapType, StringType}

case class MyMeta(op: Option[String], table: Option[String])
val metaSchema = Encoders.product[MyMeta].schema

val path = "/FileStore/tables/json_0002C_file.txt"
val df = spark.read.text(path)   
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))    
val df3 = df2.select(map_values(col("value")))  
val df4 = df3.select($"map_values(value)"(0).as("meta"))  
df4.show(false)
df4.printSchema()
val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta]
df5.show(false)
df5.printSchema()

返回以下内容:

+------------------------------------+
|meta                                |
+------------------------------------+
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.SALES"}   |
+------------------------------------+

root
 |-- meta: string (nullable = true)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `op` cannot be resolved. Did you mean one of the following? [`parsedMeta`]

看不出这里有什么问题。
对于DF是可以的,但不是DS。

{  "meta":{    "op":"upd",     "table":"BILL.PRODUCTS"  },  "data":{    "PRICE":"3599"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "PRICE":"4099"  }}
{  "meta":{    "op":"upd",     "table":"BILL.PRODUCTS"  },  "data":{    "PRICE":"4000"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "PRICE":"3599"  }}
{  "meta":{    "op":"upd",     "table":"BILL.SALES"  },  "data":{    "NUM":"20"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "NUM":"10"  }}
smtd7mpg

smtd7mpg1#

你的case类MyMeta的类型是op: Option[String], table: Option[String],这是列的类型,而不是名为parsedMeta且类型为MyMeta的列本身,所以你需要添加正确的case类:

case class MyMeta2(parsedMeta: MyMeta)

然后将dataframe转换为MyMeta2:

val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta2]

结果:

+--------------------+
|parsedMeta          |
+--------------------+
|{upd, BILL.PRODUCTS}|
|{upd, BILL.PRODUCTS}|
|{upd, BILL.SALES}   |
+--------------------+

更新:完整解决方案:

case class MyMeta(op: Option[String], table: Option[String])
case class MyMeta2(parsedMeta: MyMeta)

import spark.implicits._
val metaSchema = Encoders.product[MyMeta].schema

val path = "src/main/resources/input/files/json_0002C_file.txt"
val df = spark.read.text(path)
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))
val df3 = df2.select(map_values(col("value")))
val df4 = df3.select(col("map_values(value)")(0).as("meta"))
df4.show(false)
df4.printSchema()
val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta2]
df5.show(false)
df5.printSchema()

更新2:进一步解释

问题来自于这段代码:.as[MyMeta2]
您正尝试在不具有相同架构的数据框上应用case类。
当将模式应用于数据框时,它应该与该数据框的列的名称和类型完全匹配。
在你的情况下,在这一行:

val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta]

您正在尝试应用此架构:

|-- op: string (nullable = true)
|-- table: string (nullable = true)

到这种类型的dataframe:

|-- parsedMeta: struct (nullable = true)
 |    |-- op: string (nullable = true)
 |    |-- table: string (nullable = true)

正如你所看到的,在你的case类中没有名为parsedMeta的列,所以它会寻找第一个属性,它是op,并试图将它匹配到你的dataframe,但不会找到它,因为根列被称为parsedMeta,这就是为什么你得到这个错误:

A column or function parameter with name `op` cannot be resolved. Did you mean one of the following? [`parsedMeta`]

因此,您需要应用正确的模式,我在答案中将其定义为MyMeta2。
如何避免将来出现模式问题:
从case类创建一个schema,如下所示,然后打印它:

val metaSchema = Encoders.product[MyMeta].schema
metaSchema.printTreeString()

结果:

root
 |-- op: string (nullable = true)
 |-- table: string (nullable = true)

然后打印要应用模式的数据框的模式:

df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").printSchema()

结果:

root
 |-- parsedMeta: struct (nullable = true)
 |    |-- op: string (nullable = true)
 |    |-- table: string (nullable = true)

如果模式与每个属性的名称和类型不匹配,那么您将得到一个错误。
希望有帮助。

相关问题