我的pyspark代码尝试创建一个框架,并将框架写入s3位置。完成此操作后,我将有一个文件名为part-*,我试图使用hadoop file util libraries重命名此文件,但总是得到以下错误。此操作在pyspark中可能吗?注意:我不能在这里使用boto 3,因为我将在EMR上运行此操作。
我在pyspark中使用的代码如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext
l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
#Read the string data into a DataFrame
df = spark.createDataFrame(l[1:],l[0])
df.coalesce(1) \
.write.format("com.databricks.spark.csv") \
.option("header", "true") \
.mode("overwrite") \
.save("s3://<bucket>/<prefix>")
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)
字符串
错误信息:
File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020```
型
1条答案
按热度按时间vngu2lb81#
Hadoop的FileSystem需要一个URI来确定客户端实现。您可以在
FileSystem
对象的初始化过程中使用uri
构造函数参数或fs.defaultFS
配置参数来提供它。字符串
Python和Scala都是一样的;你实际上是在使用Py4J访问相同的Java对象。如果相同的代码在Scala中工作,那么你在Hadoop配置中可能有不同的
fs.defaultFS
值。请参阅
FileSystem
的源代码和fs.defaultFS
description。默认文件系统的名称。一个URI,其方案和权限决定文件系统实现。URI的方案决定命名文件系统实现类的配置属性(fs.SCHEME.impl)。URI的权限用于确定文件系统的主机、端口等。