我正在使用apache flink进行流处理。
在订阅来自源(例如:kafka、aws kinesis数据流)的消息,然后在流数据上使用flink操作符应用转换、聚合等之后,我希望缓冲最终消息(例如:计数为1000),并在单个请求中将每个批发布到外部rest api。
如何在apacheflink中实现缓冲机制(每1000条记录创建一个批)?
flink PipiLine:流源-->使用运算符转换/减少-->缓冲区1000消息-->post to rest api
谢谢你的帮助!
我正在使用apache flink进行流处理。
在订阅来自源(例如:kafka、aws kinesis数据流)的消息,然后在流数据上使用flink操作符应用转换、聚合等之后,我希望缓冲最终消息(例如:计数为1000),并在单个请求中将每个批发布到外部rest api。
如何在apacheflink中实现缓冲机制(每1000条记录创建一个批)?
flink PipiLine:流源-->使用运算符转换/减少-->缓冲区1000消息-->post to rest api
谢谢你的帮助!
1条答案
按热度按时间06odsfpq1#
我将创建一个状态为的接收器,它将保留传入的消息。当计数足够高(1000)时,接收器发送批。状态可以在内存中(例如,一个示例变量包含一个消息的arraylist),但是您应该使用检查点,以便在发生某种故障时可以恢复该状态。
当您的接收器具有checkpointed状态时,它需要实现checkpointedfunction(在org.apache.flink.streaming.api.checkpoint中),这意味着您需要向接收器添加两个方法:
如果计数未达到阈值,还可以使用接收器中的计时器(如果输入流已设置键控/分区)定期发送。