apache flink:processwindowfunction不适用

hk8txs48  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(365)

我想用一个 ProcessWindowFunction 在我的apache-flink项目中。但我在使用process函数时遇到了一些错误,请参见下面的代码片段
错误是:
windowedstream,tuple,timewindow>类型中的方法进程(processwindowfunction,r,tuple,timewindow>)不适用于参数(jdbcexample.myprocesswindows)
我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

我的 ProcessWindowFunction :

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
6kkfgxo0

6kkfgxo01#

费边说:)使用 Tuple 应该管用,但确实涉及到一些丑陋的类型在您的模型 ProcessWindowFunction . 使用 KeySelector 简单,代码更清晰。例如

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

然后,通过上面的内容可以定义 ProcessWindowFunction 比如:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {
0aydgbwb

0aydgbwb2#

问题可能是 ProcessWindowFunction .
您正在按位置引用密钥( keyBy(0) ). 因此,编译器无法推断其类型( String )你需要改变一下 ProcessWindowFunction 收件人:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

通过替换 StringTuple 现在您有了一个可以强制转换到的键的通用占位符 Tuple1<String> 当您需要访问 processElement() 方法:

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

如果定义了 KeySelector<IN, KEY> 函数提取键,因为返回类型 KEYKeySelector 编译器已知。

相关问题