Spark流:如何将输出反馈到输入

n6lpvg4x  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(289)


是否可以实现上述场景?
系统从一个键值对开始,并将发现新的键值对。首先,键值对的数量将增加,然后在迭代过程中收缩。
更新:为了支持迭代,我不得不转向flink流媒体。但我会和Kafka一起尝试!

krcsximq

krcsximq1#

使用apache flink,可以通过 iterate api调用。这个 iterate 方法需要一个阶跃函数,给定输入流的参数,该函数产生反馈流和输出流。前一个流被反馈给step函数,后一个流被发送给下游操作符。
一个简单的例子如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements(1).map(x => (x, math.random))

val output = input.iterate {
  inputStream =>
    val iterationBody = inputStream.flatMap {
      randomWalk =>
        val (step, position) = randomWalk
        val direction = 2 * (math.random - 0.5)
        val bifurcate = math.random >= 0.75

        Seq(
          Some((step + 1, position + direction)),
          if (bifurcate) Some((step + 1, position - direction)) else None).flatten
    }

    val feedback = iterationBody.filter {
      randomWalk => math.abs(randomWalk._2) < 1.0
    }

    val output = iterationBody.filter {
      randomWalk => math.abs(randomWalk._2) >= 1.0
    }

    (feedback, output)
}

output.print()

// execute program
env.execute("Random Walk with Bifurcation")

在这里,我们计算一个随机游动,在这里,我们随机分割我们的游动,向相反的方向前进。当一个随机游动的绝对位置值大于或等于 1.0 .

相关问题