import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values}
import org.apache.spark.sql.types.{MapType, StringType}
case class MyMeta(op: Option[String], table: Option[String])
val metaSchema = Encoders.product[MyMeta].schema
val path = "/FileStore/tables/json_0002C_file.txt"
val df = spark.read.text(path)
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))
val df3 = df2.select(map_values(col("value")))
val df4 = df3.select($"map_values(value)"(0).as("meta"))
df4.show(false)
df4.printSchema()
val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta]
df5.show(false)
df5.printSchema()
返回以下内容:
+------------------------------------+
|meta |
+------------------------------------+
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.SALES"} |
+------------------------------------+
root
|-- meta: string (nullable = true)
AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `op` cannot be resolved. Did you mean one of the following? [`parsedMeta`]
看不出这里有什么问题。
对于DF是可以的,但不是DS。
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":"3599" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"4099" }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":"4000" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"3599" }}
{ "meta":{ "op":"upd", "table":"BILL.SALES" }, "data":{ "NUM":"20" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "NUM":"10" }}
1条答案
按热度按时间smtd7mpg1#
你的case类MyMeta的类型是
op: Option[String], table: Option[String]
,这是列的类型,而不是名为parsedMeta且类型为MyMeta的列本身,所以你需要添加正确的case类:然后将dataframe转换为MyMeta2:
结果:
更新:完整解决方案:
更新2:进一步解释
问题来自于这段代码:
.as[MyMeta2]
您正尝试在不具有相同架构的数据框上应用case类。
当将模式应用于数据框时,它应该与该数据框的列的名称和类型完全匹配。
在你的情况下,在这一行:
您正在尝试应用此架构:
到这种类型的dataframe:
正如你所看到的,在你的case类中没有名为parsedMeta的列,所以它会寻找第一个属性,它是op,并试图将它匹配到你的dataframe,但不会找到它,因为根列被称为parsedMeta,这就是为什么你得到这个错误:
因此,您需要应用正确的模式,我在答案中将其定义为MyMeta2。
如何避免将来出现模式问题:
从case类创建一个schema,如下所示,然后打印它:
结果:
然后打印要应用模式的数据框的模式:
结果:
如果模式与每个属性的名称和类型不匹配,那么您将得到一个错误。
希望有帮助。