机器学习算法的flink hbase输入

uhry853o  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(439)

我想使用flink hbase插件读取数据,然后作为flink机器学习算法的输入,分别是svm和mlr。现在,我首先将提取的数据写入一个临时文件,然后通过libsvm方法读取它,但我想应该有更复杂的方法。
你有一个代码片段或一个如何做到这一点的想法?

sqyvllje

sqyvllje1#

不需要将数据写入磁盘,然后用 MLUtils.readLibSVM . 原因如下。 MLUtils.readLibSVM 需要一个文本文件,其中每一行都是稀疏特征向量及其相关标签。它使用以下格式表示标签特征向量对:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>

哪里 <feature> 是后续项目的索引 value 在特征向量中。 MLUtils.readLibSVM 可以读取此格式的文件,并在 LabeledVector 示例。因此,您将获得 DataSet[LabeledVector] 在读取了libsvm文件之后。这正是你需要的输入格式 SVM 以及 MultipleLinearRegression 预测器。
但是,根据从hbase获得的数据格式,首先必须将数据转换为 libSVM 格式。否则, MLUtils.readLibSVM 无法读取写入的文件。如果您转换数据,那么您也可以直接将数据转换为 DataSet[LabeledVector] 并将其用作flink的ml算法的输入。这样可以避免不必要的磁盘周期。
如果您从hbase a获得 DataSet[String] 每根弦都有 libSVM 格式(请参阅上面的规范),然后您可以应用 map hbase上的操作 DataSet 具有以下Map功能。

val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if(commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2, 
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM index is 1-based, but we expect it to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }

// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))

val labeledVectors: DataSet[LabeledVector] = 
  labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
  var dimension = 0

  override def open(configuration: Configuration): Unit = {
    dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
  }

  override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
  }
}}.withBroadcastSet(dimensionDS, DIMENSION)

这将把libsvm格式的数据转换成 LabeledVectors .

相关问题