从一维列表pyspark高效添加列

kh212irz  于 2022-12-22  发布在  Spark
关注(0)|答案(1)|浏览(102)

我正在进行集成学习,我想把集成的单列矩阵放入用于收集数据的原始Dataframe中。
df如下所示,每六行重复一次UserId。

+------+-------+----------+----------+----------+----------+----------+----------+
|UserId|TrackId|Predictor1|Predictor2|Predictor3|Predictor4|Predictor5|Predictor6|
+------+-------+----------+----------+----------+----------+----------+----------+
|199810| 105760|         1|         1|         1|         1|         1|         1|
|199810|  18515|         1|         1|         1|         1|         1|         1|
|199810| 242681|         1|         1|        -1|        -1|        -1|        -1|
|199810|  74139|        -1|        -1|        -1|        -1|        -1|        -1|
|199810| 208019|        -1|        -1|        -1|        -1|        -1|        -1|
|199810|   9903|        -1|        -1|        -1|        -1|        -1|        -1|
|199812| 142408|         1|         1|         1|         1|         1|         1|
+------+-------+----------+----------+----------+----------+----------+----------+

s_ensemble矩阵看起来像

[[ 0.72892909]
 [ 0.72892909]
 [-0.58959307]
 ...
 [-0.72892909]
 [-0.72892909]
 [-0.72892909]]

两者都具有相同的行数120000。
我正在为扁平化为数组的矩阵创建一个Dataframe,并向这两个dataframe添加一个索引列,将这两个dataframe连接到同一个索引上,然后删除它。

df = df.select('UserId', 'TrackID').withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = spark.createDataFrame([np.array(s_ensemble).flatten().tolist(),], ['Ensembled_Prediction']).withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

df = df.join(b, df.row_idx == b.row_idx).drop("row_idx")

这可以创建它,但我无法看到它看起来像什么,我相信我的机器的内存最大化,因为它在合并这两个 Dataframe 在一起尖峰。我认为这是一个例外,试图df.show(5)

Py4JJavaError: An error occurred while calling o1086.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 159.0 failed 1 times, most recent failure: Lost task 9.0 in stage 159.0 (TID 166) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.

我撞了我的Jupyter笔记本使用16GS,我还在免费版本的Colab上刷爆了RAM。
有没有另一种方法可以在不创建多个临时大型数据框的情况下向现有数据框添加单个维度矩阵?

ruarlubt

ruarlubt1#

很明显,Apache Spark处理创建 Dataframe 索引的方法并不是很好,如果两个 Dataframe 有相同数量的分区(一定要明确指定),你可以用途:
withColumn(“行标识符”,单调递增标识符())
不过,在将输入数据馈送到pyspark之前在其中创建索引更为明智,例如使用np.ndenumerate(s_ensemble)。

相关问题