pyspark 如何从按月分区的Parquet文件中删除特定月份

pxq42qpu  于 2023-03-11  发布在  Spark
关注(0)|答案(3)|浏览(197)

我有过去5年的monthly收入数据,并且我以append模式、partitioned bymonth列、parquet存储各个月份的 Dataframe 。

def Revenue(filename):
    df = spark.read.load(filename)
    .
    .
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')

df每月以parquet存储,如下所示-

**问题:**如何删除与特定月份对应的parquet文件夹?

一种方法是在一个大的df中加载所有这些parquet文件,然后使用.where()子句过滤掉那个特定的月份,然后在overwrite模式下将其保存回parquet``partitionBy month,如下所示-

# If we want to remove data from Feb, 2015
df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')

但是,这种方法相当麻烦。
另一种方法是直接删除那个特定月份的文件夹,但我不确定这是否是一种正确的方法,以免我们以一种不可预见的方式改变metadata
删除特定月份的parquet数据的正确方法是什么?

wpcxdonn

wpcxdonn1#

Spark支持删除分区,包括数据和元数据。
引用scala代码注解

/**
 * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
 *
 * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 */

在您的情况下,没有备份表。我们可以将 Dataframe 注册为临时表,并使用上面的语法(临时表文档)
在pyspark中,我们可以使用以下链接中的语法运行SQL示例:

df = spark.read.format('parquet').load('Revenue.parquet'). registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")
ddrv8njm

ddrv8njm2#

下面的语句将只删除与分区信息相关的元数据。

ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");

如果你想删除数据,你需要把你的配置单元外部表的tblproperties设置为False。2它会把你的配置单元表设置为托管表。

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='FALSE');

您可以将其设置回外部表。

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='TRUE');

我尝试使用spark session设置给定属性,但遇到了一些问题。

spark.sql("""alter table db.test_external set tblproperties ("EXTERNAL"="TRUE")""")
pyspark.sql.utils.AnalysisException: u"Cannot set or change the preserved property key: 'EXTERNAL';"

我相信一定有办法做到这一点。我最终使用了Python。我在PysPark中定义了下面的函数,它完成了这项工作。

query=""" hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE");ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");' """

def delete_partition():
        print("I am here")
        import subprocess
        import sys
        p=subprocess.Popen(query,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1) 

>>> delete_partition()

这将同时删除元数据和数据。注意。我已经用HiveORC外部分区表测试过了,它在loaded_date分区

# Partition Information
# col_name              data_type               comment

loaded_date             string

更新:基本上你的数据是躺在hdfs位置的子目录命名为

/Revenue/month=2015-02-01
/Revenue/month=2015-03-01
/Revenue/month=2015-03-01

等等

def delete_partition(month_delete):
      print("I am here")
      hdfs_path="/some_hdfs_location/Revenue/month="
      final_path=hdfs_path+month_delete
      import subprocess
      subprocess.call(["hadoop", "fs", "-rm", "-r", final_path])
      print("got deleted")

delete_partition("2015-02-01")
oug3syen

oug3syen3#

如果你想在pyspark中完成而不使用配置单元表,你可以按照以下步骤完成:
1-获取新数据的分区
2-检查其对应的 parquet 分区是否存在并删除
3-以附加方式写入
因此,这里我假设'month'是 Dataframe 中的分区列:

new_data_paritions = df.select('month').distinct().collect()
new_data_paritions = [v['month'] for v in new_data_paritions] #list of all new partitions

#Check if they exist and delete them:
for partition in new_data_paritions:
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    if fs.exists(sc._jvm.org.apache.hadoop.fs.Path("/path/Revenue/month={}".format(partition))):
        dbutils.fs.rm("/path/Revenue/month={}".format(partition)), True)

df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

我使用Databricks笔记本,因此我使用“dbutils.fs.rm”删除文件夹(分区)。

相关问题