- bounty将在3天后过期**。回答此问题可获得+50声望奖励。RudyVerboven希望引起更多人对此问题的关注:我真的被困在这一点上,完全没有选择。
对于我们的用例,我们需要从S3 bucket加载json文件。作为处理工具,我们使用AWS Glue。但是因为我们很快就要迁移到Amazon EMR,我们已经开发了只带有Spark功能的Glue作业。这样将来的迁移会更容易。这意味着对于我们的用例,我们不能使用任何Glue功能,如grouping input files。
我们面临的问题是,当我们读入这些JSON文件时,我们看到驱动程序的内存将达到100%,直到最终在OOM异常中作业失败。
我们已经尝试通过使用G.2X示例并向Glue作业添加--conf spark.driver.memory=20g
参数来最大化驱动程序内存。
我们使用的代码非常简单:
spark.read.option("inferSchema", value = true).json("s3://bucket_with_json/sub_folder")
输入数据为21个json文件,大小为100MB,文件本身不是有效的json对象,但每个文件包含多个json对象,例如:
{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
}
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}
(not真实数据集)
我们目前使用的涂胶作业规格:
- 工人类型:G.2X
- 工人人数:20
- 其他Spark参数:
'--conf': 'spark.driver.maxResultSize=2g --conf spark.yarn.executor.memory=7g --conf spark.driver.memory=20g'
- 工作语言:阶
- 胶水版本:3.0
此图像显示内存如何超过驱动程序的最大值以及执行器的内存:
在+-10分钟后,我们得到的错误是:
由于内存不足,命令失败
java.lang.OutOfMemoryError:Java堆空间-XX:OnOutOfMemory错误="kill -9% p"正在执行/bin/sh-c "kill -9 8 ..."
另外值得注意的是,当我们在一个较小的数据集上运行时,一切都很好。
在这一点上我有点束手无策。有人能帮我解决这个问题或者给我指出正确的方向吗?还有,如果有人能解释为什么我的驱动程序会溢出。我一直认为json文件是由执行器读取的。在我读取数据后,我没有收集任何数据到驱动程序,所以我不能解释为什么会发生这种情况。
- 编辑**
我尝试将输入文件转换为一个有效的json。因此转换为格式:
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
并使用选项:
.option("multiline", "true")
但不幸的是,这给了我同样的结果/错误。
编辑
我想补充的是,上面的数据示例及其结构与我正在使用的数据并不相似。为了给您一些关于我的数据的信息:
这个结构是非常嵌套的。它包含25个顶级字段。其中7个是嵌套的。如果你把所有的字段都展开,你最终会得到+-200个字段。有可能inferSchema
选项是我的问题的原因。
1条答案
按热度按时间aamkag611#
我认为设置
inferSchema == true
可能是问题所在,因为它是在驱动程序中执行的。那么为什么要在阅读时推断模式(需要额外的数据传递,需要更多的驱动程序资源)呢?也许在这个玩具示例中找不到原因,但也许你可以尝试一下?首先...你的第二个文件格式工作正常(第一个没有)...我创建了几个这样的文件,并把它们都粘在S3上的一个文件夹中。
我尝试的另一种方法是在阅读时自己提供模式。
还有一件事要尝试......只是跳过在阅读时推断模式。我不知道下面的代码是否以同样的方式使用驱动程序资源,但我似乎记得它可能使用了行的一个小子集。
EDIT-回应指出上述内容不好的评论
最后一件要尝试的事情...在这里,我只是想让司机出来的混合,所以我做以下...
1.以纯文本形式阅读,JSON记录跨多行
1.使用
.mapPartitions
迭代每个分区,并将拆分为多行的JSON合并为每个JSON字符串一条记录1.最后......使用您最喜欢的解析器解析为JSON(我使用json 4s没有特别的原因)
如果在此之后,您仍然遇到内存错误,它应该在执行器上,在那里您有更多的选择。
当然,如果你想让Spark自动地把它读入一个200列的 Dataframe ,也许你需要一个更大的驱动程序。
这里有一个函数可以遍历文本行,并尝试将每行合并成一条记录,这个函数在玩具例子中是有效的,但是你可能需要做一些更聪明的事情。
.mapPartitions
将每个分区视为一个迭代器......因此,您需要为它提供一个Iterator[A] => Iterator[B]
类型的函数,在本例中,该函数只是一个.foldLeft
,它使用regex来判断它是否是记录的结尾。这里只是读取数据...合并行...然后解析。同样,我使用RDD是因为它是我通常使用的,但我相信如果需要的话,你可以把它保存在 Dataframe 中。