I am running Spark on a Kubernetes cluster. When I repartition the data with a large number of partitions, forcing a single file per partition, my pods get evicted.
The error is as follows:
The node was low on resource: ephemeral-storage. Container sosreport-spark-cluster-opendatahub-w was using 56291400Ki, which exceeds its request of 0.
My Spark configuration is:
def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '1536M')
        .set('spark.executor.cores', '2')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        .set('spark.kubernetes.memoryOverheadFactor', '0.20')
    )
    return sc_conf
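For context, the conf is consumed roughly like this (a sketch only; SPARK_CLUSTER and HOSTNAME are placeholders for the Kubernetes master URL and driver host defined elsewhere):

    # Sketch: SPARK_CLUSTER is a hypothetical master URL, e.g. 'k8s://https://<api-server>:<port>'
    import pyspark
    from pyspark.sql import SparkSession

    conf = create_spark_config(SPARK_CLUSTER)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()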
This is how I repartition the data:
def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))

    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)

    print('Data repartitioning complete at the following location:')
    print(dest_path)

    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions
1 Answer
Your problem is most likely not ephemeral storage as such, but the fact that you are sending the entire dataset to a single worker:

    .repartition(1, "created_year", "created_month", "created_day")

This collapses all of your data into a single Spark partition, which is then written out to all of the table partitions.
What you probably want instead is a global sort by the partition keys followed by write.partitionBy. That sends the bulk of each table partition's data to a single Spark partition, and only a limited number of Spark partitions overall (typically a single Spark partition per table partition if the partitions are small).
This typically looks like...
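A minimal sketch of that pattern, assuming the same partition columns and dest_path as in the question (orderBy stands in for the global sort the answer describes):

    # Global sort by the table partition keys: the range shuffle groups rows with the
    # same (year, month, day) into one, or a few, Spark partitions, so each table
    # partition is written by only a handful of tasks and produces few files.
    (
        df.orderBy("created_year", "created_month", "created_day")
          .write
          .partitionBy("created_year", "created_month", "created_day")
          .mode("overwrite")
          .parquet(dest_path)
    )

Because no single executor has to hold the whole dataset, the shuffle spill and output buffering are spread across the workers instead of landing on one pod, which is what was exhausting its ephemeral storage.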