pyspark 多重动作引发故障

ezykj2lf  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(104)

我是Spark的新手。
我遇到了一些问题与部分关于保存df到Hive表。

def insert_into_hive_table(df: DataFrame, table_name: str):
        # for debugging - this action is working and contain 1528 rows
        logger.info(f'check if df in insert_into_hive_table can do action before saving')
        df = df.persist()
        logger.info(f'df count is {df.count()} rows')

        with spark_conf_context("hive.exec.dynamic.partition.mode", "nonstrict"), spark_conf_context("spark.sql.sources.partitionOverwriteMode", "dynamic"), spark_conf_context("spark.sql.parquet.output.committer.class", "org.apache.parquet.hadoop.ParquetOutputCommitter"), spark_conf_context("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"):
            df.select(output_cols).write.insertInto(table_name)
        
        # for debbuging - this action is failing
        logger.info(f'check if df in insert_into_hive_table can do action after saving')
        df = df.persist()
        logger.info(f'df count is {df.count()} rows')

我原来的函数只是函数的中间部分。第一个计数和保存操作正在快速工作,我在Hive表中看到了正确的结果。但是最后一次计数总是在广播超时时失败,我甚至不知道广播在哪里发生。
一些可能影响的细节:
1.我得到这个错误:

No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

同时保存到Hive。
2.我的默认分区号是200。
3.我使用CDP和Spark 2.4.8
4.我已经增加了广播超时到720秒
5.该表包含2个分区
edit:我看到在insertInto之后,我在我的spark UI存储上看不到df(但是在insertInto之前我看到了),有什么建议吗?

relj7zay

relj7zay1#

如果你只是想保存spark df在配置单元表中,那么只需要使用下面的df.write.mode(“overwrite”).format(“delta”).saveAsTable(“Giveyourtablename”)

相关问题