来自配置单元查询的持久化pysparkDataframe

anauzrmj  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(242)

我从一个Hive表中得到一些数据:

df = sqlContext.sql('select shubiru, date from thebigtable bt where bt.num > 10 ')
df.show() # here the query is processed and the results shown

这很有效。现在我想对df进行操作,但每次对df执行操作时,它都会再次对配置单元运行查询:

import pyspark.sql.functions as func
from datetime import datetime
from pyspark.sql.types import TimestampType

dt_udt = func.udf(lambda x: datetime.strptime(str(x), '%Y%m%d') if x else None, TimestampType())
df = df.withColumn('fdate', dt_udt(df.date)) 
df.show()  # here the query is run again and the transformation is done

所以我想如果我打电话 persist 在df上,查询将不会再次运行:

df.cache()
df = df.withColumn('fdate', dt_udf(df.date))

但是没有骰子,查询将再次针对配置单元运行,并由udf处理。有没有一种方法可以在内存中缓存查询的结果并在Dataframe上运行操作,而不必每次都命中配置单元?

m0rkklqb

m0rkklqb1#

每当对数据执行操作时,sparksql就会从数据源(在您的案例中是hive)中提取数据。在本例中,您尝试在 cache() 那就没用了。我的建议是

df = df.withColumn('fdate', dt_udf(df.date)).withColumn('date_column_2', dt_udf(df.date)).cache()

此语句之后的所有操作都将对spark中持久化的数据进行操作。但是,缓存大量数据会自动逐出较旧的rdd分区,并且需要返回到配置单元以重新生成丢失的分区。

相关问题