在windowedstream中查找计数-flink

ycl3bljg  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(241)

我对溪流的世界还很陌生,我在第一次尝试时就面临着一些问题。
更具体地说,我正在尝试使用flink在滑动窗口中实现count和groupby功能。
我是以正常的方式做的 DateStream 但我不能让它在一个 WindowedStream .
你对我怎么做有什么建议吗?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
          record.get
        )
      }

val result: DataStream[((String, Response), Int)] = parsedStream
      .map((_, 1))
      .keyBy(_._1._1)
      .sum(1)

// The output of result is 
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))

//the following part doesn't compile

      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          def apply(
                   key: Tuple,
                   window: TimeWindow,
                   values: Iterable[(String, Response)],
                   out: Collector[(String, Int)]
                   ) {}
        }
      )

编译错误:

overloaded method value apply with alternatives:
  [R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
  [R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
      .apply(
l2osamch

l2osamch1#

我已经试过你的代码,发现了错误,似乎你有一个错误时,宣布你的类型 WindowFunction .
文档中说 WindowFunctionWindowFunction[IN, OUT, KEY, W <: Window] . 现在,如果你看看你的代码 IN 是正在计算windows的数据流的类型。流的类型是 ((String, Response), Int) 而不是代码中声明的那样 (String, Int) .
如果要将未编译的部分更改为:

.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[((String, Response), Int)], out: Collector[(String, Response)]): Unit = ???
})

编辑:对于第二个例子,错误发生的原因大体相同。当你使用 keyByTuple 有两个可能的函数可以使用 keyBy(fields: Int*) ,它使用integer访问元组的字段,使用提供的索引(这就是您所使用的)。还有 keyBy(fun: T => K) 提供一个函数来提取将要使用的密钥。
但是这些函数之间有一个重要的区别,其中一个函数返回key作为 JavaTuple 另一个返回键的确切类型。所以基本上如果你改变 StringTuple 在您的简化示例中,它应该编译得很清楚。

ergxz8rk

ergxz8rk2#

这是一个我们可以研究的更简单的例子

val source: DataStream[(JsonField, Int)] = env.fromElements(("hello", 1), ("hello", 2))

    val window2 = source
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(JsonField, Int), Int, String, TimeWindow] {})

相关问题