我在处理spark wide Dataframe 时遇到了问题(大约9000列,有时更多)。
任务:
1.通过groupBy和透视创建宽DF。
1.将列转换为载体,并从pyspark.ml处理到KMeans中。
所以我做了一个扩展的框架,尝试用VectorAssembler创建向量,缓存它并在上面训练KMeans。
在我的PC上,在独立模式下,对于帧~500x9000,7个不同数量的集群,组装大约花了11分钟,KMeans花了2分钟。另一方面,在Pandas中(旋转df和迭代7个集群),这种处理只需要不到一分钟。
- 显然 * 我理解独立模式和缓存等会降低开销和性能,但这真的让我很沮丧。
有人能解释一下我怎样才能避免这种开销吗?
人们是如何使用宽DF而不是使用矢量汇编器来工作的,这会导致性能下降?
更正式的问题(对于sof规则)听起来像-我怎样才能加速这段代码?
%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
.groupBy('User')
.pivot('ObjectPath')
.agg({'PropertyFlagValue':'max'})
.fillna(0))
ignore = ['User']
assembler = VectorAssembler(
inputCols=[x for x in tmp.columns if x not in ignore],
outputCol='features')
Wall time: 36.7 s
print(tmp.count(), len(tmp.columns))
552, 9378
%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s
%%time
lst_levels = []
for num in range(3, 14):
kmeans = KMeans(k=num, maxIter=50)
model = kmeans.fit(transformed)
lst_levels.append(model.computeCost(transformed))
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
if i - j < j:
print(rs.index(i))
kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
model = kmeans.fit(transformed)
break
Wall time: 1min 32s
配置:
.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
2条答案
按热度按时间6uxekuva1#
VectorAssembler的transform函数处理所有列,并在每列上存储元数据以及原始数据,这需要时间,也会占用RAM。
为了给出一个精确的数字来说明增加了多少,您可以将转换前后的 Dataframe 转储为 parquet 文件并进行比较。与由VectorAssembler构建的特征向量相比,由手工或其它特征提取方法构建的特征向量可导致大小增加10倍,这是针对仅具有10个参数的逻辑回归。如果数据集的列数与您拥有的列数一样多,情况会变得更糟。
几点建议:
chhqkbe12#
实际上,在map中找到了rdd的解决方案。
1.首先,我们要创建价值观Map。
1.同时提取所有不同名称。
1.倒数第二步,我们在名称字典中搜索行Map的每个值,如果没有找到,则返回值或0。
1.结果上的向量汇编程序。
优点:
1.您不必创建具有大量列数的宽 Dataframe ,从而避免开销。(速度从11分钟上升到1分钟。)
1.您仍然在集群上工作,并在Spark范例中执行代码。
代码示例:scala implementation.