我正在尝试缓存Dataframe,然后在awss3 bucket中运行写操作。我的pyspark工作在aws bucket中以Parquet文件格式写入Dataframe。由于我的数据源是cassandra,我已经完成了缓存,并考虑到内存中有Dataframe,当我在不同的s3中写入相同的Dataframe时,它不应该击中cassandra来再次读取数据。
下面是代码:
1.从cassandra读取的数据:
data = spark_session.read \
.format('org.apache.spark.sql.cassandra') \
.options(table=table, keyspace=keyspace) \
.load()
2.在数据框中创建md5列:
df_wth_json_col = data \
.withColumn('json_data', f.to_json(f.struct([data[x] for x in data.columns])))
data_md5= df_wth_json_col.withColumn("hash", md5('json_data')).drop('json_data')
3.缓存Dataframe:
data_md5.cache()
4.count()已执行,以便Pypark操作触发器将数据加载到内存中:
data_md5.count()
5.将数据写入两个s3存储桶:
data_md5.write.parquet(bucket_1,mode="overwrite")
data_md5.write.parquet(bucket_2,mode="overwrite")
以上两种操作都是用同样的时间将数据写入s3,我使用缓存的目的是当spark作业再次将数据写入另一个s3存储桶时,所以不应该查看cassandra的源代码,而应该将内存中的数据写入aws s3存储桶。
暂无答案!
目前还没有任何答案,快来回答吧!