我正试图建立一个非常基本的flink工作。尝试运行时,出现以下错误:
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)
错误由以下代码引起:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
streamExecutionEnvironment.execute("Test Job")
当我添加一个 print()
调用流的结尾:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()
streamExecutionEnvironment.execute("Test Job")
我不明白为什么 print()
解决了这个问题。在引入接收器之前,流拓扑不会处理其任何操作符吗?是 print()
在这里当Flume?任何帮助都将不胜感激。谢谢。
1条答案
按热度按时间yftpprvb1#
在程序设计语言理论中,延迟求值(lazy evaluation)或按需调用(call by needed)是一种求值策略,它将表达式的求值延迟到需要它的值时进行,并避免了重复求值。懒惰评价的对立面是渴望评价,有时称为严格评价。延迟评估的好处包括:
将控制流(结构)定义为抽象而不是原语的能力。
定义潜在无限数据结构的能力。这允许更直接地实现某些算法。
通过避免不必要的计算和在计算复合表达式时避免错误条件,性能得到了提高。
延迟计算可以减少内存占用,因为值是在需要时创建的。然而,延迟求值很难与诸如异常处理和输入/输出之类的必需特性结合起来,因为操作的顺序变得不确定。
通常,flink将操作分为两类:转换操作和sink操作。正如您所猜测的,flink转换是懒惰的,这意味着在调用sink操作之前不会执行它们。
flink程序是在分布式集合上实现转换的常规程序(例如,过滤、Map、更新状态、连接、分组、定义窗口、聚合)。集合最初是从源创建的(例如,通过从文件、Kafka主题或本地内存集合中读取)。结果通过接收器返回,例如,接收器可以将数据写入(分布式)文件或标准输出(例如,命令行终端)。