IllegalArgumentException,从s3而不是hdfs指定输入/输出时出现错误FS

i7uaboj4  于 2022-12-09  发布在  HDFS
关注(0)|答案(4)|浏览(209)

我一直在本地集群上运行Spark作业,本地集群有hdfs,从那里读取输入,也写入输出。现在我已经设置了AWS EMR和S3 bucket,在那里我有我的输入,我希望我的输出也写入S3。
错误:
用户类抛出异常:异常错误:错误的FS:s3://something/输入,应为://IP-一些数字.eu-west-1.计算.内部:8020
我试着搜索相同的问题,有几个关于这个问题的问题。有些人建议它只是输出,但即使我禁用输出,我得到同样的错误。
另一个建议是我的代码中的FileSystem有问题。下面是我的程序中出现的所有输入/输出:
第一次出现在我的自定义FileInputFormat中,在getSplits(JobContext job)中,我实际上没有修改自己,但我可以:

FileSystem fs = path.getFileSystem(job.getConfiguration());

类似的情况在我自定义的RecordReader中也有,我自己也没有修改过:

final FileSystem fs = file.getFileSystem(job);

在我自己编写的自定义RecordReadernextKeyValue()中,我用途:

FileSystem fs = FileSystem.get(jc);

最后,当我想检测我使用的文件夹中的文件数量时:

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

我假设问题出在我的代码上,但是我如何修改FileSystem调用以支持来自S3的输入/输出呢?

jchrr9hc

jchrr9hc1#

这是我在EMR上启动spark-job时为解决此问题所做的工作:

val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

请确保使用存储桶的名称替换s3_bucket
我希望这对那些

kt06eoxx

kt06eoxx2#

hadoop文件系统api不提供对S3的开箱即用支持。有两种hadoop文件系统api的S3实现:S3 A和S3 N。S3 A似乎是首选的实现。要使用它,你必须做几件事:
1.将aws-java-sdk-bundle.jar添加到类路径中。
1.创建文件系统时,请在文件系统的配置中包括以下属性的值:

fs.s3a.access.key
fs.s3a.secret.key

1.在S3上指定路径时,不要使用s3://,而应使用s3a://

**注意:**创建一个简单的用户,先尝试基本的身份验证。可以让它与AWS更高级的临时凭证机制一起工作,但这有点复杂,我不得不对FileSystem代码做一些修改,以便让它在我尝试时工作。

信息来源为here

inkz8wg9

inkz8wg93#

尝试设置文件系统的默认URI:

FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s"s3a://$s3bucket"))

使用指定密钥和机密后

fs.s3a.access.key
fs.s3a.secret.key

并获取文件系统,如下所示:

val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

我仍然会收到错误
java.lang.IllegalArgumentException: Wrong FS: s3a:// ... , expected: file:///
要检查默认文件系统,可以查看上面创建的hdfs FileSystem:hadoopfs.getUri对我来说仍然返回file:///
为了使其正确工作,* 在 * 运行FileSystem.get之前,设置文件系统的默认URI。

val s3URI = s"s3a://$s3bucket"
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s3URI))

val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
o8x7eapl

o8x7eapl4#

EMR被配置为避免在代码或作业配置中使用键。问题是在示例中如何创建FileSystem。
Hadoop创建的默认文件系统是用于hdfs模式的文件系统。
因此,如果path模式为s3://,则下一个代码将不起作用。

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

要创建正确的文件系统,您需要将路径与要使用的架构一起使用。例如,类似以下内容:

val conf = sc.hadoopConfiguration
val pObj = new Path(path)
val status = pObj.getFileSystem(conf).listStatus(pObj)

从Hadoop代码中:
在FileSystem.get中的实现

public static FileSystem get(Configuration conf) throws IOException {
      return get(getDefaultUri(conf), conf);
   }

使用Path.getFileSystem实现:

public FileSystem getFileSystem(Configuration conf) throws IOException {
      return FileSystem.get(this.toUri(), conf);
   }

相关问题