我正试图在flink中实现一个批处理数据处理管道,在这里我需要
读取超过9000万行的文件
执行一些转换
在ElasticSearch中发布数据
向下游系统发布带有一些元数据的通知,说明作业已完成,并且他们可以从elasticsearch中读取
问题1:我没有找到任何简单的方法将数据发布到elasticsearch,flink dataset api中没有提供现成的输出函数将数据发布到elasticsearch,我只能找到写入文件或实现自定义函数的选项
问题2:为了解决问题1,我在Map函数中将数据发布到elasticsearch。map运算符是用parallelism 4定义的。我正在用parallelism 1定义一个输出函数,但我不知道何时所有操作符示例都处理完毕,以便我可以向下游发布消息。
我不想为主题上的每个事件发送通知,但只需要一条主题消息,确认所有9000万行都已处理,消费者现在可以从elasticsearch查询数据。
有什么办法解决这个问题吗?
其次,在map函数中编写elasticsearch似乎不太正确,它不是一个转换,而是一个输出,因为我需要两个连续的接收器,有没有更干净的方法呢?下面是示例流程图
1条答案
按热度按时间dgiusagp1#
我认为解决你的案子最好的办法是;
将文件加载到表
将表转换为数据流时,可以使用表api或在转换后操作数据。
使用弹性连接器将数据插入弹性体
如果您想了解作业是否已完成,可以检查numrecordsoutpersecond度量。