如何在pyspark中运行多k means集群和使用groupby

dxpyg8gm  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(455)

我有这样一个数据集:

|Seq_key|   |Class_id|  |value|
Seq_key 1   Class_id 1  value 1
Seq_key 1   Class_id 2  value 2
Seq_key 1   Class_id 3  value 3
Seq_key 1   Class_id 4  value 4
Seq_key 1   Class_id 5  value 5
Seq_key 1   Class_id 6  value 6
Seq_key 2   Class_id 1  value 1
Seq_key 2   Class_id 2  value 2
Seq_key 2   Class_id 3  value 3
Seq_key 2   Class_id 4  value 4
Seq_key 2   Class_id 5  value 5
Seq_key 2   Class_id 6  value 6
Seq_key 2   Class_id 7  value 7
Seq_key 3   Class_id 1  value 1
Seq_key 3   Class_id 2  value 2
Seq_key 3   Class_id 3  value 3
Seq_key 3   Class_id 4  value 4
Seq_key 3   Class_id 5  value 5
Seq_key 3   Class_id 6  value 6
Seq_key 3   Class_id 7  value 7
Seq_key 3   Class_id 8  value 8

这个 Class_ids 以及 values 对于每个 Seq_key 相互排斥。我将k-均值聚类应用于每个 Seq_key 并找到最优的簇数、质心等,使得每个簇的输出 Seq_key 它们是这样的:

|Seq_key|   |Class id|  |Cluster|  |Centroid|
Seq_key 1   Class_id 1     1         128
Seq_key 1   Class_id 2     2         56
Seq_key 1   Class_id 3     3         100
Seq_key 1   Class_id 4     1         128
Seq_key 1   Class_id 5     1         128
Seq_key 1   Class_id 6     4         72
Seq_key 2   Class_id 1     1         5.5
Seq_key 2   Class_id 2     1         5.5
Seq_key 2   Class_id 3     2         3.4
Seq_key 2   Class_id 4     3         1.7
Seq_key 2   Class_id 5     1         5.5
Seq_key 2   Class_id 6     2         3.4
Seq_key 2   Class_id 7     2         3.4
Seq_key 3   Class_id 1     4         500
Seq_key 3   Class_id 2     1         700
Seq_key 3   Class_id 3     3         274
Seq_key 3   Class_id 4     2         189
Seq_key 3   Class_id 5     2         189
Seq_key 3   Class_id 6     4         500
Seq_key 3   Class_id 7     1         700
Seq_key 3   Class_id 8     3         274

目前,我正在每个 Seq_key 手动并应用 pyspark.ml.clustering 图书馆。但这显然是低效的,因为 seq_keys 增加到数万。另外,我没有正确地利用spark的分布式计算。
这个 Seq_key 是互斥的,因此它们不能与其他 Seq_keys 有没有一种方法可以通过 groupBy 喜欢的方法 ml 图书馆?即使只是计算由 Seq_key 就够了。这可能吗?

dvtswwa3

dvtswwa31#

您可能可以通过水平并行来改进运行时,即并行运行多个spark作业,如下所示:

from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

def run_kmeans(seqid, data=sens):

    df_tmp=data.filter(col('SEQ_ID')==seqid)\
        .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')

    return cluster

pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)
ijnw1ujt

ijnw1ujt2#

因此,我实施了一个临时解决方案,从这篇文章中获得了一些想法。
我收集了一份不同的 Seq_keys 然后手动for循环遍历每一个并应用pyspark kmeans方法,如下所示:

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler    
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
    df_tmp=sens.filter(col('SEQ_ID')==seqid)\
    .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')
    fleet_clusters.append(cluster)

final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)

我暂时不考虑质心。仅仅获取集群信息就足够了。
这显然是肮脏和低效的。事实上,由于天气原因,我的工作运行了大约8个小时 collect 由kmeans函数调用的方法。90%的工作节点处于空闲状态。如果有一种更有效的方法来做到这一点(最好利用spark提供的多个工作节点),那就太好了。

相关问题