flink:流拓扑中没有定义操作符无法执行

sycxhyv7  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(527)

我正试图建立一个非常基本的flink工作。尝试运行时,出现以下错误:

Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)

错误由以下代码引起:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")

streamExecutionEnvironment.execute("Test Job")

当我添加一个 print() 调用流的结尾:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()

streamExecutionEnvironment.execute("Test Job")

我不明白为什么 print() 解决了这个问题。在引入接收器之前,流拓扑不会处理其任何操作符吗?是 print() 在这里当Flume?任何帮助都将不胜感激。谢谢。

yftpprvb

yftpprvb1#

在程序设计语言理论中,延迟求值(lazy evaluation)或按需调用(call by needed)是一种求值策略,它将表达式的求值延迟到需要它的值时进行,并避免了重复求值。懒惰评价的对立面是渴望评价,有时称为严格评价。延迟评估的好处包括:
将控制流(结构)定义为抽象而不是原语的能力。
定义潜在无限数据结构的能力。这允许更直接地实现某些算法。
通过避免不必要的计算和在计算复合表达式时避免错误条件,性能得到了提高。
延迟计算可以减少内存占用,因为值是在需要时创建的。然而,延迟求值很难与诸如异常处理和输入/输出之类的必需特性结合起来,因为操作的顺序变得不确定。
通常,flink将操作分为两类:转换操作和sink操作。正如您所猜测的,flink转换是懒惰的,这意味着在调用sink操作之前不会执行它们。
flink程序是在分布式集合上实现转换的常规程序(例如,过滤、Map、更新状态、连接、分组、定义窗口、聚合)。集合最初是从源创建的(例如,通过从文件、Kafka主题或本地内存集合中读取)。结果通过接收器返回,例如,接收器可以将数据写入(分布式)文件或标准输出(例如,命令行终端)。

相关问题