正如标题所说,我想在spark应用程序之间进行通信,这基本上意味着我有一个程序生成它的输出到stdout,我想把结果作为输入传递给另一个spark应用程序。
我现在将介绍我们团队中的问题和解决方案:
我们正在使用cdh5.1x,使用hdfs作为我们的数据库。我们的数据是按以下方式存储的:一个目录被称为“暂存”的数据流进程中的许多小文件,一个目录被称为“成功”的“生产”数据,这是相同的数据聚合到一个(或多个)更大的文件。数据存储在以下路径中:
/basePath/staged/{sensor_name}/Y/M/D/H
/basePath/success/{sensor_name}/Y/M/D/H
此外,我们还在hdfs上使用hive分区来向客户机屏蔽hdfs结构。
我们有3个产品,每个产品在将数据从“staged”目录移动到“success”目录的过程中有不同的部分:
聚合-从“staged”目录获取数据并将其合并到一个(或多个)文件中。
hivepartitioner-根据聚合的输出添加/删除配置单元分区。
看门人-根据聚合的输出删除目录。
管道是:聚合->hivepartitioner->看门人。
我们使用气流来安排我们的管道与DAG。
所以,现在我的问题是如何在产品之间进行沟通。聚合应该将结果打印到stdout,hivepartitioner和看门人应该接收结果作为输入。
以下是我们尝试/想到的:
选项1:在airflow中创建一个dag,它读取聚合的标准输出,并使用xcom(airflow在任务之间的通信方式)传输数据。这是个好主意,但问题是,airflow已经在处理stdout的日志,所以我无法访问输出。
选项2:在气流中生成以下dag:aggregation->getyarnlog->hivepartitioner->jantior。
聚合将把它的输出打印到stdout,stdout被写入集群中的日志文件中,并且可以通过yarn api访问。提取日志,处理输出并将其发送给hivepartitioner和看门人。
这就是我们所想到的,而不需要运行单独的流程,基本上也不需要制造自己的气流。
我想听听你们对我们的选择的看法,如果你们知道一个更好的方法来实现spark应用程序之间的通信。重要的是,所有3个产品必须是独立的,而不是依赖于其他产品(解耦),而且每个应用程序的输入都应该传递到main方法中。
谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!