apache-flink-预测处理

2vuwiymt  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(396)

我目前正在使用ApacheFlink的svm类来预测一些文本数据。
该类提供了一个predict函数,该函数将数据集[vector]作为输入,并将数据集[prediction]作为结果提供给我。到现在为止,一直都还不错。
我的问题是,我没有上下文哪个预测属于哪个文本,并且我不能在predict()函数中插入文本以使其随后生效。
代码:

val tweets: DataSet[(SparseVector, String)] =
        source.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
                .map(tweet => (featureVectorService.transform(tweet._2))

    model.predict(tweets).print

result example:
(SparseVector((462,8.73165920153676), (10844,8.508515650222549), (15656,2.931052542245018)),-1.0)

有没有办法把其他数据放在预测的旁边,以便把所有数据都放在一起?因为没有上下文的预测对我没有帮助。
或者有一种方法可以预测一个向量而不是一个数据集,我可以调用上面map函数中的函数。

webghufk

webghufk1#

这个 SVM 预测器期望输入一个子类型的 Vector . 因此,有两种方法可以解决这个问题:
创建子类型 Vector 它包含作为标记的tweet文本。然后它将通过预测器循环。这种方法的优点是不需要额外的操作。但是,需要定义新的类和实用程序来用标记表示不同的向量类型:

val env = ExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements("foobar", "barfo", "test")

val vectorizedInput = input.map(word => {
  val value = word.chars().sum()
  new DenseVectorWithTag(Array(value), word)
})

val svm = SVM().setBlocks(env.getParallelism)

val weights = env.fromElements(DenseVector(1.0))

svm.weightsOption = Option(weights) // skipping the training here

val predictionResult: DataSet[(DenseVectorWithTag, Double)] = svm.predict(vectorizedInput)

class DenseVectorWithTag(override val data: Array[Double], tag: String)
  extends DenseVector(data) {
  override def toString: String = "(" + super.toString + ", " + tag + ")"
}

加入预测 DataSet 与输入 DataSet 关于矩阵的向量化表示 tweets . 这种方法的优点是我们不需要引入新的类。我们为此付出的代价是额外的联接操作,这可能会很昂贵:

val input = env.fromElements("foobar", "barfo", "test")

val vectorizedInput = input.map(word => {
  val value = word.chars().sum()
  (DenseVector(value), word)
})

val svm = SVM().setBlocks(env.getParallelism)

val weights = env.fromElements(DenseVector(1.0))

svm.weightsOption = Option(weights) // skipping the training here

val predictionResult = svm.predict(vectorizedInput.map(a => a._1))
val inputWithPrediction: DataSet[(String, Double)] = vectorizedInput
  .join(predictionResult)
  .where(0)
  .equalTo(0)
  .apply((t, p) => (t._2, p._2))

相关问题