How can I join and group PySpark DataFrames efficiently?

Asked by v8wbuo2f on 2021-07-14 in Spark

I am working with PySpark DataFrames in Python. I have about 70 GB of JSON files, which are basically emails, and the goal is to run TF-IDF on the email bodies. First I converted the record-oriented JSON and wrote it to HDFS. For the TF-IDF implementation I did some data cleaning with Spark NLP, followed by the basic computation of TF, IDF and TF-IDF, which involves several groupBy() and join() operations. The implementation works fine on a small sample dataset, but when I run it on the full dataset I get the following error:

Py4JJavaError: An error occurred while calling o506.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 107 in stage 7.0 failed 4 times, most recent failure: Lost task 107.3 in stage 7.0 : ExecutorLostFailure (executor 72 exited caused by one of the running tasks) Reason: Container from a bad node: container_1616585444828_0075_01_000082 on host: Exit status: 143. Diagnostics: [2021-04-08 01:02:20.700]Container killed on request. Exit code is 143
[2021-04-08 01:02:20.701]Container exited with a non-zero exit code 143. 
[2021-04-08 01:02:20.702]Killed by external signal
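For context, this is roughly how the email JSON is loaded and written to HDFS (a minimal sketch only; the paths, the Parquet output and the column selection are assumptions, not my exact code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("email-tfidf").getOrCreate()

# Read the record-oriented (one JSON object per line) email files; the path is hypothetical
emails = spark.read.json("hdfs:///data/emails/*.json")

# Keep a columnar copy on HDFS so later stages do not re-parse the raw JSON
emails.write.mode("overwrite").parquet("hdfs:///data/emails_parquet")
df = spark.read.parquet("hdfs:///data/emails_parquet")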

Sample code:

from pyspark.sql.functions import explode, length, count

# Explode the cleaned token array into one row per (id, word)
df_1 = df.select(df.id, explode(df.final).alias("words"))
df_1 = df_1.filter(length(df_1.words) > 3)  # drop very short tokens
df_2 = df_1.groupBy("id", "words").agg(count("id").alias("count_id"))
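The remaining TF-IDF steps look roughly like this. It is only a sketch of the groupBy()/join() operations I mentioned above; the names doc_count, word_df, idf and tfidf are illustrative, not my exact code:

from pyspark.sql.functions import countDistinct, log, lit, col

# Total number of documents (emails)
doc_count = df.select("id").distinct().count()

# Document frequency: number of distinct emails each word appears in
word_df = df_2.groupBy("words").agg(countDistinct("id").alias("df"))

# IDF per word, then join back onto the per-document term counts (df_2)
idf = word_df.withColumn("idf", log(lit(float(doc_count)) / col("df")))
tfidf = df_2.join(idf, on="words", how="inner") \
            .withColumn("tf_idf", col("count_id") * col("idf"))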

I get the error at the third step, i.e. df_2 = df_1.groupBy("id","words").agg(count("id").alias("count_id")). Things I have tried (a rough sketch of how I applied them follows this list):

set("spark.sql.broadcastTimeout", "36000")
df_1.coalesce(20) before the groupBy()
checking for nulls -> there are no nulls in any of the DataFrames
So far nothing has worked. Since I am new to PySpark, I would really appreciate any help on making this implementation more efficient and faster.

No answers yet.

