flink中有用的调试

oxf4rvwz  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(413)

我正在开发一个新的flink流应用程序,无法通过调试逐步完成代码中更关键的部分。
这是我的主程序(删除了一些部分):

def main(args: Array[String]) {

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setStateBackend(new RocksDBStateBackend(statePath))

env.addSource(new KafkaConsumer().getKafkaKeyedConsumer(inTopic, inBrokers))
    .map {
      tup => (tup._2.get("payload").get("itemId").asText, tup._2.get("payload").get("version").asLong, tup._2, tup._1)}
    .keyBy(0)
    .flatMap({
      new FilterPastVersions()
    })
      .print()

env.execute("My Program")

这里是 FilterPastVersions 班级:

class FilterPastVersions extends RichFlatMapFunction[(String, Long, ObjectNode, String), (String, ObjectNode)] {

  private var version: ValueState[Long] = _

  override def flatMap(input: (String, Long, ObjectNode, String), out: Collector[(String, ObjectNode)]): Unit = {

    // access the state value
    val tmpCurrentVersion = version.value()

    // If it hasn't been used before, it will be null
    if (tmpCurrentVersion == null || input._2 > tmpCurrentVersion){
      version.update(input._2)
      out.collect((input._4, input._3))
    }
  }

  override def open(parameters: Configuration): Unit = {
    val versionDesc = new ValueStateDescriptor[Long]("version", createTypeInformation[Long])
    versionDesc.setQueryable("version-state")

    version = getRuntimeContext.getState(versionDesc)
  }
}

如果我在main函数的每一行都设置了一个断点,那么执行就会在每个断点处停止。但是,直到之后才真正处理数据 env.execute ,因此这些断点不显示任何执行。
如果我在 flatmap 的功能 FilterPastVersions ,这些断点永远不会命中。这个程序确实成功地打印了来自Kafka的消息。
我是错过了什么,还是这是Flink的局限?我正在使用intellij,并尝试了远程调试器,以及只需单击应用程序配置的debug按钮。

wwtsj6pe

wwtsj6pe1#

这是因为flink程序执行得很慢。因此,调试会话将只执行flink管道的声明部分。流处理本身是在 execute() 方法。
检查文档(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html)
所有flink程序都是延迟执行的:当程序的main方法被执行时,数据加载和转换不会直接发生。相反,每个操作都会被创建并添加到程序的计划中。这些操作实际上是在执行环境中的execute()调用显式触发执行时执行的。程序是在本地执行还是在集群上执行取决于执行环境的类型
惰性评估允许您构建复杂的程序,flink将其作为一个整体规划的单元执行。

相关问题