我是Spark的新手。
我遇到了一些问题与部分关于保存df到Hive表。
def insert_into_hive_table(df: DataFrame, table_name: str):
# for debugging - this action is working and contain 1528 rows
logger.info(f'check if df in insert_into_hive_table can do action before saving')
df = df.persist()
logger.info(f'df count is {df.count()} rows')
with spark_conf_context("hive.exec.dynamic.partition.mode", "nonstrict"), spark_conf_context("spark.sql.sources.partitionOverwriteMode", "dynamic"), spark_conf_context("spark.sql.parquet.output.committer.class", "org.apache.parquet.hadoop.ParquetOutputCommitter"), spark_conf_context("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"):
df.select(output_cols).write.insertInto(table_name)
# for debbuging - this action is failing
logger.info(f'check if df in insert_into_hive_table can do action after saving')
df = df.persist()
logger.info(f'df count is {df.count()} rows')
我原来的函数只是函数的中间部分。第一个计数和保存操作正在快速工作,我在Hive表中看到了正确的结果。但是最后一次计数总是在广播超时时失败,我甚至不知道广播在哪里发生。
一些可能影响的细节:
1.我得到这个错误:
No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
同时保存到Hive。
2.我的默认分区号是200。
3.我使用CDP和Spark 2.4.8
4.我已经增加了广播超时到720秒
5.该表包含2个分区
edit:我看到在insertInto之后,我在我的spark UI存储上看不到df(但是在insertInto之前我看到了),有什么建议吗?
1条答案
按热度按时间relj7zay1#
如果你只是想保存spark df在配置单元表中,那么只需要使用下面的df.write.mode(“overwrite”).format(“delta”).saveAsTable(“Giveyourtablename”)