如何使Spark在从Dataset[String]加载时不会丢失CSV文件的最后一行?

eivnm1vs  于 2023-03-27  发布在  Spark
关注(0)|答案(2)|浏览(127)

我们有以下CSV文件:

RWA
Country of exposure
Credit risk asset class
Projection period
Scenario
RWA

第一行的RWA是header。最后一行等于header,但它不是header。当CSV文件内容从Dataset[String]加载时,如

import spark.implicits._
val source: Array[String] = (
      "RWA\n" +
      "Country of exposure\n" +
      "Credit risk asset class\n" +
      "Projection period\n" +
      "Scenario\n" +
      "RWA"
      ).split("\n")
val csvData: Dataset[String] = spark.sparkContext.parallelize(source).toDS()
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", value = true).csv(csvData)
// df.count() == 4 unexpectedly

df.count()应为5,但实际为4。
如果从文件加载相同的CSV,则不会发生此问题:

val tempFile = Files.createTempFile("tmp", ".csv")
val res = "RWA\n" +
      "Country of exposure\n" +
      "Credit risk asset class\n" +
      "Projection period\n" +
      "Scenario\n" +
      "RWA"
Files.writeString(tempFile, res)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", value = true)
  .csv(tempFile.toString)
// df.count() == 5 as expected

从Dataset加载时,是否有方法告诉Spark行为相同?

Scala版本:2.12.14
Spark版本:3.0.3

ny6fqffe

ny6fqffe1#

正是这个函数过滤了最后一行:org.apache.spark.sql.execution.datasources.csv.CSVUtils#filterHeaderLine
如果您将header选项设置为true,则此函数将删除与第一行(标题)相等的所有行。
你可能想知道为什么?!使用此脚本保存 Dataframe ,并调查文件:df.repartition(2).write.option("header", true).csv("output.csv")
现在,尝试读取output.csv文件。这种行为背后的原因是,当Spark想要读取csv文件时,可能会有多个文件,因此可能会有多个头行!

khbbv19g

khbbv19g2#

这是一个非常有趣的行为,我无法解释
现在,我不知道你的真实的用例,但是如果你有一个Array[String]要转换为RDD,使用第一行作为头,我认为你不应该这样做(转换为RDD[String]并使用spark.read.csv(...))
像这样的人应该工作:

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val header = source(0).split(",")
val schema = StructType(header.map(f => StructField(f, StringType, nullable = false))))
val data = source.tail.map(line => Row(line.split(","):_*)).toSeq

spark.createDataset(data)(RowEncoder(schema)).show

相关问题