问题归结为以下几个方面:我想在pyspark中使用现有的并行化输入集合和一个函数生成一个dataframe,该函数给定一个输入就可以生成相对较大的一批行。在下面的示例中,我希望使用1000个执行器生成10^12行Dataframe:
def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
StructField("seed", IntegerType()),
StructField("n", IntegerType()),
StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)
(我真的不想研究给定种子的随机数的分布-这只是一个示例,我可以用它来说明大型Dataframe不是从仓库加载的,而是由代码生成的)
上面的代码几乎完全符合我的要求。问题是,它以一种非常低效的方式来实现它—代价是为每一行创建一个python行对象,然后将python行对象转换为内部spark列表示。
有没有一种方法可以转换已经以列表示形式存在的一批行(例如,上面的一个或几个numpy数组) np_array
)只是让spark知道这些是一批值的列?
e、 g.我可以编写代码来生成python集合rdd,其中每个元素都是pyarrow.recordbatch或pandas.dataframe,但是如果不在这个过程中创建pyspark行对象的rdd,我就找不到方法将这些元素转换成spark dataframe。
至少有十几篇文章举例说明了如何使用pyarrow+pandas有效地将本地(到驱动程序)pandasDataframe转换为sparkDataframe,但这对我来说不是一个选择,因为我需要在执行器上以分布式方式实际生成数据,而不是在驱动程序上生成一个Dataframe并将其发送给执行器。
升级。我找到了一种避免创建行对象的方法——使用python元组的rdd。正如预期的那样,它仍然太慢,但仍然比使用row对象快一点。不过,这并不是我真正想要的(这是从python向spark传递列数据的一种非常有效的方法)。
在机器上做某些操作的测量时间(粗略的方法,测量时间有很大的变化,但在我看来仍然具有代表性):所讨论的数据集是10m行,3列(一列是常量整数,另一列是0到10m-1的整数范围,第三列是使用 np.random.random_sample
:
本地生成Dataframe(10m行):~440-450ms
本地生成spark.sql.row对象的python列表(10m行):~12-15s
本地生成表示行的元组的python列表(10m行):~3.4-3.5s
仅使用1个执行器和1个初始种子值生成sparkDataframe:
使用 spark.createDataFrame(row_rdd, schema=my_schema)
约70-80岁
使用 spark.createDataFrame(tuple_rdd, schema=my_schema)
:~40-45秒
(非分布式创建)使用 spark.createDataFrame(pandas_df, schema=my_schema)
:~0.4-0.5秒(不需要大致相同的时间)-有 spark.sql.execution.arrow.enabled
设置为true。
对于10m行,本地到驱动程序的Dataframe在~1s内转换为sparkDataframe的例子让我有理由相信,对于在执行器中生成的Dataframe,同样的情况应该是可能的。不过,我现在可以实现的最快速度是使用python元组的rdd,10m行的速度是~40s。
所以问题仍然存在-有没有一种方法可以在pyspark中高效地以分布式方式生成一个大的sparkDataframe?
3条答案
按热度按时间b4wnujal1#
听起来瓶颈是从rdd->dataframes的转换,而且手头的函数相当快,通过pyarrow将pandadf转换为spark df也相当快。以下是两种可能的解决方案:
因为并行创建df很容易,所以使用
df.to_parquet
,即:Spark读取结果Parquet文件应该是微不足道的事后。然后瓶颈就变成了io限制,这应该比spark转换元组/行类型快。
如果不允许您将任何内容保存到文件中,
pandas_udf
以及GROUPED_MAP
如果你的spark版本足够新的话,可能会对你有所帮助。它还使用pyarrow在spark dfs和pandas dfs之间进行转换,因此它应该比使用元组更快,并允许您以分布式方式从udf创建和返回pandas dfs。较慢的部分将是
groupby
你可能可以加快速度,这取决于你如何批量种子进入generate_data_udf
,即:0g0grzrc2#
以下是一个不使用rdd或创建行的解决方案,但仅使用Dataframe操作:
(代码是用scala编写的,但用python编写同样的代码应该很简单)
ogq8wdun3#
以下是不使用
Row
-仅基于rdd。我认为这可能是最有效的方法,因为它使用map
计算函数输出和flatMap
为了合并这些输出-这两个操作都在RDD上执行,所以所有的东西都应该被分发。