我有一个用例,在这个用例中,我必须在同一个Spark流(从kafka获取)上应用多个已经训练过的模型(例如m1,m2,…mn)。
使用隔离林算法对模型进行训练:https://github.com/titicaca/spark-iforest
我在这里发现了和我的案子相似的东西https://www.youtube.com/watch?v=ehrhqpcdldi但不幸的是,我不知道genesys公司(前altocloud)是否将这个api(streampipeline,异构管道)开源。
我用上面的模式代码处理了这个问题,但我不知道它有多理想。
//read the stream
val kafkaStreamDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", "topic")
.load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the same stream, otherwise blocked??
myModels.par.foreach(lm => {
//load the model
val model = PipelineModel.load(lm)
kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
//apply model
val pdf = model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS value").write
.format("json")
.save("anom/" + lm + System.currentTimeMillis())
}).start().awaitTermination()
})
问题:1。因此,我想知道是否有任何sparkapi来处理这样的用例?
如果是,我在哪里可以找到它?
如果没有,我如何才能最佳地实现这一点?
如有任何意见和建议,我们将不胜感激。
1条答案
按热度按时间oug3syen1#
如果可以这样做。。。。但是如果你的处理时间超过了接收时间呢。消息将堆积起来,这将导致流接收速度减慢。iforest使用树结构来建模数据。完成这个算法需要一些时间。
我更喜欢像hdfs分区存储。。。并以固定的时间间隔分批地在其上施加ml。这样您就可以毫无延迟地接收消息并有效地处理它们。