情况很简单。我有3个任务需要执行:
flume任务>>睡眠任务>>http任务
独立地执行这些任务是完美的。如果我通过cli手动启动flume代理(source http,sink hdfs),然后发送curl命令,那么代理会将收到的消息转储到hdfs中。
正如我所说,通过cli手动执行每个操作是无缝的。
然而,通过气流dag安排这一过程是一个挑战。我试过用几种方法构建我的dag,但是都没有用。主要的问题是flume任务(bashoperator)一直处于运行状态,而且永远不会结束。这是有道理的。不应该。
但是dag永远不会继续到下一个节点(sleep30s->sendcurl命令)。
我构建了一个线性依赖关系(flume任务>>睡眠任务>>http任务)-陷入flume任务。
我构造了branchingpythonoperator
分支a=Flume任务
分支b=睡眠任务->http任务。
这样,分支a跳过flume任务,分支b成功执行sleep任务,失败执行http任务。http任务失败,没有代理运行。
然后我决定在根目录下分开,这样flume\u任务就独立了:
Flume任务
睡眠\u任务>>检查\uFlume\u状态>>http \u任务
为最后一个示例提供我的代码,我认为它具有最合理的dag构造。我希望有人能解释一下。我在网上阅读了大量的参考资料,我知道flume是事件驱动的,但是我不明白我的airflow脚本有什么错?
如果有人能给我指出正确的方向,我将不胜感激。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime
from airflow.models import TaskInstance
# Defaults
default_args = {
'start_date': datetime(2019, 11, 11),
'depends_on_past':False
}
# DAGs
dag = DAG('HTTP_2_HDFS', default_args=default_args, schedule_interval='59 * * * *')
# Commands
flume_command = "flume-ng agent --name myAgent --conf conf --conf-file /home/hadoop/flume/conf/http.conf "
sleep_command = "sleep 30 "
http_command = "/home/hadoop/flume/hdfs_test/HTTP_2_HDFS.sh "
# Tasks
def check_status(**kwargs):
flume_task_instance = TaskInstance(flume_task, datetime(2019, 11, 11))
state = flume_task_instance.current_state()
if state == "running":
print("FLUME PROCESS RUNNING !!!")
flume_task = BashOperator(
task_id='FLUME',
bash_command=flume_command,
dag=dag
)
sleep_task = BashOperator(
task_id='SLEEP',
bash_command=sleep_command,
dag=dag
)
http_task = BashOperator(
task_id='HTTP',
bash_command=http_command,
dag=dag
)
check_running_task = PythonOperator(
task_id='CHECK_FLUME_STATUS',
python_callable=check_status,
provide_context=True,
dag=dag
)
# Node Connections
flume_task
sleep_task >> check_running_task >> http_task
# branch = BranchPythonOperator(task_id='BRANCH', provide_context=True, python_callable=check_status, dag=dag)
# branch >> flume_task
# branch >> sleep_task >> http_task
1条答案
按热度按时间k2fxgqgv1#
我没有直接使用flume的经验,因此如果这些假设中的某些假设不成立,我很抱歉,但以我的经验,airflow不能很好地处理长时间运行的流程。
看起来你可能想要一个不同的设计。如果http任务本身在dag中以1分钟的间隔运行呢?它还能和Flume探员说话并按它应该的方式冲洗吗?
所以flume是独立运行的,您不需要睡眠任务,因为您使用的是airflow调度器,以间隔运行。
我们有一个类似的设置,用于访问气流中的Kafka主题。只有耗电元件在气流内部运行。我们在它周围构建了 Package 器来模拟“流”,但它实际上更像是几个重叠的微批处理在运行,给人一种流的错觉。