如何在scala中用flink应用一个简单的过滤器

vlju58qv  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(377)

我用的是老版本的flink。我升级到了1.2.0,并且我在过滤器方面遇到了一些问题。
我有一个日志数据流,它工作得很好:

val logs: DataStream[Log] = env.addSource(new LogSource(
      data, delay, factor))

 // DISPLAY TUPLE IN CONSOLE
 logs.print()

 // EXECUTE SCRIPT
 env.execute("stream")

我当然已经阅读了文件,其中显示:

dataStream.filter { _ != 0 }

我试过很多类似的方法:

val cleanLogs = logs.filter { _.isComplete }

但我有以下错误:
类型不匹配,应为:filterfunction[log],actual:(any)=>an
所以我看不到文档和这个错误之间的联系。有什么帮助吗?举例说明?
谢谢

eqqqjvef

eqqqjvef1#

问题首先是进口错误 StreamExecutionEnvironment 导致这个问题的基本函数是 filter .
当我使用旧版本的flink时 LocalExecutionEnvironment flink 1.x中不再提供的类。
取而代之的是: StreamExecutionEnvironment.createLocalEnvironment(1)

相关问题