I want to do the same thing as the following PySpark script, but in Scala:
source_rdd = (spark.sparkContext.textFile(sourcePath)
    .zipWithIndex()
    .filter(lambda x: x[1] > int(n_skip_rows))
    .map(lambda x: x[0]))
df = spark.read.csv(source_rdd, header=True, multiLine=True, quote='\"', escape='\"', ignoreTrailingWhiteSpace=True, ignoreLeadingWhiteSpace=True)
I tried the approach below, but when calling spark.read.csv I get an "overloaded method" error:
val rdd = spark.read.textFile(sourcePath).rdd
.zipWithIndex() // get tuples (line, Index)
.filter({case (line, index) => index > numberOfLinesToSkip.toInt})
.map({case (line, index) => index}) //get rid of index
val ds = spark.createDataset(rdd) // convert rdd to dataset
var df = spark.read.csv(ds, header=true, ignoreTrailingWhiteSpace=true, ignoreLeadingWhiteSpace=true)
The error stack trace is as follows:
(paths: String*)org.apache.spark.sql.DataFrame <and>
(csvDataset: org.apache.spark.sql.Dataset[String])org.apache.spark.sql.DataFrame <and>
(path: String)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.sql.Dataset[Long], header: Boolean, quote: String, escape: String, ignoreTrailingWhiteSpace: Boolean, ignoreLeadingWhiteSpace: Boolean)
var df = spark.read.csv(ds, header=true, quote="\\", escape="\\", ignoreTrailingWhiteSpace=true, ignoreLeadingWhiteSpace=true)
1 Answer
This is caused by a subtle difference between the Spark and PySpark APIs. In PySpark, the options you pass are **kwargs, which are then interpreted as options for reading the dataset. In Spark's Scala API, you have to set them via option/options instead. The stack trace says the same thing: the only overloads of the csv method are the three signatures shown in the error above, and none of them accepts named option arguments.
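For reference, here is a minimal sketch of how this could look in Scala (assuming Spark 2.2+, where the csv(Dataset[String]) overload is available). It reuses sourcePath and numberOfLinesToSkip from the question, keeps the line rather than the index in the map so you get a Dataset[String] instead of the Dataset[Long] shown in the error, and passes the CSV options through .option(...) on the DataFrameReader:

import org.apache.spark.sql.Dataset

// Skip the first lines, keeping only the text of each remaining line
val rdd = spark.read.textFile(sourcePath).rdd
  .zipWithIndex()                                                   // (line, index)
  .filter { case (_, index) => index > numberOfLinesToSkip.toInt }
  .map { case (line, _) => line }                                   // drop the index, keep the line

import spark.implicits._                                            // encoder for Dataset[String]
val ds: Dataset[String] = spark.createDataset(rdd)

// Set the CSV options on the reader, then use the Dataset[String] overload of csv
val df = spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .option("ignoreTrailingWhiteSpace", "true")
  .option("ignoreLeadingWhiteSpace", "true")
  .csv(ds)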