flink mllib的scala-outofboundsexception

7hiiyaii  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(569)

我正在使用这里提供的movielens数据集为电影制作推荐系统:http://grouplens.org/datasets/movielens/
为了计算这个推荐系统,我使用了scala中flink的ml库,特别是als算法( org.apache.flink.ml.recommendation.ALS ).
我先把这部电影的收视率Map成 DataSet[(Int, Int, Double)] 然后创建一个 trainingSet 和一个 testSet (见下面的代码)。
我的问题是当我使用 ALS.fit 功能与整个数据集(所有的评级),但如果我只是删除一个评级,拟合功能不再工作,我不明白为什么。
你有什么想法吗?:)
使用的代码:
等级.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

预处理.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}

处理.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet : DataSet[(Int, Int, Double)] =
    ratings
     .map(r => (r.userId, r.movieId, r.rating))
     .sortPartition(0, Order.ASCENDING)
     .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

“但如果我只删除一个评级”

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

错误:
2015年6月19日15:00:24 cogroup(cogroup网址:org.apache.flink.ml.recommendation.als$.updatefactors(als。scala:570))(4/4)切换到失败
java.lang.arrayindexoutofboundsexception:5
在org.apache.flink.ml.recommendation.als$blockrating.apply(als。scala:358)
在org.apache.flink.ml.recommendation.als$$anon$111.cogroup(als。scala:635)
在org.apache.flink.runtime.operators.cogroupdriver.run(cogroupdriver。java:152)
...

alen0pnh

alen0pnh1#

问题在于 first 运算符与 setTemporaryPath Flink参数 ALS 实施。为了理解这个问题,让我快速解释一下阻塞als算法是如何工作的。
交替最小二乘法的分块实现首先将给定的评分矩阵按用户和项目分块。对于这些块,将计算路由信息。此路由信息分别表示哪个用户/项目块从哪个项目/用户块接收哪个输入。然后,开始als迭代。
由于flink的底层执行引擎是一个并行流数据流引擎,它试图以流水线方式执行尽可能多的数据流部分。这要求管道的所有操作员同时在线。这有一个优点,Flink避免实现中间结果,这可能是令人望而却步的大。缺点是可用内存必须在所有运行的操作符之间共享。在als的情况下,个体的大小 DataSet 元素(例如用户/项目块)相当大,这是不需要的。
为了解决这个问题,如果设置了 temporaryPath . 路径定义了中间结果的存储位置。因此,如果您定义了一个临时路径 ALS 首先计算用户块的路由信息并将其写入磁盘,然后计算项目块的路由信息并将其写入磁盘,最后但并非最不重要的是,它开始als迭代,从临时路径读取路由信息。
用户和项目块的路由信息的计算都取决于给定的分级数据集。在您的情况下,当您计算用户路由信息时,它将首先读取分级数据集并应用 first 接线员在上面。这个 first 操作员返回 n -来自基础数据集的任意元素。现在的问题是flink没有存储这个结果 first 用于计算项目路由信息的操作。相反,当您开始计算项目路由信息时,flink将从其源重新执行数据流。这意味着它从磁盘读取分级数据集并应用 first 接线员又来了。这将给你在许多情况下一套不同的评级相比,结果的第一 first 操作。因此,生成的路由信息是不一致的 ALS 失败。
你可以通过将结果具体化来规避这个问题 first 运算符,并将此结果用作 ALS 算法。对象 FlinkMLTools 包含一个方法 persist 这需要一个 DataSet ,将其写入给定路径,然后返回一个新的 DataSet 上面写着 DataSet . 这允许您分解生成的数据流图。

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

或者,您可以尝试离开 temporaryPath 未设置。然后所有步骤(路由信息计算和als迭代)都以流水线方式执行。这意味着,用户和项目路由信息计算都使用由 first 接线员。
flink社区目前正在努力将操作符的中间结果保存在内存中。这将允许锁定 first 所以它不会被计算两次,因此,由于它的不确定性,不会给出不同的结果。

相关问题