scala spark:将结构列类型转换为十进制类型

tcbh2hod  于 2021-05-16  发布在  Spark
关注(0)|答案(2)|浏览(554)

我有csv存储在s3的位置有这样的数据

column1 | column2 | 
--------+----------
| adsf  | 2000.0  |   
| fff   | 232.34  |

我在scalaxx有一个aws胶水作业,它把这个文件读入dataframe

var srcDF= glueContext.getCatalogSource(database = '',
                                        tableName = '',
                                        redshiftTmpDir = "",
                                        transformationContext = "").getDynamicFrame().toDF()

当我打印模式时,它会这样推断自己

srcDF.printSchema()

|-- column1 : string | 
|-- column2 : struct (double, string) |

dataframe看起来像

column1 | column2    | 
--------+-------------
| adsf  | [2000.0,]  |   
| fff   | [232.34,]  |

当我试图将dataframe保存到csv时,它会抱怨

org.apache.spark.sql.AnalysisException CSV data source does not support struct<double:double,string:string> data type.

如何将dataframe转换为只有struct类型(如果存在)的列才是decimal类型?像这样输出

column1 | column2 | 
--------+----------
| adsf | 2000.0   |   
| fff  | 232.34   |

编辑:
谢谢你的回复。我试过使用以下代码

df.select($"column2._1".alias("column2")).show()

但两者都有相同的错误

org.apache.spark.sql.AnalysisException No such struct field _1 in double, string;

编辑2:
似乎spark,列被压平并重命名为“double,string”
所以,这个方法对我有效

df.select($"column2.double").show()
3df52oht

3df52oht1#

可以使用“getItem”从结构中提取字段。代码可以是这样的:

import spark.implicits._
import org.apache.spark.sql.functions.{col, getItem}

val df = Seq(
  ("adsf", (2000.0,"")),
  ("fff", (232.34,""))
).toDF("A", "B")
df.show()
df.select(col("A"), col("B").getItem("_1").as("B")).show()

它将打印:

before select:
+----+----------+
|   A|         B|
+----+----------+
|adsf|[2000.0, ]|
| fff|[232.34, ]|
+----+----------+

after select:
+----+------+
|   A|     B|
+----+------+
|adsf|2000.0|
| fff|232.34|
+----+------+
vsaztqbk

vsaztqbk2#

也可以使用点符号“column2.\u 1”按名称获取结构域:

val df = Seq(
  ("adsf", (2000.0,"")),
  ("fff", (232.34,""))
).toDF("column1", "column2")

df.show
+-------+----------+
|column1|   column2|
+-------+----------+
|   adsf|[2000.0, ]|
|    fff|[232.34, ]|
+-------+----------+

val df2 = df.select($"column1", $"column2._1".alias("column2"))

df2.show
+-------+-------+
|column1|column2|
+-------+-------+
|   adsf| 2000.0|
|    fff| 232.34|
+-------+-------+

df2.coalesce(1).write.option("header", "true").csv("output")

您的csv文件将位于“output/”文件夹中:

column1,column2
adsf,2000.0
fff,232.34

相关问题