是否可以实现上述场景?系统从一个键值对开始,并将发现新的键值对。首先,键值对的数量将增加,然后在迭代过程中收缩。更新:为了支持迭代,我不得不转向flink流媒体。但我会和Kafka一起尝试!
krcsximq1#
使用apache flink,可以通过 iterate api调用。这个 iterate 方法需要一个阶跃函数,给定输入流的参数,该函数产生反馈流和输出流。前一个流被反馈给step函数,后一个流被发送给下游操作符。一个简单的例子如下所示:
iterate
val env = StreamExecutionEnvironment.getExecutionEnvironment val input = env.fromElements(1).map(x => (x, math.random)) val output = input.iterate { inputStream => val iterationBody = inputStream.flatMap { randomWalk => val (step, position) = randomWalk val direction = 2 * (math.random - 0.5) val bifurcate = math.random >= 0.75 Seq( Some((step + 1, position + direction)), if (bifurcate) Some((step + 1, position - direction)) else None).flatten } val feedback = iterationBody.filter { randomWalk => math.abs(randomWalk._2) < 1.0 } val output = iterationBody.filter { randomWalk => math.abs(randomWalk._2) >= 1.0 } (feedback, output) } output.print() // execute program env.execute("Random Walk with Bifurcation")
在这里,我们计算一个随机游动,在这里,我们随机分割我们的游动,向相反的方向前进。当一个随机游动的绝对位置值大于或等于 1.0 .
1.0
1条答案
按热度按时间krcsximq1#
使用apache flink,可以通过
iterate
api调用。这个iterate
方法需要一个阶跃函数,给定输入流的参数,该函数产生反馈流和输出流。前一个流被反馈给step函数,后一个流被发送给下游操作符。一个简单的例子如下所示:
在这里,我们计算一个随机游动,在这里,我们随机分割我们的游动,向相反的方向前进。当一个随机游动的绝对位置值大于或等于
1.0
.