scala Spark-SQL:如何将TSV或CSV文件读入DataFrame并应用定制模式?

gopyfrb3  于 2022-11-09  发布在  Scala
关注(0)|答案(1)|浏览(215)

我在使用制表符分隔值(TSV)和逗号分隔值(CSV)文件时使用的是Spark 2.0。我希望将数据加载到Spark-SQL DataFrame中,在该 Dataframe 中,我希望在读取文件时完全控制模式。我不希望Spark从文件中的数据猜测模式。
我如何将TSV或CSV文件加载到Spark SQL Dataframe并对其应用模式?

bxfogqkk

bxfogqkk1#

下面是装入制表符分隔值(TSV)文件并应用模式的完整Spark 2.0示例。
我以Iris data set in TSV format from UAH.edu为例。下面是该文件的前几行:

Type    PW      PL      SW      SL
0       2       14      33      50
1       24      56      31      67
1       23      51      31      69
0       2       10      36      46
1       20      52      30      65

要强制执行架构,可以使用以下两种方法之一以编程方式构建该架构:
A.使用StructType创建模式:

import org.apache.spark.sql.types._

var irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))

B.或者,使用case classEncoders创建模式(此方法不太冗长):

import org.apache.spark.sql.Encoders

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int, 
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema

一旦创建了模式,就可以使用spark.read读入TSV文件。请注意,只要正确设置了option("delimiter", d)选项,实际上也可以读取逗号分隔值(CSV)文件或任何分隔文件。此外,如果您的数据文件具有标题行,请确保设置option("header", "true")
以下是完整的最终代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

val spark = SparkSession.builder().getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema

var irisDf = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
                option("header", "true").  // Does the file have a header line?
                option("delimiter", "\t"). // Set delimiter to tab or comma.
                schema(irisSchema).        // Schema that was built above.
                load("iris.tsv")

irisDf.show(5)

下面是输出结果:

scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
|   0|         2|         14|        33|         50|
|   1|        24|         56|        31|         67|
|   1|        23|         51|        31|         69|
|   0|         2|         10|        36|         46|
|   1|        20|         52|        30|         65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows

相关问题