I have a DataFrame in Apache Spark that looks like this:
+------------+--------------------+------------+--------------------+
|sourcemmsi_1| coordinates_1|sourcemmsi_2| coordinates_2|
+------------+--------------------+------------+--------------------+
| 227705102|(-4.4239483,48.30...| 227580520|(-4.423927,48.301...|
| 227705102|(-4.4239483,48.30...| 227580520|(-4.4240417,48.30...|
| 227705102|(-4.4239483,48.30...| 227580520|(-4.42384,48.3015...|
| 227705102|(-4.4239483,48.30...| 227580520|(-4.4239235,48.30...|
| 227705102|(-4.4239483,48.30...| 227592820|(-4.4238935,48.30...|
| 227705102|(-4.4239483,48.30...| 227113100|(-4.423935,48.301...|
| 227306100|(-4.4624915,48.34...| 227113100|(-4.462385,48.349...|
| 227306100|(-4.4624915,48.34...| 227590030|(-4.462485,48.348...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.51476,48.3693...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.5147367,48.36...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.5148015,48.36...|
| 259019000|(-4.514735,48.369...| 228064900|(-4.5147567,48.36...|
| 259019000|(-4.514735,48.369...| 228762000|(-4.514828,48.369...|
| 259019000|(-4.514735,48.369...| 228762000|(-4.514692,48.369...|
| 259019000|(-4.514735,48.369...| 228762000|(-4.5148416,48.36...|
| 259019000|(-4.514735,48.369...| 228762000|(-4.5148416,48.36...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.5147867,48.36...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.51473,48.3694...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.51474,48.3694...|
| 259019000|(-4.514735,48.369...| 205204000|(-4.514735,48.369...|
+------------+--------------------+------------+--------------------+
sourcemmsi_1 and sourcemmsi_2 are integers, while coordinates_1 and coordinates_2 are strings (according to printSchema()). What I am trying to achieve is to create a text file where each line looks like this:
{227705102, {(-4.4239483,48.30, 43.43), ... }, 227580520 (-4.423927,48.301,12.54) ... }
{227705102, {(-4.4239483,48.30, 43.43), ... }, 227592820 (-4.4238935,48.30,12.54) ... }
What I basically want is to group the data by sourcemmsi_1 and sourcemmsi_2 and then write a text file in the format above. However, I keep hitting a memory error (java.lang.OutOfMemoryError: Java heap space) and cannot get the result I want. Is there a way to achieve this without running into this kind of error?
P.S. The code below is what I have so far:
// Wrap each coordinate pair in parentheses
val df_1 = grouped_df
  .withColumn("coordinates_1", concat(lit("("), grouped_df("coord_1"), lit(")")))
  .withColumn("coordinates_2", concat(lit("("), grouped_df("coord_2"), lit(")")))

val parenthesis_df = df_1.select("sourcemmsi_1", "coordinates_1", "sourcemmsi_2", "coordinates_2")

// Collect all coordinates per (sourcemmsi_1, sourcemmsi_2) pair
val structed_df = parenthesis_df
  .groupBy("sourcemmsi_1", "sourcemmsi_2")
  .agg(
    collect_list("coordinates_1").as("coordinates_1"),
    collect_list("coordinates_2").as("coordinates_2"))

// Format each group as a single text line
val expressed_df = structed_df.selectExpr(
  "concat('{', sourcemmsi_1, ',', concat_ws(',', coordinates_1), '}', '{', sourcemmsi_2, ',', concat_ws(',', coordinates_2), '}') as data")

expressed_df.coalesce(1).write.text("/detected_stops_pairs")
1 Answer
When building your SparkSession, try configuring the off-heap, executor, and driver memory according to the RAM of your machine or cluster.
Adjust those values (the x, y, and z memory sizes) until the job fits.
It can also help to increase the number of partitions, persist your DataFrame, and so on.
Please check my answer for more details on persisting data at different storage levels.
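A minimal sketch of such a configuration, assuming the memory sizes below (4g each, standing in for the x, y, z placeholders above) are tuned to your actual hardware:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the "4g" values are placeholders (x, y, z in the answer above);
// tune them to the RAM actually available on your machine or cluster.
// Note that in client mode spark.driver.memory must be set before the
// driver JVM starts (e.g. via spark-submit --driver-memory).
val spark = SparkSession.builder()
  .appName("detected-stops-pairs")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g")   // off-heap size
  .config("spark.executor.memory", "4g")       // executor memory
  .config("spark.driver.memory", "4g")         // driver memory
  .getOrCreate()

// More shuffle partitions spread the groupBy/collect_list shuffle
// over smaller tasks, reducing per-task memory pressure:
spark.conf.set("spark.sql.shuffle.partitions", "400")

// Persisting an intermediate DataFrame at a disk-backed storage level
// keeps large data out of the JVM heap, e.g.:
// import org.apache.spark.storage.StorageLevel
// parenthesis_df.persist(StorageLevel.MEMORY_AND_DISK)
```

Writing with coalesce(1) forces all output through a single task, so raising partition counts elsewhere only helps up to that final step; if a single output file is not strictly required, leaving the default parallelism for the write also reduces memory pressure.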