Spark SFTP library fails to download a file from the SFTP server when running in EMR

50pmv0ei · posted 2021-05-19 in Spark

I am using com.springml.spark.sftp in a Spark job to download a file from an SFTP server. The basic code is below.

val sftpDF = spark.read.
            schema(my_schema).
            format("com.springml.spark.sftp").
            option("host", "myhost.test.com").
            option("username", "myusername").
            option("password", "mypassword").
            option("inferSchema", "false").
            option("fileType", "csv").
            option("delimiter", ",").
            option("codec", "org.apache.hadoop.io.compress.GzipCodec").
            load("/data/test.csv.gz")

It works fine when I run it on my local machine with "spark-submit spark.jar". However, when I try to run it in EMR, it fails with the error below; the Spark job seems to look for the file on HDFS rather than on the SFTP server (see the submit commands sketched after the stack trace).

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://ip-10-61-82-166.ap-southeast-2.compute.internal:8020/tmp/test.csv.gz;
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at com.springml.spark.sftp.DatasetRelation.read(DatasetRelation.scala:44)
        at com.springml.spark.sftp.DatasetRelation.<init>(DatasetRelation.scala:29)
        at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:316)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at com.example.App$.main(App.scala:134)
        at com.example.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
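
For context, a minimal sketch of how the two runs are typically submitted (the main class matches the stack trace; the exact YARN flags used on EMR may differ):

# local run: the driver runs on the local machine (client mode)
spark-submit --class com.example.App spark.jar

# EMR run: the driver runs in a YARN container (cluster mode)
spark-submit --master yarn --deploy-mode cluster --class com.example.App spark.jar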

What could be wrong? Do I need to register some data source for the SFTP module?
Thanks!

Answer 1 (mklgxw1f):

OK, I found the cause. I was using version 1.1.0 of this SFTP library, which downloads the file to a folder on the driver rather than on the executors; that is why the error only shows up when the job runs in cluster mode, not in standalone mode. The same problem is reported at https://github.com/springml/spark-sftp/issues/24. After upgrading the SFTP library from 1.1.0 to 1.1.3, the issue was resolved.
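
For reference, the fix amounts to bumping the connector dependency. A minimal sbt sketch, assuming the Scala 2.11 build of the library (the artifact name spark-sftp_2.11 is an assumption here; adjust it to your Scala version):

// build.sbt: upgrade the SFTP connector from 1.1.0 to 1.1.3
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.3"

The same coordinates can also be passed to spark-submit with --packages com.springml:spark-sftp_2.11:1.1.3 instead of bundling the connector into the application jar.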
