vectorassembler强制数据进入驱动程序我们怎样才能避免呢?

z9smfwbn  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(309)

当使用 pyspark.ml.feature 向量汇编程序用于将列列表转换为单个向量列,有利于特征提取和应用ml模型等。。。然而 transform 函数通过设计实现 collect 将所有数据传送给驱动程序的函数。我在spark ui的历史中看到了很多收集工作

代码:

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')

dag可视化

这抵消了Spark执行的效率。有没有一个对应的vectorassembler或者有没有一种方法可以在保持spark提供的分布式计算的同时使用它?

mf98qq94

mf98qq941#

我对spark mllib的理解非常有限。然而,看代码似乎 collect 您所指的操作是 BisectingKMeans 算法(正如@mck在其中一条评论中指出的)。

private def summarize(
      d: Int,
      assignments: RDD[(Long, VectorWithNorm)],
      distanceMeasure: DistanceMeasure): Map[Long, ClusterSummary] = {
    assignments.aggregateByKey(new ClusterSummaryAggregator(d, distanceMeasure))(
        seqOp = (agg, v) => agg.add(v),
        combOp = (agg1, agg2) => agg1.merge(agg2)
      ).mapValues(_.summary)
      .collect().toMap
  }

参考文献:https://github.com/apache/spark/blob/v2.4.6/mllib/src/main/scala/org/apache/spark/mllib/clustering/bisectingkmeans.scala#l304
从上面的代码片段来看,这个操作似乎并没有收集驱动程序上的全部数据,而且我认为我们也不能做很多事情来避免这个收集操作。
请注意,我从spark master branch的最新版本中选取了上述代码指针。根据您的spark版本,行号可能会有所不同。

相关问题