我正在开发一个新的flink流应用程序,无法通过调试逐步完成代码中更关键的部分。
这是我的主程序(删除了一些部分):
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setStateBackend(new RocksDBStateBackend(statePath))
env.addSource(new KafkaConsumer().getKafkaKeyedConsumer(inTopic, inBrokers))
.map {
tup => (tup._2.get("payload").get("itemId").asText, tup._2.get("payload").get("version").asLong, tup._2, tup._1)}
.keyBy(0)
.flatMap({
new FilterPastVersions()
})
.print()
env.execute("My Program")
这里是 FilterPastVersions
班级:
class FilterPastVersions extends RichFlatMapFunction[(String, Long, ObjectNode, String), (String, ObjectNode)] {
private var version: ValueState[Long] = _
override def flatMap(input: (String, Long, ObjectNode, String), out: Collector[(String, ObjectNode)]): Unit = {
// access the state value
val tmpCurrentVersion = version.value()
// If it hasn't been used before, it will be null
if (tmpCurrentVersion == null || input._2 > tmpCurrentVersion){
version.update(input._2)
out.collect((input._4, input._3))
}
}
override def open(parameters: Configuration): Unit = {
val versionDesc = new ValueStateDescriptor[Long]("version", createTypeInformation[Long])
versionDesc.setQueryable("version-state")
version = getRuntimeContext.getState(versionDesc)
}
}
如果我在main函数的每一行都设置了一个断点,那么执行就会在每个断点处停止。但是,直到之后才真正处理数据 env.execute
,因此这些断点不显示任何执行。
如果我在 flatmap
的功能 FilterPastVersions
,这些断点永远不会命中。这个程序确实成功地打印了来自Kafka的消息。
我是错过了什么,还是这是Flink的局限?我正在使用intellij,并尝试了远程调试器,以及只需单击应用程序配置的debug按钮。
1条答案
按热度按时间wwtsj6pe1#
这是因为flink程序执行得很慢。因此,调试会话将只执行flink管道的声明部分。流处理本身是在
execute()
方法。检查文档(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html)
所有flink程序都是延迟执行的:当程序的main方法被执行时,数据加载和转换不会直接发生。相反,每个操作都会被创建并添加到程序的计划中。这些操作实际上是在执行环境中的execute()调用显式触发执行时执行的。程序是在本地执行还是在集群上执行取决于执行环境的类型
惰性评估允许您构建复杂的程序,flink将其作为一个整体规划的单元执行。