scala - Memory error when saving a text file in Apache Spark

mfuanj7w  published 2023-06-23 in Scala

I have a DataFrame in Apache Spark that looks like this:

+------------+--------------------+------------+--------------------+
|sourcemmsi_1|       coordinates_1|sourcemmsi_2|       coordinates_2|
+------------+--------------------+------------+--------------------+
|   227705102|(-4.4239483,48.30...|   227580520|(-4.423927,48.301...|
|   227705102|(-4.4239483,48.30...|   227580520|(-4.4240417,48.30...|
|   227705102|(-4.4239483,48.30...|   227580520|(-4.42384,48.3015...|
|   227705102|(-4.4239483,48.30...|   227580520|(-4.4239235,48.30...|
|   227705102|(-4.4239483,48.30...|   227592820|(-4.4238935,48.30...|
|   227705102|(-4.4239483,48.30...|   227113100|(-4.423935,48.301...|
|   227306100|(-4.4624915,48.34...|   227113100|(-4.462385,48.349...|
|   227306100|(-4.4624915,48.34...|   227590030|(-4.462485,48.348...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.51476,48.3693...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.5147367,48.36...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.5148015,48.36...|
|   259019000|(-4.514735,48.369...|   228064900|(-4.5147567,48.36...|
|   259019000|(-4.514735,48.369...|   228762000|(-4.514828,48.369...|
|   259019000|(-4.514735,48.369...|   228762000|(-4.514692,48.369...|
|   259019000|(-4.514735,48.369...|   228762000|(-4.5148416,48.36...|
|   259019000|(-4.514735,48.369...|   228762000|(-4.5148416,48.36...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.5147867,48.36...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.51473,48.3694...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.51474,48.3694...|
|   259019000|(-4.514735,48.369...|   205204000|(-4.514735,48.369...|
+------------+--------------------+------------+--------------------+

sourcemmsi_1 and sourcemmsi_2 are integers, while coordinates_1 and coordinates_2 are strings (according to printSchema()). What I am trying to achieve is a text file in which each line looks like this:

{227705102, {(-4.4239483,48.30, 43.43), ... }, 227580520 (-4.423927,48.301,12.54) ... }
{227705102, {(-4.4239483,48.30, 43.43), ... }, 227592820 (-4.4238935,48.30,12.54) ... }

Essentially, I want to group the data by sourcemmsi_1 and sourcemmsi_2 and then write a text file in the format shown above. However, I run into a memory error (java.lang.OutOfMemoryError: Java heap space) and cannot get the result I want. Is there any way to achieve this without hitting this kind of error?
P.S. This is my current code:

import org.apache.spark.sql.functions.{collect_list, concat, lit}

val df_1 = grouped_df
  .withColumn("coordinates_1", concat(lit("("), grouped_df("coord_1"), lit(")")))
  .withColumn("coordinates_2", concat(lit("("), grouped_df("coord_2"), lit(")")))
val parenthesis_df = df_1.select("sourcemmsi_1", "coordinates_1", "sourcemmsi_2", "coordinates_2")
val structed_df = parenthesis_df
  .groupBy("sourcemmsi_1", "sourcemmsi_2")
  .agg(collect_list("coordinates_1").as("coordinates_1"),
       collect_list("coordinates_2").as("coordinates_2"))
val expressed_df = structed_df.selectExpr(
  "concat('{', sourcemmsi_1, ',', concat_ws(',', coordinates_1), '}', '{', sourcemmsi_2, ',', concat_ws(',', coordinates_2), '}') as data")
expressed_df.coalesce(1).write.text("/detected_stops_pairs")

3pmvbmvn1#

When building the SparkSession, try configuring the off-heap, executor, and driver memory according to the RAM of your machine or cluster.

val spark = SparkSession
  .builder()
  .master("")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "x")
  .config("spark.executor.memory", "y")
  .config("spark.driver.memory", "z")
  .getOrCreate()

Tweak x, y, and z until they fit your workload.
You can also increase the number of partitions, persist your DataFrame, and so on.
See my answer for more details on persisting data at different storage levels.
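The partition and persistence suggestions above could look roughly like the sketch below. It is a minimal, self-contained example with made-up sample data (the column names match the question, but the small `pairs` dataset and the partition count of 4 are assumptions): the grouped DataFrame is persisted at MEMORY_AND_DISK so partitions that do not fit in the heap spill to disk, and the output is written with several partitions instead of coalesce(1), which funnels all rows through a single task and is a common cause of driver/executor heap errors.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for the asker's DataFrame.
val pairs = Seq(
  (227705102, "(-4.4239483,48.30)", 227580520, "(-4.423927,48.301)"),
  (227705102, "(-4.4239483,48.30)", 227580520, "(-4.4240417,48.30)")
).toDF("sourcemmsi_1", "coordinates_1", "sourcemmsi_2", "coordinates_2")

val grouped = pairs
  .groupBy("sourcemmsi_1", "sourcemmsi_2")
  .agg(collect_list("coordinates_1").as("coordinates_1"),
       collect_list("coordinates_2").as("coordinates_2"))
  // Spill partitions to disk when they do not fit in memory, instead of failing.
  .persist(StorageLevel.MEMORY_AND_DISK)

grouped
  .selectExpr(
    "concat('{', sourcemmsi_1, ',{', concat_ws(',', coordinates_1), '}},{', " +
    "sourcemmsi_2, ',{', concat_ws(',', coordinates_2), '}}') as data")
  // Write several part files in parallel; merge them afterwards (e.g. cat part-*)
  // if a single file is really required.
  .repartition(4)
  .write.mode("overwrite").text("/detected_stops_pairs")
```

This produces multiple `part-*` files under the output directory rather than one file, but it keeps each write task small, which is usually the trade-off needed to avoid the heap error.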
