我试图在s3中用pyspark中的一个文件名写一个数组,我可以在scala中这样做,但在py中给出了下面的错误

brccelvz  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(151)

我的pyspark代码尝试创建一个框架,并将框架写入s3位置。完成此操作后,我将有一个文件名为part-*,我试图使用hadoop file util libraries重命名此文件,但总是得到以下错误。此操作在pyspark中可能吗?注意:我不能在这里使用boto 3,因为我将在EMR上运行此操作。

我在pyspark中使用的代码如下:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext

l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
#Read the string data into a DataFrame
df = spark.createDataFrame(l[1:],l[0])

df.coalesce(1) \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("s3://<bucket>/<prefix>")

from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)

字符串

错误信息:

File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
    file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020```

vngu2lb8

vngu2lb81#

Hadoop的FileSystem需要一个URI来确定客户端实现。您可以在FileSystem对象的初始化过程中使用uri构造函数参数或fs.defaultFS配置参数来提供它。

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jvm.java.net.URI("s3://bucket/"), spark._jsc.hadoopConfiguration())
fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path("s3://bucket/prefix/"))

// Or
spark._jsc.hadoopConfiguration().set("fs.defaultFS", "s3://bucket/")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

字符串
Python和Scala都是一样的;你实际上是在使用Py4J访问相同的Java对象。如果相同的代码在Scala中工作,那么你在Hadoop配置中可能有不同的fs.defaultFS值。
请参阅FileSystem的源代码和fs.defaultFSdescription
默认文件系统的名称。一个URI,其方案和权限决定文件系统实现。URI的方案决定命名文件系统实现类的配置属性(fs.SCHEME.impl)。URI的权限用于确定文件系统的主机、端口等。

相关问题