使用apache flink流处理缓冲转换的消息(例如,1000 count)

2fjabf4q  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(306)

我正在使用apache flink进行流处理。
在订阅来自源(例如:kafka、aws kinesis数据流)的消息,然后在流数据上使用flink操作符应用转换、聚合等之后,我希望缓冲最终消息(例如:计数为1000),并在单个请求中将每个批发布到外部rest api。
如何在apacheflink中实现缓冲机制(每1000条记录创建一个批)?
flink PipiLine:流源-->使用运算符转换/减少-->缓冲区1000消息-->post to rest api
谢谢你的帮助!

06odsfpq

06odsfpq1#

我将创建一个状态为的接收器,它将保留传入的消息。当计数足够高(1000)时,接收器发送批。状态可以在内存中(例如,一个示例变量包含一个消息的arraylist),但是您应该使用检查点,以便在发生某种故障时可以恢复该状态。
当您的接收器具有checkpointed状态时,它需要实现checkpointedfunction(在org.apache.flink.streaming.api.checkpoint中),这意味着您需要向接收器添加两个方法:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {

        for (HttpSinkStateItem item: checkpointedState.get()) {
            buffer = new ArrayList<>(item.getPending());  
        }

    }       

}

如果计数未达到阈值,还可以使用接收器中的计时器(如果输入流已设置键控/分区)定期发送。

相关问题