大量色谱柱导致性能下降,Pyspark

x7yiwoj4  于 2023-02-09  发布在  Apache
关注(0)|答案(2)|浏览(165)

我在处理spark wide Dataframe 时遇到了问题(大约9000列,有时更多)。
任务:
1.通过groupBy和透视创建宽DF。
1.将列转换为载体,并从pyspark.ml处理到KMeans中。
所以我做了一个扩展的框架,尝试用VectorAssembler创建向量,缓存它并在上面训练KMeans。
在我的PC上,在独立模式下,对于帧~500x9000,7个不同数量的集群,组装大约花了11分钟,KMeans花了2分钟。另一方面,在Pandas中(旋转df和迭代7个集群),这种处理只需要不到一分钟。

  • 显然 * 我理解独立模式和缓存等会降低开销和性能,但这真的让我很沮丧。

有人能解释一下我怎样才能避免这种开销吗?
人们是如何使用宽DF而不是使用矢量汇编器来工作的,这会导致性能下降?
更正式的问题(对于sof规则)听起来像-我怎样才能加速这段代码?

%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue':'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
 Wall time: 1min 32s

配置:

.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
6uxekuva

6uxekuva1#

VectorAssembler的transform函数处理所有列,并在每列上存储元数据以及原始数据,这需要时间,也会占用RAM。
为了给出一个精确的数字来说明增加了多少,您可以将转换前后的 Dataframe 转储为 parquet 文件并进行比较。与由VectorAssembler构建的特征向量相比,由手工或其它特征提取方法构建的特征向量可导致大小增加10倍,这是针对仅具有10个参数的逻辑回归。如果数据集的列数与您拥有的列数一样多,情况会变得更糟。
几点建议:

  • 看看你是否可以用另一种方法构建你的特征向量。我不确定这在Python中的性能如何,但我在Scala中已经从这种方法中得到了很多里程碑。我注意到,对于手动构建的向量或使用除VectorAssembled方法之外的其他提取方法(TF-IDF)构建的向量,比较逻辑回归(10个参数),性能相差5 - 6倍。
  • 看看是否可以调整数据的形状以减少需要由VectorAssembler处理的列数。
  • 看看增加Spark可用的RAM是否有帮助。
chhqkbe1

chhqkbe12#

实际上,在map中找到了rdd的解决方案。
1.首先,我们要创建价值观Map。
1.同时提取所有不同名称。
1.倒数第二步,我们在名称字典中搜索行Map的每个值,如果没有找到,则返回值或0。
1.结果上的向量汇编程序。
优点:
1.您不必创建具有大量列数的宽 Dataframe ,从而避免开销。(速度从11分钟上升到1分钟。)
1.您仍然在集群上工作,并在Spark范例中执行代码。
代码示例:scala implementation.

相关问题