我有一个包含预测输入数据集的数据集,它由一些通用列、一些特征列和一个标签列组成。我还有一个xgb.Booster类型的xgboost模型。如何使用xgb模型对输入数据集执行预测?这是我现在拥有的代码
from pyspark.sql.functions import col, DataFrame, udf, array
from pyspark.sql.types import FloatType
feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
all_cols = ["id1", "id2"] + feature_cols + label_col
pred_col_name = "pred"
prediction_path = "/input/path"
prediction_input = spark.read.parquet(prediction_path).select(all_cols)
for column in feature_cols:
prediction_input = prediction_input.withColumn(column, col(column).cast("float"))
def predict_udf(*features):
dmatrix = xgb.DMatrix(list(features))
return float(model.predict(dmatrix)[0])
predict_udf_spark = udf(predict_udf, FloatType())
mask_labeled = prediction_input.filter(col(label_col) != 0)
if mask_labeled.count() > 0:
prediction_labeled = mask_labeled.withColumn(pred_col_name, predict_udf_spark(*[mask_labeled[col] for col in feature_cols]))
prediction_labeled.select(*output_cols).repartition(1000).write.csv(output_path, header=False, mode="append")
字符串
但这段代码给了我以下错误
jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'
型
如何解决此问题?
1条答案
按热度按时间wkyowqbh1#
据我所知,您试图在
spark
上下文中使用xgboost library的xgboost
算法。请注意,xgboost
库中有一个专用的spark
实现,您的代码似乎没有使用它。(从您的predict_udf
函数中,我了解到您正在尝试处理pyspark
数据,执行预测,并将预测转换回pyspark
)。您可以使用SparkXGBClassifier
来适应您的模型并应用预测,从而使代码变得更加简单。下面你可以找到一个完全可复制的例子,展示如何拟合
SparkXGBClassifier
并使用它的预测,其中代码是用pyspark==3.5.0
,xgboost=2.0.0
编写的,请注意,我的示例将机器学习部分保持在绝对最低限度,并且没有展示如何训练机器学习模型。(例如,实际上没有模型评估,超参数调整,验证等)请注意,由于您没有提供任何示例数据或有关依赖关系的信息,因此很难复制您的问题。使用了我的
SparkXGBClassifier
示例中的数据后,我从这个函数中得到一个错误:字符串
这给了我一个错误:
型
如何在(可能)类似于您的数据上训练
SparkXGBClassifier
的可重现示例:型