为什么pyspark程序中没有缓存Dataframe?

de90aj5v  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(244)

我正在尝试缓存Dataframe,然后在awss3 bucket中运行写操作。我的pyspark工作在aws bucket中以Parquet文件格式写入Dataframe。由于我的数据源是cassandra,我已经完成了缓存,并考虑到内存中有Dataframe,当我在不同的s3中写入相同的Dataframe时,它不应该击中cassandra来再次读取数据。
下面是代码:
1.从cassandra读取的数据:

data = spark_session.read \
        .format('org.apache.spark.sql.cassandra') \
        .options(table=table, keyspace=keyspace) \
        .load()

2.在数据框中创建md5列:

df_wth_json_col = data \
        .withColumn('json_data', f.to_json(f.struct([data[x] for x in data.columns])))

data_md5=  df_wth_json_col.withColumn("hash", md5('json_data')).drop('json_data')

3.缓存Dataframe:

data_md5.cache()

4.count()已执行,以便Pypark操作触发器将数据加载到内存中:

data_md5.count()

5.将数据写入两个s3存储桶:

data_md5.write.parquet(bucket_1,mode="overwrite")
data_md5.write.parquet(bucket_2,mode="overwrite")

以上两种操作都是用同样的时间将数据写入s3,我使用缓存的目的是当spark作业再次将数据写入另一个s3存储桶时,所以不应该查看cassandra的源代码,而应该将内存中的数据写入aws s3存储桶。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题