使用pyspark框架执行xgboost预测

mgdq6dx1  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(285)

我有一个包含预测输入数据集的数据集,它由一些通用列、一些特征列和一个标签列组成。我还有一个xgb.Booster类型的xgboost模型。如何使用xgb模型对输入数据集执行预测?这是我现在拥有的代码

from pyspark.sql.functions import col, DataFrame, udf, array
from pyspark.sql.types import FloatType

feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
all_cols = ["id1", "id2"] + feature_cols + label_col
pred_col_name = "pred"
prediction_path = "/input/path"

prediction_input = spark.read.parquet(prediction_path).select(all_cols)

for column in feature_cols:
    prediction_input = prediction_input.withColumn(column, col(column).cast("float"))

def predict_udf(*features):
    dmatrix = xgb.DMatrix(list(features))
    return float(model.predict(dmatrix)[0])

predict_udf_spark = udf(predict_udf, FloatType())

mask_labeled = prediction_input.filter(col(label_col) != 0)

if mask_labeled.count() > 0:
    prediction_labeled = mask_labeled.withColumn(pred_col_name, predict_udf_spark(*[mask_labeled[col] for col in feature_cols]))
    prediction_labeled.select(*output_cols).repartition(1000).write.csv(output_path, header=False, mode="append")

字符串
但这段代码给了我以下错误

jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'


如何解决此问题?

wkyowqbh

wkyowqbh1#

据我所知,您试图在spark上下文中使用xgboost libraryxgboost算法。请注意,xgboost库中有一个专用的spark实现,您的代码似乎没有使用它。(从您的predict_udf函数中,我了解到您正在尝试处理pyspark数据,执行预测,并将预测转换回pyspark)。您可以使用SparkXGBClassifier来适应您的模型并应用预测,从而使代码变得更加简单。
下面你可以找到一个完全可复制的例子,展示如何拟合SparkXGBClassifier并使用它的预测,其中代码是用pyspark==3.5.0xgboost=2.0.0编写的,请注意,我的示例将机器学习部分保持在绝对最低限度,并且没有展示如何训练机器学习模型。(例如,实际上没有模型评估,超参数调整,验证等)
请注意,由于您没有提供任何示例数据或有关依赖关系的信息,因此很难复制您的问题。使用了我的SparkXGBClassifier示例中的数据后,我从这个函数中得到一个错误:

def predict_udf(*features):
    dmatrix = xgb.DMatrix(list(features))
    return float(model.predict(dmatrix)[0])

字符串
这给了我一个错误:

ValueError: Please reshape the input data into 2-dimensional matrix.

如何在(可能)类似于您的数据上训练SparkXGBClassifier的可重现示例:

# required for Google Colab 
# !pip install pyspark

import random

from xgboost.spark import SparkXGBClassifier
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('spark_session').getOrCreate()

dataset_size = 100_000
labels = [0, 1, 2]

feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
pred_col_name = "pred"
all_cols = ["id1", "id2"] + feature_cols + [label_col]

data = [
    (i+1, i+1001, random.random(), random.random() * 1_000, random.random() * 1_000, random.choice(labels))
    for i in range(dataset_size)
]

prediction_input = spark.createDataFrame(data, ("id1", "id2", "feature1", "feature2", "feature3", 'label'))

# VectorAssembler required if you cannot train on GPU
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
prediction_input = vec_assembler.transform(prediction_input)

prediction_input.show(5)

# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# |id1| id2|           feature1|         feature2|          feature3|label|            features|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# |  1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396|    1|[0.50201434044538...|
# |  2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726|    0|[0.50708699977093...|
# |  3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244|    2|[0.40757143024714...|
# |  4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171|    0|[0.29699721338082...|
# |  5|1005|0.18151957704419186|307.2738259905281|416.45499440549196|    2|[0.18151957704419...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+

# create a xgboost pyspark regressor estimator and set device="cuda"
classifier = SparkXGBClassifier(
  features_col='features',
  label_col=label_col
)

# train and return the model
model = classifier.fit(prediction_input)

# predict on test data
predict_df = model.transform(prediction_input)
predict_df.show(5)

# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# |id1| id2|           feature1|         feature2|          feature3|label|            features|       rawPrediction|prediction|         probability|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# |  1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396|    1|[0.50201434044538...|[0.45034444332122...|       2.0|[0.33440056443214...|
# |  2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726|    0|[0.50708699977093...|[0.59553760290145...|       0.0|[0.35163819789886...|
# |  3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244|    2|[0.40757143024714...|[0.48832395672798...|       2.0|[0.33687391877174...|
# |  4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171|    0|[0.29699721338082...|[0.60149568319320...|       0.0|[0.35905247926712...|
# |  5|1005|0.18151957704419186|307.2738259905281|416.45499440549196|    2|[0.18151957704419...|[0.26893869042396...|       2.0|[0.27391168475151...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+

相关问题