使用pyspark创建50Giga的随机整数Parquet文件失败

thtygnil  于 2023-03-07  发布在  Spark
关注(0)|答案(1)|浏览(154)

我尝试过使用不同大小的集群(AWS上的EMR),但由于YARN杀死了所有节点,它总是失败:https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/
我认为这是由于内存要求太高,但我有一个10个m5.4xlarge示例(64Giga RAM)的集群,它仍然失败。
Pyspark代码:

num_of_ints = int(size_in_mb * 1024 * 1024 / 4)
    max_int = 2147483647

    # Create a SparkSession
    spark = SparkSession\
        .builder\
        .appName("GenerateRandomData") \
        .getOrCreate()

    # Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
    df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))

    # Save the DataFrame to a Parquet file
    out_file = os.path.join(out_folder, 'random_list.parquet')
    partitions = math.ceil(size_in_mb/10000) # the parquet file will be broken to chunks of 10giga
    df.repartition(partitions).write.mode("overwrite").parquet(out_file)

    # Stop the SparkSession
    spark.stop()

我是开放的,以任何其他方式来创建一个50Giga的 parquet 文件,其中有随机整数。
此外,数据生成阶段仅通过2次任务完成,但我的集群中有大约140个核心:From spark UI
谢谢!

oewdyzsn

oewdyzsn1#

delta表的核心是parquet文件,正如我们在spark中所知道的,除非你重新分区(1),否则你有多个分区。
让我们使用迭代方法向增量表添加数据,直到行数或大小与您的目标相匹配。
下面的代码创建一个新数据库。

%sql
CREATE DATABASE stack

下面的代码创建一个新表。

%sql 
CREATE TABLE someints (id INT, value INT, stamp STRING);

创建一个函数以向增量表添加数据。

# required libraries
from pyspark.sql.functions import *
from datetime import datetime

def add_random_data():
  
  # get variables
  num_of_ints = int(1024 * 1024)
  max_int = 2147483647
  stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
  table_name = "stack.someints"

  # create dataframe
  df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer")).withColumn("stamp", lit(stamp))

  # write data frame
  df.write.mode("append").format("delta").saveAsTable(table_name)

每次调用要添加一百万行,可以使用for in range循环重复调用add_random_data()函数。

%sql
select stamp, count(*) as cnt from stack.someints group by stamp

我使用stamp来捕获调用函数的日期/时间。

最后但同样重要的是,我们可以使用增量表属性来获取总字节数。

# get size in bytes
spark.sql("describe detail stack.someints").select("sizeInBytes").collect()

一旦你有了正确的大小,你总是可以写出任何格式,你想要的。
请记住,parquet是一种列存储格式,创建一个50 GB的文件可能需要一段时间,但是这种迭代方法应该可以避免内存不足的问题。

相关问题