Debugging a custom pipeline transformer in Flink

Posted by q9yhzks0 on 2021-06-21 in Flink

I am trying to implement a custom transformer in Flink, but when I run it, its fit operation never seems to be called. Here is what I have so far:

class InfoGainTransformer extends Transformer[InfoGainTransformer] {

  import InfoGainTransformer._

  private[this] var counts: Option[collection.immutable.Vector[Map[Key, Double]]] = None

  // here setters for params, as Flink does

}

object InfoGainTransformer {

  // ====================================== Parameters =============================================
  // ...

  // ==================================== Factory methods ==========================================
  // ...

  // ========================================== Operations =========================================

  implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
    override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
      val counts = collection.immutable.Vector[Map[Key, Double]]()
      input.map {
        v =>
          v.vector.map {
            case (i, value) =>
              println("INSIDE!!!")
              val key = Key(value, v.label)
              val cval = counts(i).getOrElse(key, .0)
              counts(i) + (key -> cval)
          }
      }
    }
  }

  implicit def fitVectorInfoGain[T <: Vector] = new FitOperation[InfoGainTransformer, T] {
    override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[T]): Unit = {
      input
    }
  }

  implicit def transformLabeledVectorsInfoGain = {
    new TransformDataSetOperation[InfoGainTransformer, LabeledVector, LabeledVector] {
      override def transformDataSet(
                                     instance: InfoGainTransformer,
                                     transformParameters: ParameterMap,
                                     input: DataSet[LabeledVector]): DataSet[LabeledVector] = input
    }
  }

  implicit def transformVectorsInfoGain[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] = {
    new TransformDataSetOperation[InfoGainTransformer, T, T] {
      override def transformDataSet(instance: InfoGainTransformer, transformParameters: ParameterMap, input: DataSet[T]): DataSet[T] = input
    }
  }
}
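Separately from the breakpoint problem, the nested map in fitLabeledVectorInfoGain would fail if it ever ran: counts is an empty immutable Vector, so counts(i) throws an IndexOutOfBoundsException, and counts(i) + (key -> cval) builds a new Map that is discarded without cval ever being incremented. Below is a minimal plain-Scala sketch (no Flink involved) of the per-feature counting the code appears to aim for. LabeledPoint is a hypothetical stand-in for FlinkML's LabeledVector, and Key(value, label) is assumed to match the question's elided Key definition.

```scala
// Hypothetical stand-ins so the sketch runs without Flink:
// Key mirrors the question's Key(value, label); LabeledPoint replaces LabeledVector.
final case class Key(value: Double, label: Double)
final case class LabeledPoint(label: Double, features: Vector[Double])

object InfoGainCounts {
  // One map per feature index: (feature value, label) -> number of occurrences.
  type Counts = Vector[Map[Key, Double]]

  // Adds one labeled point to the counts. The vector is pre-sized (see count),
  // so acc(i) is in bounds, and each updated map is written back with `updated`.
  // Assumes every point has exactly as many features as the counts vector.
  def add(counts: Counts, p: LabeledPoint): Counts =
    p.features.zipWithIndex.foldLeft(counts) { case (acc, (value, i)) =>
      val key = Key(value, p.label)
      val current = acc(i).getOrElse(key, 0.0)
      acc.updated(i, acc(i) + (key -> (current + 1.0)))
    }

  // Folds all points into a vector holding one empty map per feature to start.
  def count(points: Seq[LabeledPoint], numFeatures: Int): Counts =
    points.foldLeft(Vector.fill(numFeatures)(Map.empty[Key, Double]))(add)

  def main(args: Array[String]): Unit = {
    val data = Seq(
      LabeledPoint(1.0, Vector(0.0, 1.0)),
      LabeledPoint(1.0, Vector(0.0, 0.0)),
      LabeledPoint(0.0, Vector(1.0, 1.0))
    )
    val counts = count(data, numFeatures = 2)
    counts.zipWithIndex.foreach { case (m, i) => println(s"feature $i: $m") }
  }
}
```

In a real Flink job this bookkeeping would normally run inside distributed operators and be merged across partitions; the sequential fold here only illustrates the per-record update.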

I then tried it in two ways. First, as part of a pipeline:

val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()
val gain = InfoGainTransformer().setK(2)

// Construct the pipeline
val pipeline = scaler
  .chainTransformer(polyFeatures)
  .chainTransformer(gain)
  .chainPredictor(mlr)

val r = pipeline.predict(dataSet map (_.vector))
r.print()

Second, with only my transformer:

pipeline.fit(dataSet)

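In both cases, when I set a breakpoint in fitLabeledVectorInfoGain, for example on input.map, the debugger stops there. But if I also set a breakpoint inside the nested map, for example below println("INSIDE!!!"), it never stops there.
Does anyone know how I can debug this custom transformer?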

Answer by d7v8vwbk:

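It seems to work now. I think the problem was that I had not implemented the FitOperation correctly: nothing was being saved in the instance state. Now I do the following: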

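// Requires the field in InfoGainTransformer to hold the fitted DataSet and to be
// accessible from the companion object (i.e. not private[this]), e.g.:
//   var counts: Option[DataSet[Map[Key, Double]]] = None
implicit def fitLabeledVectorInfoGain = new FitOperation[InfoGainTransformer, LabeledVector] {
    override def fit(instance: InfoGainTransformer, fitParameters: ParameterMap, input: DataSet[LabeledVector]): Unit = {
      // One Map per input vector, counting how often each (value, label) pair occurs;
      // the feature index is not part of the key.
      val r = input.map {
        v =>
          v.vector.foldLeft(Map.empty[Key, Double]) {
            case (m, (_, value)) =>
              println("INSIDE fit!!!")
              val key = Key(value, v.label)
              val cval = m.getOrElse(key, .0) + 1.0
              m + (key -> cval)
          }
      }
      // Keep the resulting DataSet in the instance so the transform step can use it.
      instance.counts = Some(r)
    }
  }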

Now the debugger stops at all breakpoints, and the TransformOperation is called as well.
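The answer does not say why this helps. A likely explanation (an interpretation, not something stated in the thread): Flink's DataSet API builds the job plan lazily. Calling input.map inside fit only adds an operator to the plan, which is why a breakpoint on that line is hit; the lambda itself runs only when the job executes, and only operators that lead to a data sink (such as print()) are part of the executed plan. A map whose result is discarded is never run. Storing the DataSet in the instance helps only if the transform operation later consumes it, for example via withBroadcastSet. The toy model below is not Flink code; LazyData and LazinessDemo are hypothetical names used to illustrate the deferred-execution effect in plain Scala.

```scala
// Toy model of deferred execution, loosely analogous to Flink's DataSet API
// (NOT Flink code): map only records a step, and the user function runs
// when the sink-like run() is called on a value derived from it.
final class LazyData[A] private (compute: () => Seq[A]) {
  def map[B](f: A => B): LazyData[B] = new LazyData(() => compute().map(f))
  def run(): Seq[A] = compute()
}

object LazyData {
  def from[A](xs: Seq[A]): LazyData[A] = new LazyData(() => xs)
}

object LazinessDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val input = LazyData.from(Seq(1, 2, 3))

    // Like the question's fit: the result of map is discarded, so the lambda
    // (and any breakpoint inside it) never runs. calls is still 0 here.
    input.map { x => calls += 1; x * 2 }
    println(s"calls after discarded map: $calls")

    // Like the answer's fit: the result is kept and later consumed by a "sink",
    // so the lambda runs once per element. calls is 3 after run().
    val stored = input.map { x => calls += 1; x * 2 }
    println(stored.run())
    println(s"calls after run: $calls")
  }
}
```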
