我有一个spark程序,通过ml模型(两个随机林的集合)处理来自kafka的数据流。我的总体rf每几批更新一次,添加(在一个林中)和替换(在另一个林中)决策树。
关键是,分类的处理时间相当大,因为任务反序列化的时间(我猜是以不合理的方式)取决于整个模型中的树的数量。
更具体地说:我使用的是sparkv1.6.1和mllib模型。我“手动”更新第一个rf,方法是训练randomforestmodel的一个示例,然后将其树添加到更新林的数组中。此外,由于后者没有被赋予返回一个类的概率的方法,因此我将其重新编写为
var predictProba = udf((v: Vector) => {
// map data to prediction
val tree_predictions = forest.trees.map(tree => tree.predict(v))
// out
scalarProduct(tree_predictions, tree_weights) // <-- this is a basic scalar product
})
def classify(df: DataFrame): DataFrame = {
if (counter > 0) {
// output: create new columns with predictions
df.withColumn("de_pred_proba", predictProba(df("features")))
}
else {
println(":: rf :: model is not tuned")
// output: if forest is empty returns column of default value -1.0
df.withColumn("de_pred_proba", lit(-1.0))
}
}
对于300/400树的分类持续30秒,任务反序列化时间中值为0.1秒(在本地测试中),这真的正常吗?如何加速?
先谢谢你。
暂无答案!
目前还没有任何答案,快来回答吧!