oozie协调器如何将过去的数据提供给mapreduce作业?

sycxhyv7  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(333)

我正在尝试创建ooize协调器。问题是我已经有了等待使用oozie处理的暂存数据。
想象一下这样的情况。
当前日期:2013年3月1日(2013年3月1日)
我有这些输入目录:
/登台/登陆/来源/xvlr/2013/02/01/00(2013年2月1日,一天的第一个小时)/登台/登陆/来源/xvlr/2013/02/01/01
/登台/着陆/震源/xvlr/2013/02/01/02
/登台/着陆/震源/xvlr/2013/02/01/03
/登台/着陆/震源/xvlr/2013/02/01/04
....
/登台/着陆/震源/xvlr/2013/02/28/00
...
/登台/着陆/震源/xvlr/2013/02/28/23
我希望我的oozie协调器使用所有先前创建的着陆数据并生成这样的输出:

/masterdata/source/xvlr/2013/02/01/00 
/masterdata/source/xvlr/2013/02/01/01
/masterdata/source/xvlr/2013/02/01/02
/masterdata/source/xvlr/2013/02/01/03
/masterdata/source/xvlr/2013/02/01/04
....
/masterdata/source/xvlr/2013/02/28/00
...
/masterdata/source/xvlr/2013/02/28/23

然后我希望我的协调器每小时运行一次,并为主数据生成新的输出。
如何使用协调器规范?这是我的协调员。它什么也不做。它确实到达了我需要的时间,然后等待。它不能启动工作。
请给我建议。

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>5</timeout>
        <concurrency>4</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>

            </configuration>

        </workflow>
    </action>
</coordinator-app>
6yjfywim

6yjfywim1#

以下是正确的解决方案(它可以工作几天:):

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T16:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>3</timeout>
        <concurrency>1</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>
            </configuration>

        </workflow>
    </action>
</coordinator-app>

它有什么作用?
一开始是2013-03-07t16:35z,所以之前
收集的数据已通过底层工作流(具有解析功能的mr作业调用)传递
在处理“过去时间数据集”(数据集时间小于当前时间)时,工作流正在逐个运行:它确实消耗了/pastdate/hour\u 00,然后它立即开始消耗/pastdate/hour\u 01,e.t.c。
当协调器到达当前时间时,它开始每小时调用工作流(按设计:05:35,06:35。。。23:35).
请参阅超时声明:我确实缺少数据集:例如,3月1日的第10个小时没有数据。工作流确实等了3分钟就死了。
问题解决了。

相关问题