我有一个spark代码,它从配置文件中添加列到一个框架中,最后只选择配置文件中现有的列来创建一个新的框架。
当配置文件中的字段少于2000个时,脚本运行良好。然而,当我在配置文件中添加2000列时,这使得它类似于4000列,我有一个java.lang.StackOverflowError。在调试时,我可以看到for循环和withcolumn函数是java.lang.StackOverflowError的原因。
下面是代码行:
val configuration_file = spark.textFile("s3://fields.txt").map(cols=>cols.split(";").take(1)).flatMap(x=>x).collect().toList
var df_1 = df.toDF().cache()
for(i<- configuration_file) {
if(!df_1.schema.fieldNames.contains(i)){
df_1 = df_1.withColumn(i, lit(null).cast(StringType))
}
}
val df_2 = df_1.select( configuration_file.head, configuration_file.tail: _*)
有谁知道我如何优化它吗?我想for循环是造成麻烦的原因。我有一个有10个工作线程的集群,我已经增加了JVM内存
spark.driver.extraJavaOptions=-Xss100M --conf spark.executor.extraJavaOptions=-Xss100M
1条答案
按热度按时间e5nszbig1#
试试这个。这段代码读取文件,拆分行,并以更有效的方式直接获取第一列