我尝试过使用不同大小的集群(AWS上的EMR),但由于YARN杀死了所有节点,它总是失败:https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/
我认为这是由于内存要求太高,但我有一个10个m5.4xlarge示例(64Giga RAM)的集群,它仍然失败。
Pyspark代码:
num_of_ints = int(size_in_mb * 1024 * 1024 / 4)
max_int = 2147483647
# Create a SparkSession
spark = SparkSession\
.builder\
.appName("GenerateRandomData") \
.getOrCreate()
# Generate a DataFrame with num_of_ints rows and a column named "value" between 0 and max_int
df = spark.range(num_of_ints).withColumn("value", (rand(seed=42) * max_int).cast("integer"))
# Save the DataFrame to a Parquet file
out_file = os.path.join(out_folder, 'random_list.parquet')
partitions = math.ceil(size_in_mb/10000) # the parquet file will be broken to chunks of 10giga
df.repartition(partitions).write.mode("overwrite").parquet(out_file)
# Stop the SparkSession
spark.stop()
我是开放的,以任何其他方式来创建一个50Giga的 parquet 文件,其中有随机整数。
此外,数据生成阶段仅通过2次任务完成,但我的集群中有大约140个核心:From spark UI
谢谢!
1条答案
按热度按时间oewdyzsn1#
delta表的核心是parquet文件,正如我们在spark中所知道的,除非你重新分区(1),否则你有多个分区。
让我们使用迭代方法向增量表添加数据,直到行数或大小与您的目标相匹配。
下面的代码创建一个新数据库。
下面的代码创建一个新表。
创建一个函数以向增量表添加数据。
每次调用要添加一百万行,可以使用for in range循环重复调用add_random_data()函数。
我使用stamp来捕获调用函数的日期/时间。
最后但同样重要的是,我们可以使用增量表属性来获取总字节数。
一旦你有了正确的大小,你总是可以写出任何格式,你想要的。
请记住,parquet是一种列存储格式,创建一个50 GB的文件可能需要一段时间,但是这种迭代方法应该可以避免内存不足的问题。