Please note: given the length of this post, I suggest only skimming each function. They all execute successfully without any errors; I present them only so the reader gets a general picture of the code being run. So please pay more attention to the theory and question sections than to the technical part.
[Theory part - explaining the situation]
First, I want to point out that this is a question about execution time. The code demonstrated works perfectly; the execution time is what concerns me.
I would like your opinion on my experience over the last few days with the threading and multiprocessing Python modules on a Databricks cluster with 32 CPU cores. Very briefly: I created a function (shown below) that takes a Spark DataFrame as input and trains two Spark MLlib classifiers. Before training, some additional cleaning and preparation is applied to the Spark DataFrame with Spark-like commands. The time needed to preprocess and train on each Spark DataFrame is reported. The function, which includes both the preprocessing and the training, is applied 15 times (i.e. to 15 different Spark DataFrames). So, as you can understand, my goal with threading and multiprocessing is to execute these 15 iterations at once rather than sequentially (one after another). Keep in mind that these 15 iterations will become 1,500 in the near future, so this is a benchmark for scaling up the data.
Before continuing, let me state some conclusions I reached while working with threading and multiprocessing. According to this article by Brendan Fortuner, threads are best for I/O-bound tasks limited by the GIL (which prevents two threads from executing simultaneously in the same program). The multiprocessing module, on the other hand, uses processes to speed up CPU-intensive Python operations, because processes benefit from multiple cores and avoid the GIL. So, although I initially built a threading-based application that applies my function 15 times concurrently, for the reasons above I later switched to a multiprocessing approach.
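As a minimal sketch of that distinction (using the standard `concurrent.futures` executors rather than my actual Databricks code; the `cpu_bound` function and worker counts are illustrative assumptions): a CPU-bound function run through a thread pool stays serialized by the GIL, while a process pool can actually use multiple cores.

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # Pure-Python arithmetic: threads cannot overlap on this because of the GIL,
    # but separate processes (one GIL each) can
    return sum(i * i for i in range(n))

def run_with(executor_cls, inputs, workers=4):
    with executor_cls(max_workers=workers) as ex:
        return list(ex.map(cpu_bound, inputs))

if __name__ == "__main__":
    inputs = [200_000] * 4
    # Same results either way; only the wall-clock time differs on CPU-bound work
    thread_results = run_with(ThreadPoolExecutor, inputs)
    process_results = run_with(ProcessPoolExecutor, inputs)
    assert thread_results == process_results
```

Both executors return identical results; the difference only shows up as wall-clock time on CPU-bound work.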
[Technical part]
[Spark DataFrame]
spark_df= pd.DataFrame({ 'IMEI' : ['358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721735', '358639059721735', '358639059721735', '358639059721735', '358639059721735'],
'PoweredOn': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
'InnerSensorConnected': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
'averageInnerTemperature': [2.5083397819149877, 12.76785419845581, 2.5431994716326396, 2.5875612214150556, 2.5786447594332143, 2.6642078435610212, 12.767857551574707, 12.767857551574707, 2.6131772499486625, 2.5172743565284166],
'OuterSensorConnected':[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826, 32.784826, 33.784826],
'EnergyConsumption': [70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0],
'DaysDeploymentDate': [10.0, 20.0, 21.0, 31.0, 41.0, 11.0, 19.0, 57.0, 44.0, 141.0],
'label': [0, 0, 1, 1, 1, 0, 0, 1, 1, 1]
}
)
spark_df= spark.createDataFrame(spark_df)
The DataFrame above is just a reminder of what the Spark DataFrame looks like. Imagine that these 10 rows are 7,000 rows and that the 2 IMEIs (['358639059721529', '358639059721735']) are actually 15 unique IMEIs, since, as I said, there are 15 Spark DataFrames, one per IMEI.
[The function applied]
def training_models_operation_multiprocess(asset_id, location, asset_total_number, timestamp_snap, joined_spark_dataset):
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZATION
    #-------------------------------------------------------------------------------------------------------------------------
    device_length=int(asset_total_number)
    list_string_outputs=list()
    max_workers=16*2
    training_split_ratio=0.5
    testing_split_ratio=0.5
    cross_validation_rounds=2
    optimization_metric="ROC_AUC"
    features_column_name="features"
    disable_logging_value=1 # a value that prevents standard output from being logged to Application Insights
    logger_initialization=instantiate_logger(instrumentation_key_value) # a logger instance
    # Time format
    date_format = '%Y-%m-%d %H-%M-%S'
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZED
    #-------------------------------------------------------------------------------------------------------------------------
    try:
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length))
        begin_time_0 = time.time()
        # FUNCTION: 1.1 Filter the rows related to the current asset
        begin_time_1 = time.time()
        filtered_dataset=joined_spark_dataset.where(joined_spark_dataset.IMEI.isin([asset_id]))
        filtered_dataset=apply_repartitioning(filtered_dataset, max_workers)
        end_time_1 = time.time() - begin_time_1
        list_string_outputs.append("{0}: FINISH Step 1.1 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_1)))
        #------------------------
        # FUNCTION: 1.2 Preprocess
        begin_time_2 = time.time()
        target_column_name=None
        target_column_name='label'
        preprocessed_spark_df=preprocess_data_pipeline(filtered_dataset, drop_columns_not_used_in_training, target_column_name, executor)
        preprocessed_spark_df=apply_repartitioning(preprocessed_spark_df, max_workers)
        end_time_2 = time.time() - begin_time_2
        list_string_outputs.append("{0}: FINISH Step 1.2 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_2)))
        #------------------------
        # FUNCTION: 1.3 Train-test split
        begin_time_3 = time.time()
        target_column_name=None
        target_column_name='target'
        training_data, testing_data=spark_train_test_split(asset_id, preprocessed_spark_df, training_split_ratio, testing_split_ratio, target_column_name, disable_logging_value, logger_initialization)
        training_data=apply_repartitioning(training_data, max_workers)
        testing_data=apply_repartitioning(testing_data, max_workers)
        end_time_3 = time.time() - begin_time_3
        list_string_outputs.append("{0}: FINISH Step 1.3 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_3)))
        # FUNCTION: 1.4 Train the algorithms
        begin_time_4 = time.time()
        best_classifier_asset_id=spark_ml_classification(asset_id, cross_validation_rounds, training_data, testing_data, target_column_name, features_column_name, optimization_metric, disable_logging_value,
                                                         logger_initialization)
        end_time_4 = time.time() - begin_time_4
        list_string_outputs.append("{0}: FINISH Step 1.4 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_4)))
        end_time_0 = time.time() - begin_time_0
        list_string_outputs.append("{0}: END EXECUTION PLAN OF ASSET ID {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_0)))
    except Exception as e:
        custom_logging_function(logger_initialization, disable_logging_value, "ERROR", "ERROR EXCEPTION captured in asset id {0}: {1}".format(asset_id, e))
        raise
    print(" ".join(list_string_outputs))
[Function 1.1]: Filter the dataset by IMEI. Description: from the full dataset containing all IMEI ids, keep only the rows belonging to the IMEI of the current iteration.
device_id=['358639059721529','358639059721735']
filtered_dataset=spark_df.where(spark_df.IMEI.isin(device_id))
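To make step 1.1 concrete outside a Spark session, the same `isin` filter pattern can be sketched with pandas (toy data, illustrative only):

```python
import pandas as pd

# Toy stand-in for the joined dataset; two IMEIs, as in the sample above
df = pd.DataFrame({
    "IMEI": ["358639059721529", "358639059721735", "358639059721529"],
    "EnergyConsumption": [70.0, 70.0, 70.0],
})

asset_id = "358639059721529"
# Keep only the rows of the IMEI handled by the current iteration
filtered = df[df["IMEI"].isin([asset_id])]
print(len(filtered))  # 2 of the 3 rows belong to this IMEI
```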
[Function 1.2]: Preprocess the Spark DataFrame. Description: apply a VectorAssembler to the trainable features and a StringIndexer to the label.
def preprocess_data_pipeline(spark_df, target_variable):
    stages = []
    # Convert the label into label indices using the StringIndexer
    label_stringIdx = StringIndexer(inputCol=target_variable, outputCol="target").setHandleInvalid("keep") # target variable should be IntegerType
    stages += [label_stringIdx]
    numeric_columns=["PoweredOn", "InnerSensorConnected", "averageInnerTemperature", "OuterSensorConnected", "OuterHumidity", "EnergyConsumption", "DaysDeploymentDate"]
    # Vectorize trainable features
    assemblerInputs = numeric_columns
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
    stages += [assembler]
    partialPipeline = Pipeline().setStages(stages)
    pipelineModel = partialPipeline.fit(spark_df)
    preppedDataDF = pipelineModel.transform(spark_df)
    # Keep relevant columns
    selectedcols = ["target", "features"]
    dataset = preppedDataDF.select(selectedcols)
    dataset=dataset.drop(target_variable)
    #dataset.printSchema()
    return dataset
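For readers without a Spark session at hand, the two pipeline stages above can be roughly approximated in pandas: `cat.codes` plays the role of the StringIndexer and row-wise list packing plays the role of the VectorAssembler. This is only an analogue, not the MLlib behaviour (e.g. it ignores `handleInvalid`):

```python
import pandas as pd

def preprocess_sketch(df, target, numeric_cols):
    # StringIndexer analogue: map label values to integer indices
    target_idx = df[target].astype("category").cat.codes
    # VectorAssembler analogue: pack the numeric columns into one list-valued column
    features = df[numeric_cols].values.tolist()
    return pd.DataFrame({"target": target_idx, "features": features})

toy = pd.DataFrame({"label": [0, 1, 0],
                    "PoweredOn": [1.0, 1.0, 0.0],
                    "EnergyConsumption": [70.0, 71.0, 72.0]})
prepped = preprocess_sketch(toy, "label", ["PoweredOn", "EnergyConsumption"])
```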
[Function 1.3: Train-test split] Description: split the data into train and test Spark DataFrames using a stratified approach.
def spark_train_test_split(device_id, prepared_spark_df, train_split_ratio, test_split_ratio, target_variable):
    trainingData = prepared_spark_df.sampleBy(target_variable, fractions={0: train_split_ratio, 1: train_split_ratio}, seed=10)
    testData = prepared_spark_df.subtract(trainingData)
    return trainingData, testData
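The same stratified idea, sketched in pandas: `groupby(...).sample` takes the role of `sampleBy`, and dropping the sampled index takes the role of `subtract`. (One difference: unlike `sampleBy`, which samples each class with Bernoulli trials, the fraction here is exact.)

```python
import pandas as pd

def stratified_split_sketch(df, target, train_ratio, seed=10):
    # Sample train_ratio of each label class (pandas analogue of sampleBy),
    # then keep the remaining rows as the test set (analogue of subtract)
    train = df.groupby(target).sample(frac=train_ratio, random_state=seed)
    test = df.drop(train.index)
    return train, test

toy = pd.DataFrame({"target": [0] * 10 + [1] * 10, "feature": range(20)})
train, test = stratified_split_sketch(toy, "target", 0.5)
```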
[Function 1.4: Train the ML algorithms] Description: train two classification algorithms and then pick the one with the highest ROC_AUC score. Each classifier is trained with Spark MLlib's CrossValidator class... For the first classifier (random forest) I cross-validate 4 models, and for the second (gradient-boosted trees) 8 models. To speed this part up, I set the parallelism parameter of the CrossValidator class to 8 (explained here).
def spark_ml_classification(device_id, cross_validation_rounds, training_dataset, testing_dataset, target_column, features_column, evaluation_metric, disable_logging_value, logger_initialization):
    try:
        dictionary_best_metric={}
        dictionary_best_estimator={}
        list_of_classifiers=["RandomForest Classifier", "GradientBoost Classifier"]
        begin_time_train=time.time()
        for i in list_of_classifiers:
            pipeline_object, paramGrid, evaluator=machine_learning_estimator_initialization(i, target_column, features_column)
            start_time_classifier=time.time()
            # THE MOST TIME-CONSUMING PART OF MY EXECUTION
            classification_object = CrossValidator(estimator=pipeline_object, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=cross_validation_rounds, parallelism=8)
            classificationModel = classification_object.fit(training_dataset)
            end_time_classifier=time.time()-start_time_classifier
            print("Time passed to complete training for classifier {0} of asset id {1}: {2}".format(i, device_id, format_timespan(end_time_classifier)))
            predictions = classificationModel.transform(testing_dataset)
            evaluation_score_classifier=evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
            y_true = predictions.select([target_column]).collect()
            y_pred = predictions.select(['prediction']).collect()
            confusion_mat=confusion_matrix(y_true, y_pred)
            confusion_table=pd.DataFrame(confusion_mat,
                                         columns=['0','1'],
                                         index=['0','1'])
            accuracy_value=accuracy_score(y_true, y_pred)
            f1_value=f1_score(y_true, y_pred, zero_division=1)
            precision_value=precision_score(y_true, y_pred, zero_division=1)
            recall_value=recall_score(y_true, y_pred, zero_division=1)
            hamming_loss_value=hamming_loss(y_true, y_pred)
            zero_one_loss_value=zero_one_loss(y_true, y_pred, normalize=False)
            list_of_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall', 'hamming_loss', 'zero_one_loss']
            list_of_metric_values=[evaluation_score_classifier, accuracy_value, f1_value, precision_value, recall_value, hamming_loss_value, zero_one_loss_value]
            evaluation_metric_name_index=list_of_metrics.index(evaluation_metric) # with this index I can locate the value of the selected evaluation metric
            if evaluation_metric=='ROC_AUC':
                dictionary_best_metric.update({"{0}_best_score".format(i): evaluation_score_classifier}) # alternative: hamming_loss_value
            else:
                dictionary_best_metric.update({"{0}_best_score".format(i): list_of_metric_values[evaluation_metric_name_index]})
            dictionary_best_estimator.update({"{0}_best_estimator".format(i): classificationModel.bestModel})
        end_time_train=time.time()-begin_time_train
        print("Total time of training execution of two MLlib algorithms for the asset {0}: {1}".format(device_id, format_timespan(end_time_train)))
        maximum_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall']
        minimum_metrics=['hamming_loss', 'zero_one_loss']
        if evaluation_metric in maximum_metrics:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(max(dictionary_best_metric, key=dictionary_best_metric.get))
        else:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(min(dictionary_best_metric, key=dictionary_best_metric.get))
        classification_model_for_scoring=list(dictionary_best_estimator.values())[index_of_best_model_score]
    except Exception as e:
        print(e)
        raise
    return classification_model_for_scoring
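The model-selection logic at the end of that function (maximize metrics like ROC_AUC, minimize losses like hamming_loss) can be isolated into a small, dependency-free sketch; the score values below are made up for illustration:

```python
def pick_best(scores, metric):
    # Metrics where higher is better vs. lower is better, as in the function above
    maximum_metrics = ['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall']
    choose = max if metric in maximum_metrics else min
    return choose(scores, key=scores.get)

scores = {"RandomForest Classifier": 0.81, "GradientBoost Classifier": 0.86}
best = pick_best(scores, "ROC_AUC")  # higher ROC_AUC wins
```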
The four functions presented above are the ones applied 15 times (15 Spark DataFrames, one per unique IMEI), and what I care about is the time taken to execute those 15 iterations. As mentioned, I first implemented an approach based on the threading module. Here it is:
[Threading approach]
import threading

# Creating a list of threads
device_ids=spark_df.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
device_ids=device_ids[-15:] # 15 unique IMEIs
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)
date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)
thread_list = list()

# Loop over all devices, create a thread for each element, append it to thread_list and start it
for location, i in enumerate(device_ids, 1):
    try:
        thread = threading.Thread(target=training_models_operation_multiprocess, args=(i, location, devices_total_number, timestamp_snap, spark_df,))
        thread_list.append(thread)
        thread.start()
    except Exception as e:
        print(e)

# Wait for all threads to finish
for thread in thread_list:
    thread.join()
print("Finished executing all threads")
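For what it's worth, the manual create/start/join loop above can also be written with `concurrent.futures.ThreadPoolExecutor`, which joins all the threads when the `with` block exits; the `fn` and toy arguments here are placeholders, not my actual training function:

```python
from concurrent.futures import ThreadPoolExecutor

def run_concurrently(fn, device_ids, max_workers=15):
    # One task per device id; the with-block waits for every future to finish
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(fn, dev_id, loc) for loc, dev_id in enumerate(device_ids, 1)]
        return [f.result() for f in futures]

results = run_concurrently(lambda dev_id, loc: (dev_id, loc), ["imei_a", "imei_b"])
```

An advantage over bare `threading.Thread` is that exceptions raised inside a worker resurface in `f.result()` instead of being swallowed.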
[Benchmark of the threading approach] On a cluster with 32 CPU cores: ~16 minutes.
However, as mentioned, threading was not my final approach. In the end, after reading a bit about multiprocessing, I chose that instead.
[Multiprocessing approach]
import multiprocessing as mp
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import freeze_support
from itertools import cycle

if __name__ == '__main__':
    freeze_support()
    device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
    device_ids=device_ids[-15:] # 15 unique IMEIs
    location=range(1, len(device_ids)+1, 1)
    devices_total_number=len(device_ids)
    pool_list=list()
    with Pool(mp.cpu_count()) as pool:
        start_time = time.time()
        tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([datalake_spark_dataframe_downsampled]))]
        pool.starmap(training_models_operation_multiprocess,
                     iterable=tasks,
                     chunksize=1)
        pool.close()
        pool.join()
    end_time = time.time()
    secs_per_iteration = (end_time - start_time) / len(device_ids)
    print("Time per iteration: {0}".format(format_timespan(secs_per_iteration)))
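One detail worth flagging: `ThreadPool` from `multiprocessing.pool` is a pool of threads, not processes, despite the module name, so this variant is still subject to the GIL. The `starmap`/`cycle` argument-packing pattern itself can be checked in isolation with a toy function (all names here are illustrative stand-ins):

```python
from multiprocessing.pool import ThreadPool
from itertools import cycle

def work(device_id, location, total):
    # Stand-in for training_models_operation_multiprocess
    return "{0}: {1}/{2}".format(device_id, location, total)

device_ids = ["imei_a", "imei_b", "imei_c"]
# zip stops at the shortest iterable, so the cycled constants repeat as needed
tasks = list(zip(device_ids, range(1, len(device_ids) + 1), cycle([str(len(device_ids))])))
with ThreadPool(4) as pool:
    results = pool.starmap(work, tasks, chunksize=1)
```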
[Benchmark on a cluster with 32 CPU cores]
For each IMEI and its related Spark DataFrame, the average execution times of RandomForest and GradientBoostedTrees were ~5 minutes and ~6 minutes respectively.
Below you can see the time needed to execute each of the 4 sub-functions above (1.1, 1.2, 1.3, 1.4).
[My questions]
Having laid out all the facts and results of my experiment, it's time to state my questions.
How can a cluster of 32 CPU cores (2 workers with 16 cores each) produce such time-consuming results? Is it possible that a single pool execution takes 12 minutes to run cross-validation on a DataFrame of roughly 467 rows? (My Spark DataFrame has 7,000 rows in total; with 15 unique IMEI ids that gives about 467 rows per id.) To my mind, 32 CPU cores are a lot of compute power, yet they take ~15 minutes to execute this series of functions.
So I would like to understand why this happens:
Is it a Spark issue? That is, does it fail to distribute the 32 CPU cores properly across the execution of 4 simple functions? Combined with the multiprocessing module, I expected the 15 iterations to finish in much less time. Or maybe there is something about multiprocessing I don't yet understand, and this execution time is simply the best my setup can achieve.
I would really appreciate your input on this, because perhaps I am missing the point of multiprocessing. I can't get my head around the fact that I have 32 CPU cores and yet each pool execution takes around 1 minute for Spark to finish. Please set aside the fact that I use Spark to train on ~500-row DataFrames; in the near future these DataFrames will have 100,000 rows or more, so I am aware of the drawbacks of Spark over plain Python on such small row counts. What I mostly want to understand is the multiprocessing approach.