apache flink进程函数状态未保持该状态

dluptydi  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(342)

我正在为一个 processElement apache flink 1.4中的函数:

public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>{

    private ListState<String> listState;

    public void processElement(Tuple2<String, String> tuple2,  Context context, Collector<Tuple2<String, String>> collector) {

        // if the state is empty, start a timer
        if (listState.get().iterator().hasNext() == false)
            context.timerService().registerEventTimeTimer(10000);

        listState.add("someStringToBeStored");

        // ...
    }

}

当计时器过期时,我有这个函数:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    Iterable<String> strings = listState.get();
    int cnt = 0;
    int totalLength = 0;
    Iterator<String> it = strings.iterator();
    while (it.hasNext()) {
        cnt++;
        totalLength += it.next().length();
    }
    LOGGER.info("cnt is:" + cnt);
    LOGGER.info("totalLength is:" + totalLength);

    // clearing the state
    listState.clear();
}

但是每次运行应用程序时 cnt 始终为1,并且 totalLength 此时已处理的特定字符串的长度。看起来这个状态没有保存在系统中。从这个代码可以清楚地看出我做错了什么吗?

pu82cl6c

pu82cl6c1#

进程函数使用键分区状态,这意味着每个键都有一个单独的列表。我的猜测是没有一个键在10秒的时间内有多个事件。

xt0899hw

xt0899hw2#

你的 ProcessFunctionClass 需要延长flink ProcessFunction .

相关问题