I'm new to Spark and I'm trying to learn how to run the following code in parallel:
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import col, mean as _mean, stddev as _stddev

# init of results dataframe, empty
schema = StructType([
    StructField('user', StringType()),
    StructField('mean', FloatType()),
    StructField('std_dev', FloatType())
])
df_result = spark.createDataFrame([], schema)

# for each distinct user:
# - build a table of their transactions
# - compute mean and std of 'importo'
# - append a new row for this user to df_result
for usr in users:
    # get the user's transactions by unioning the rows for this user
    bons = df_bon[df_bon['user'] == usr]
    rics = df_ric[df_ric['user'] == usr]
    tot = bons.union(rics)
    # LEFT ANTI JOIN to drop all the fraudulent transactions
    no_frauds = tot.join(df_frodi, ['transaction_id'], 'leftanti')
    # compute mean and standard deviation; collect() yields a 1-row list
    stats = no_frauds.select(_mean(col('importo')).alias('mean'),
                             _stddev(col('importo')).alias('std')).collect()
    # create a 1-row dataframe for this user, then append it to the result via union
    df_result = df_result.union(
        spark.createDataFrame([(usr, stats[0][0], stats[0][1])], ['user', 'mean', 'std'])
    )
I'd like to understand how to improve this code. Any suggestions are welcome, and please forgive me if the code is poor, but this is my first time working with Spark :)
I know the problem is the for loop, which doesn't implement any parallel logic, but I'd like to know the best practice for parallelizing it (for example, could I use the standard ThreadPool approach?)
1 Answer
My answer will be partial, since you haven't provided any data for me to fully understand your situation.
But it would be something like this:
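A minimal sketch of the idea, assuming df_bon, df_ric and df_frodi exist as in your code and that df_bon and df_ric share the same schema: union the two transaction tables once, remove the frauds with a single left anti join, and let groupBy/agg compute the per-user statistics so that Spark distributes the work itself:

from pyspark.sql.functions import col, mean as _mean, stddev as _stddev

# union all transactions once instead of filtering them user by user
tot = df_bon.union(df_ric)

# one left anti join drops every fraudulent transaction
no_frauds = tot.join(df_frodi, ['transaction_id'], 'leftanti')

# one distributed aggregation replaces the whole Python loop
df_result = no_frauds.groupBy('user').agg(
    _mean(col('importo')).alias('mean'),
    _stddev(col('importo')).alias('std_dev')
)

This runs as a single Spark job, so there is no need for a ThreadPool: the parallelism comes from Spark's executors rather than from driver-side threads, and you avoid launching one job (plus a collect and a union) per user.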