我正在尝试创建ooize协调器。问题是我已经有了等待使用oozie处理的暂存数据。
想象一下这样的情况。
当前日期:2013年3月1日(2013年3月1日)
我有这些输入目录:
/登台/登陆/来源/xvlr/2013/02/01/00(2013年2月1日,一天的第一个小时)/登台/登陆/来源/xvlr/2013/02/01/01
/登台/着陆/震源/xvlr/2013/02/01/02
/登台/着陆/震源/xvlr/2013/02/01/03
/登台/着陆/震源/xvlr/2013/02/01/04
....
/登台/着陆/震源/xvlr/2013/02/28/00
...
/登台/着陆/震源/xvlr/2013/02/28/23
我希望我的oozie协调器使用所有先前创建的着陆数据并生成这样的输出:
/masterdata/source/xvlr/2013/02/01/00
/masterdata/source/xvlr/2013/02/01/01
/masterdata/source/xvlr/2013/02/01/02
/masterdata/source/xvlr/2013/02/01/03
/masterdata/source/xvlr/2013/02/01/04
....
/masterdata/source/xvlr/2013/02/28/00
...
/masterdata/source/xvlr/2013/02/28/23
然后我希望我的协调器每小时运行一次,并为主数据生成新的输出。
如何使用协调器规范?这是我的协调员。它什么也不做。它确实到达了我需要的时间,然后等待。它不能启动工作。
请给我建议。
<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3">
<controls>
<timeout>5</timeout>
<concurrency>4</concurrency>
</controls>
<datasets>
<dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
<uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
<done-flag></done-flag>
</dataset>
<dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
<uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<output-events>
<data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
<instance>${coord:current(0)}</instance>
</data-out>
</output-events>
<action>
<workflow>
<app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>${coord:dataIn('xvlrInputEvent')}</value>
</property>
<property>
<name>outputDir</name>
<value>${coord:dataOut('xvlrOutputEvent')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
1条答案
按热度按时间6yjfywim1#
以下是正确的解决方案(它可以工作几天:):
它有什么作用?
一开始是2013-03-07t16:35z,所以之前
收集的数据已通过底层工作流(具有解析功能的mr作业调用)传递
在处理“过去时间数据集”(数据集时间小于当前时间)时,工作流正在逐个运行:它确实消耗了/pastdate/hour\u 00,然后它立即开始消耗/pastdate/hour\u 01,e.t.c。
当协调器到达当前时间时,它开始每小时调用工作流(按设计:05:35,06:35。。。23:35).
请参阅超时声明:我确实缺少数据集:例如,3月1日的第10个小时没有数据。工作流确实等了3分钟就死了。
问题解决了。