我有一个包含大量行(8000万-1亿行)的pyspark Dataframe 。我在上面推理一个模型,以获得每行的模型得分(概率)。如下面的代码:
import tensorflow as tf
from tensorflow import keras
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
def inference(df):
col_names = [c for c in df.columns]
@pandas_udf(returnType=T.DoubleType())
def predict_pandas_udf(*features):
X = pd.concat(features, axis=1)
X.columns = col_names
# Create a dataset
ds = tf.data.Dataset.from_tensor_slices(dict(X))
ds = ds.batch(1024)
# Get model
nn = keras.models.load_model(f'all_models/model_1')
prob = nn.predict(ds)
return pd.Series(prob[:, 0])
df = df. \
withColumn('probability', predict_pandas_udf(*(F.col(c) for c in df.columns )))
return df
df=spark.table('p13n_features_explore.ss_ltr_low_confidence_pairs_with_features_amalgam')
scores=inference(df)
scores.write.mode('overwrite').parquet(f'gs://p13n-storage2/data/features/smart_subs/lcp_pairs_amalgam')
这段代码的整个执行,也就是说,直到写入数据路径,大约需要22分钟。我想请十个模特儿来演。在其上推断十个模型并在最终得分中取平均值,如下面的代码。
import tensorflow as tf
from tensorflow import keras
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
def inference(df):
col_names = [c for c in df.columns]
@pandas_udf(returnType=T.DoubleType())
def predict_pandas_udf(*features):
X = pd.concat(features, axis=1)
X.columns = col_names
# Create a dataset
ds = tf.data.Dataset.from_tensor_slices(dict(X))
ds = ds.batch(1024)
# Get model
prob_scores_list=[]
for i in range(1,11):
nn = keras.models.load_model(f'all_models/model_{i}')
prob = nn.predict(ds)
prob_scores_list.append(prob[:, 0])
prob_scores = np.array(prob_scores_list)
return pd.Series(prob_scores.mean(axis=0))
df = df. \
withColumn('probability', predict_pandas_udf(*(F.col(c) for c in df.columns )))
return df
df=spark.table('p13n_features_explore.ss_ltr_low_confidence_pairs_with_features_amalgam')
scores=inference(df)
scores.write.mode('overwrite').parquet(f'gs://p13n-storage2/data/features/smart_subs/lcp_pairs_amalgam')
但整个行刑过程需要大量时间大约需要五个小时。如何在不到一个小时的时间内使其可执行?我们能去掉这个循环并以某种方式将这个实现向量化吗?
在pyspark中,有哪些方法可以使多模块集成在大量数据上的推理更快?
谢谢.
1条答案
按热度按时间nlejzf6q1#
这不必如此复杂。这里有一个具体的例子来说明如何做到这一点。
首先加载列表中的所有模型。广播名单。使用
value
访问列表广播变量的值。你可以像我下面做的那样将你的特征连接到一个数组中,然后逐个对样本进行推断。您可以通过在rdd上使用mapPartition函数来实现批处理语义,然后将结果转换回嵌套框架,如下所示。输出如下: