scala 无法使用Apache Spark读取AWS Glue中的json文件

y0u0uwnf  于 2023-01-30  发布在  Scala
关注(0)|答案(1)|浏览(196)
    • bounty将在3天后过期**。回答此问题可获得+50声望奖励。RudyVerboven希望引起更多人对此问题的关注:我真的被困在这一点上,完全没有选择。

对于我们的用例,我们需要从S3 bucket加载json文件。作为处理工具,我们使用AWS Glue。但是因为我们很快就要迁移到Amazon EMR,我们已经开发了只带有Spark功能的Glue作业。这样将来的迁移会更容易。这意味着对于我们的用例,我们不能使用任何Glue功能,如grouping input files
我们面临的问题是,当我们读入这些JSON文件时,我们看到驱动程序的内存将达到100%,直到最终在OOM异常中作业失败。
我们已经尝试通过使用G.2X示例并向Glue作业添加--conf spark.driver.memory=20g参数来最大化驱动程序内存。
我们使用的代码非常简单:

spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")

输入数据为21个json文件,大小为100MB,文件本身不是有效的json对象,但每个文件包含多个json对象,例如:

{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
}
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

(not真实数据集)
我们目前使用的涂胶作业规格:

  • 工人类型:G.2X
  • 工人人数:20
  • 其他Spark参数:'--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'
  • 工作语言:阶
  • 胶水版本:3.0

此图像显示内存如何超过驱动程序的最大值以及执行器的内存:

在+-10分钟后,我们得到的错误是:
由于内存不足,命令失败
java.lang.OutOfMemoryError:Java堆空间-XX:OnOutOfMemory错误="kill -9% p"正在执行/bin/sh-c "kill -9 8 ..."
另外值得注意的是,当我们在一个较小的数据集上运行时,一切都很好。
在这一点上我有点束手无策。有人能帮我解决这个问题或者给我指出正确的方向吗?还有,如果有人能解释为什么我的驱动程序会溢出。我一直认为json文件是由执行器读取的。在我读取数据后,我没有收集任何数据到驱动程序,所以我不能解释为什么会发生这种情况。

    • 编辑**

我尝试将输入文件转换为一个有效的json。因此转换为格式:

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

并使用选项:

.option("multiline", "true")

但不幸的是,这给了我同样的结果/错误。

编辑

我想补充的是,上面的数据示例及其结构与我正在使用的数据并不相似。为了给您一些关于我的数据的信息:
这个结构是非常嵌套的。它包含25个顶级字段。其中7个是嵌套的。如果你把所有的字段都展开,你最终会得到+-200个字段。有可能inferSchema选项是我的问题的原因。

aamkag61

aamkag611#

我认为设置inferSchema == true可能是问题所在,因为它是在驱动程序中执行的。那么为什么要在阅读时推断模式(需要额外的数据传递,需要更多的驱动程序资源)呢?也许在这个玩具示例中找不到原因,但也许你可以尝试一下?
首先...你的第二个文件格式工作正常(第一个没有)...我创建了几个这样的文件,并把它们都粘在S3上的一个文件夹中。

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

我尝试的另一种方法是在阅读时自己提供模式。

import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

val schema = {
    new StructType()
      .add(StructField("RecordNumber", IntegerType, false))
      .add(StructField("Zipcode", IntegerType, true))
      .add(StructField("ZipCodeType", StringType, true))
      .add(StructField("City", StringType, true))
      .add(StructField("State", StringType, true))
  }

val df = spark.read.option("multiline", value = true).schema(schema).json("s3://bucket/path/")

还有一件事要尝试......只是跳过在阅读时推断模式。我不知道下面的代码是否以同样的方式使用驱动程序资源,但我似乎记得它可能使用了行的一个小子集。

val df = spark.read.option("multiline", value = true).json("s3://bucket/path/")

val schema = df.schema
schema.printTreeString

df.printSchema

EDIT-回应指出上述内容不好的评论

最后一件要尝试的事情...在这里,我只是想让司机出来的混合,所以我做以下...
1.以纯文本形式阅读,JSON记录跨多行
1.使用.mapPartitions迭代每个分区,并将拆分为多行的JSON合并为每个JSON字符串一条记录
1.最后......使用您最喜欢的解析器解析为JSON(我使用json 4s没有特别的原因)
如果在此之后,您仍然遇到内存错误,它应该在执行器上,在那里您有更多的选择。
当然,如果你想让Spark自动地把它读入一个200列的 Dataframe ,也许你需要一个更大的驱动程序。
这里有一个函数可以遍历文本行,并尝试将每行合并成一条记录,这个函数在玩具例子中是有效的,但是你可能需要做一些更聪明的事情。
.mapPartitions将每个分区视为一个迭代器......因此,您需要为它提供一个Iterator[A] => Iterator[B]类型的函数,在本例中,该函数只是一个.foldLeft,它使用regex来判断它是否是记录的结尾。

import org.apache.spark.rdd.RDD // RDD because that's what I use; probably similar on dataframes
import org.json4s._             // json4s for no particular reason
import org.json4s.jackson.JsonMethods._

/** `RDD.mapPartitions` treats each partition as an iterator
  * so use a .foldLeft on the partition and a little regex
  * merge multiple lines into one
  * 
  * probably need something a smarter for more nested JSON
  */
val mergeJsonRecords: (Iterator[String] => Iterator[String]) = (oneRawPartition) => {
  // val patternStart = "^\\[?\\{".r
  val patternEnd = "(.*?\\})[,\\]]?$".r // end of JSON record
  oneRawPartition
    .foldLeft(List[String]())((list, next) => list match {
      case Nil => List(next.trim.drop(1))
      case x :: Nil => { 
        x.trim match {
          case patternEnd(e) => List(next.trim, e)
          case _ => List(x + next.trim)
        }
      }
      case x :: xs => {
        x.trim match {
          case patternEnd(e) => next.trim :: e :: xs
          case _ => x.trim + next.trim :: xs
        }
      }
    })
    .map { case patternEnd(e) => e; case x => x } // lame way to clean up last JSON in each partitions
    .iterator
}

这里只是读取数据...合并行...然后解析。同样,我使用RDD是因为它是我通常使用的,但我相信如果需要的话,你可以把它保存在 Dataframe 中。

// read JSON in as plain text; each JSON record over multiple lines
val rdd: RDD[String] = spark.read.text("s3://bucket/path/").rdd.map(_.getAs[String](0))
rdd.count // 56 rows == 8 records

// one record per JSON object
val rdd2: RDD[String] = rdd.mapPartitions(mergeJsonRecords)
rdd2.collect.foreach(println)
rdd2.count // 8

// parsed JSON
object Parser extends Serializable {
  implicit val formats = DefaultFormats
  val func: (String => JValue) = (s) => parse(s)
}
val rddJson = rdd2.map(Parser.func)

相关问题