如何在emr上使用spark高效地读取/解析s3文件夹中的.gz文件

bgibtngc  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(427)

我正在尝试通过一个在emr上执行的spark应用程序读取s3目录中的所有文件。
数据以“s3a://some/path/yyyy/mm/dd/hh/blah.gz”这样的典型格式存储
如果我使用深度嵌套的通配符(例如“s3a://somebucket/somefolder///*.gz”),那么性能很差,读取几万个小的gzip json文件需要大约40分钟。这是可行的,但是浪费40分钟来测试一些代码是非常糟糕的。
我的研究告诉我还有两种方法更有效。
使用hadoop.fs库(2.8.5),我尝试读取我提供的每个文件路径。

private def getEventDataHadoop(
    eventsFilePaths: RDD[String]
  )(implicit sqlContext: SQLContext): Try[RDD[String]] =
    Try(
      {
        val conf = sqlContext.sparkContext.hadoopConfiguration

        eventsFilePaths.map(eventsFilePath => {
          val p                            = new Path(eventsFilePath)
          val fs                           = p.getFileSystem(conf)
          val eventData: FSDataInputStream = fs.open(p)
          IOUtils.toString(eventData)
        })
      }
    )

这些文件路径由以下代码生成:

private[disneystreaming] def generateInputBucketPaths(
    s3Protocol: String,
    bucketName: String,
    service: String,
    region: String,
    yearsMonths: Map[String, Set[String]]
  ): Try[Set[String]] =
    Try(
      {
        val days                         = 1 to 31
        val hours                        = 0 to 23
        val dateFormatter: Int => String = buildDateFormat("00")

        yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
          for {
            month: String <- yearMonth._2
            day: Int      <- days
            hour: Int     <- hours
          } yield
            s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
              s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
        }.toSet
      }
    )

hadoop.fs代码失败,因为路径类不可序列化。我想不出我怎么能避开那件事。
因此,我使用amazons3client找到了另一种方法,我只要求客户端提供一个文件夹(或前缀)中的所有文件路径,然后将文件解析为一个字符串,这可能会因为压缩而失败:

private def getEventDataS3(bucketName: String, prefix: String)(
    implicit sqlContext: SQLContext
  ): Try[RDD[String]] =
    Try(
      {
        import com.amazonaws.services.s3._, model._
        import scala.collection.JavaConverters._

        val request = new ListObjectsRequest()
        request.setBucketName(bucketName)
        request.setPrefix(prefix)
        request.setMaxKeys(Integer.MAX_VALUE)
        val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))

        val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
        sqlContext.sparkContext
          .parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
          .flatMap { key =>
            Source
              .fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
              .getLines()
          }
      }
    )

由于概要文件不能为null(“java.lang.illegalargumentexception:profile file cannot be null”),因此此代码生成null异常。请记住,此代码是在aws内的emr上运行的,因此如何提供它所需的凭据?其他人是如何使用这个客户端在emr上运行spark作业的?
我们非常感谢您对这些方法的任何帮助。

tquggr8v

tquggr8v1#

路径在以后的hadoop版本中是可以序列化的,因为它可以在spark rdd中使用。在此之前,将该路径转换为uri,然后在闭包中从该uri创建一个新路径。

相关问题