如何将多个ml管道(模型)应用于同一个spark流

mepcadol  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(379)

我有一个用例,在这个用例中,我必须在同一个Spark流(从kafka获取)上应用多个已经训练过的模型(例如m1,m2,…mn)。
使用隔离林算法对模型进行训练:https://github.com/titicaca/spark-iforest
我在这里发现了和我的案子相似的东西https://www.youtube.com/watch?v=ehrhqpcdldi但不幸的是,我不知道genesys公司(前altocloud)是否将这个api(streampipeline,异构管道)开源。
我用上面的模式代码处理了这个问题,但我不知道它有多理想。

//read the stream
val kafkaStreamDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "topic")
      .load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the same stream, otherwise blocked??
 myModels.par.foreach(lm => {

     //load the model     
     val model = PipelineModel.load(lm)

      kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
        //apply model
        val pdf = model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS value").write
          .format("json")
          .save("anom/" + lm +  System.currentTimeMillis())
      }).start().awaitTermination()
    })

问题:1。因此,我想知道是否有任何sparkapi来处理这样的用例?
如果是,我在哪里可以找到它?
如果没有,我如何才能最佳地实现这一点?
如有任何意见和建议,我们将不胜感激。

oug3syen

oug3syen1#

如果可以这样做。。。。但是如果你的处理时间超过了接收时间呢。消息将堆积起来,这将导致流接收速度减慢。iforest使用树结构来建模数据。完成这个算法需要一些时间。
我更喜欢像hdfs分区存储。。。并以固定的时间间隔分批地在其上施加ml。这样您就可以毫无延迟地接收消息并有效地处理它们。

相关问题