我们有以下CSV文件:
RWA
Country of exposure
Credit risk asset class
Projection period
Scenario
RWA
第一行的RWA是header。最后一行等于header,但它不是header。当CSV文件内容从Dataset[String]加载时,如
import spark.implicits._
val source: Array[String] = (
"RWA\n" +
"Country of exposure\n" +
"Credit risk asset class\n" +
"Projection period\n" +
"Scenario\n" +
"RWA"
).split("\n")
val csvData: Dataset[String] = spark.sparkContext.parallelize(source).toDS()
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", value = true).csv(csvData)
// df.count() == 4 unexpectedly
df.count()应为5,但实际为4。
如果从文件加载相同的CSV,则不会发生此问题:
val tempFile = Files.createTempFile("tmp", ".csv")
val res = "RWA\n" +
"Country of exposure\n" +
"Credit risk asset class\n" +
"Projection period\n" +
"Scenario\n" +
"RWA"
Files.writeString(tempFile, res)
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", value = true)
.csv(tempFile.toString)
// df.count() == 5 as expected
从Dataset加载时,是否有方法告诉Spark行为相同?
Scala版本:2.12.14
Spark版本:3.0.3
2条答案
按热度按时间ny6fqffe1#
正是这个函数过滤了最后一行:
org.apache.spark.sql.execution.datasources.csv.CSVUtils#filterHeaderLine
如果您将
header
选项设置为true
,则此函数将删除与第一行(标题)相等的所有行。你可能想知道为什么?!使用此脚本保存 Dataframe ,并调查文件:
df.repartition(2).write.option("header", true).csv("output.csv")
现在,尝试读取
output.csv
文件。这种行为背后的原因是,当Spark想要读取csv文件时,可能会有多个文件,因此可能会有多个头行!khbbv19g2#
这是一个非常有趣的行为,我无法解释
现在,我不知道你的真实的用例,但是如果你有一个Array[String]要转换为RDD,使用第一行作为头,我认为你不应该这样做(转换为RDD[String]并使用spark.read.csv(...))
像这样的人应该工作: