Read a CSV file in Scala and skip the first N rows

waxmsbnn · asked on 2022-12-13 · Scala

I want to do the same thing as the following PySpark script, but in Scala:

source_rdd = (spark.sparkContext.textFile(sourcePath)
    .zipWithIndex()
    .filter(lambda x: x[1] > int(n_skip_rows))
    .map(lambda x: x[0]))

df = spark.read.csv(source_rdd, header=True, multiLine=True, quote='"', escape='"',
                    ignoreTrailingWhiteSpace=True, ignoreLeadingWhiteSpace=True)

I tried the approach below, but when calling spark.read.csv I get an overloaded-method error:

val rdd = spark.read.textFile(sourcePath).rdd
  .zipWithIndex() // get (line, index) tuples
  .filter { case (line, index) => index > numberOfLinesToSkip.toInt }
  .map { case (line, index) => index } // get rid of the index
val ds = spark.createDataset(rdd) // convert the RDD to a Dataset

var df = spark.read.csv(ds, header=true, quote="\\", escape="\\", ignoreTrailingWhiteSpace=true, ignoreLeadingWhiteSpace=true)

The error stack trace is as follows:

(paths: String*)org.apache.spark.sql.DataFrame <and>
  (csvDataset: org.apache.spark.sql.Dataset[String])org.apache.spark.sql.DataFrame <and>
  (path: String)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.Dataset[Long], header: Boolean, quote: String, escape: String, ignoreTrailingWhiteSpace: Boolean, ignoreLeadingWhiteSpace: Boolean)
    var df = spark.read.csv(ds, header=true, quote="\\", escape="\\", ignoreTrailingWhiteSpace=true, ignoreLeadingWhiteSpace=true)

avwztpqn · #1

This problem is caused by a subtle difference between the Spark and PySpark APIs. In PySpark, the options you pass are **kwargs, which are then interpreted as options for reading the dataset. In Spark's Scala API, however, csv() accepts no such arguments; you set the options on the DataFrameReader instead:

val df = spark.read
  .options(Map(
    "header" -> "true",
    "quote" -> "..." // ... all the other options
  ))
  .csv(ds) // csv() accepts a Dataset[String], not an RDD
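
Equivalently, the same options can be set one by one with option, which is just another way of calling the same DataFrameReader API (the values below mirror the ones from the question):

val df = spark.read
  .option("header", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .option("ignoreTrailingWhiteSpace", "true")
  .option("ignoreLeadingWhiteSpace", "true")
  .csv(ds)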

The stack trace says the same thing: the only overloads of the csv method accept

  • one or more paths to CSV files (paths: String*),
  • or a Dataset[String] of CSV rows,
  • or a single file path.
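
Putting it all together, here is a minimal end-to-end sketch, assuming spark is an active SparkSession and sourcePath and numberOfLinesToSkip are defined as in the question. Note two fixes relative to the original attempt: the map must keep the line and drop the index, so that createDataset yields the Dataset[String] that csv() expects, and the filter uses >= so that exactly the first N lines are skipped:

import org.apache.spark.sql.Dataset
import spark.implicits._ // provides the Encoder[String] needed by createDataset

val rdd = spark.read.textFile(sourcePath).rdd
  .zipWithIndex()                                   // (line, index) tuples
  .filter { case (_, index) => index >= numberOfLinesToSkip.toLong }
  .map { case (line, _) => line }                   // keep the line, drop the index

val ds: Dataset[String] = spark.createDataset(rdd)

val df = spark.read
  .options(Map(                                     // mirrors the PySpark options from the question
    "header" -> "true",
    "multiLine" -> "true",
    "quote" -> "\"",
    "escape" -> "\"",
    "ignoreTrailingWhiteSpace" -> "true",
    "ignoreLeadingWhiteSpace" -> "true"
  ))
  .csv(ds)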
