检查pysparkDataframe是否为空导致内存问题

w9apscun  于 2021-07-29  发布在  Java
关注(0)|答案(1)|浏览(464)

我有一张有10亿条记录的表。我对它运行一个查询,基本上是查找重复项。如果查询结果为0行,则不存在重复项,否则存在重复项。如果有重复的,我想把表名写到一个文本文件中。所以我要做的是

df = spark.sql("SELECT count(*) FROM table GROUP BY key1,key2,key3 HAVING count(*) > 1 LIMIT 1)
if df.count() > 0:
    with open('duplicate_tables.txt','a') as file:
        file.write('\n' + table)

df.count() 行,我得到一个错误,比如 java.io.IOException: No space left on device . 这是因为 count() 效率低下。当我尝试使用

if len(df.head(1)) != 0:

在我的查询中,我认为(希望)添加limit 1会有所帮助,这样它就不必遍历成百上千行,只需检查它是否为空。如果我去掉计数部分,就可以了。
我已经看到了一些重写count语句的方法(我已经讨论了如何检查spark dataframe是否为空?),但是到目前为止我还没有任何运气。

wkyowqbh

wkyowqbh1#

斯帕克很懒。也就是说,当你跑的时候 spark.sql() 实际上什么都没发生。你可以通过注意到 spark.sql() 无论sql的复杂性如何,都会立即“执行”。实际处理是在需要动作时进行的;你的情况是什么时候 .count() 开始发挥作用。由于sql的复杂性和表的大小,后者可能会导致内存问题。
也许你可以尝试的另一件事是阅读整个表格,让spark检查是否有重复的。但是,考虑到表的原始大小,这也可能导致内存问题。

df = spark.sql("SELECT * FROM table") # or select particular column(s)
if df.count() != df.dropDuplicates().count():
    with open('duplicate_tables.txt','a') as file:
        file.write('\n' + table)

相关问题