我有一个spark应用程序(2.4.5),使用kafka作为源代码,使用大批量窗口(5分钟),在我们的应用程序中,我们只关心从特定时间间隔到处理数据的rdd。
我们的应用程序有时会崩溃,驱动程序(以客户机模式运行)上会出现outofmemory异常,执行程序上会出现gc outofmemory异常。经过大量的研究,我们似乎没有正确地处理状态,这导致了血统无限增长。我们考虑通过使用批处理方法来解决这个问题,在批处理方法中,我们控制从kafka获取的偏移量并从中创建rdd(这将截断沿袭),或者启用检查点。
在调查过程中,有人发现了一个不太类似的问题,通过调整一些ui参数(Yarn堆使用率随时间增长)得到了解决:
spark.ui.retainedjobs=50
spark.ui.retainedstages=50
spark.ui.retainedtasks=500
spark.worker.ui.retainedexecutors=50
spark.worker.ui.retaineddrivers=50
spark.sql.ui.retainedexecutions=50
spark.streaming.ui.retainedbatches=50
因为这些都是ui参数,所以它们不会影响应用程序的内存使用,除非它们影响应用程序存储发送到ui的信息的方式。早期的测试表明,应用程序确实运行的时间更长,没有oom问题。
有人能解释一下这些参数对应用程序有什么影响吗?它们真的会影响应用程序的内存使用吗?有没有其他参数,我应该研究,以获得整个画面(我想知道是否有一个“因素”参数,需要调整,以便内存分配是适合我们的情况)?
谢谢您
1条答案
按热度按时间wmvff8tz1#
经过大量测试后,我们的团队设法将问题缩小到这个特定参数:
spark.sql.ui.retainedexecutions
我决定深入研究,所以我下载了spark的代码。我发现有关解析的逻辑计划的信息不仅保存在应用程序的内存中,而且还受此参数控制。
创建sparksession会话时,示例化的许多对象之一是sqlappstatuslistener。此类实现两个方法:
onexecutionstart—每次执行时,都会创建一个新的sparkplangraphwrapper,它将保存对已解析逻辑计划的引用,并将其添加到sharedstate对象中,在本例中,该对象跟踪创建了多少个对象示例。
cleanupexecution-如果存储对象的数目大于spark.sql.ui.retainedexecutions的值(默认值为1000),则从sharedstate对象中删除sparkplangraphwrapper。
具体地说,在我们的例子中,逻辑计划占用4mb内存,因此以一种简单的方式,我们必须分配4gb内存来容纳保留的执行。