luigi任务去了哪里?

ffx8fchx  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(346)

第一次进入luigi(和python!)还有一些问题。相关代码为:

from Database import Database
import luigi

class bbSanityCheck(luigi.Task):

  conn = luigi.Parameter()
  date = luigi.Parameter()
  def __init__(self, *args,**kwargs):
    super(bbSanityCheck, self).__init__(*args,**kwargs)
    self.has_run = False

  def run(self):
    print "Entering run of bb sanity check"
    # DB STUFF HERE THAT DOESN"T MATTER
   print "Are we in la-la land?"

  def complete(self):
    print "BB Sanity check being asked for completeness: " , self.has_run
    return self.has_run

class Pipeline(luigi.Task):
  date = luigi.DateParameter()

  def requires(self):
    db = Database('cbs')
    self.conn = db.connect()
    print "I'm about to yield!"
    return bbSanityCheck(conn = self.conn, date = self.date)

  def run(self):
    print "Hello World"
    self.conn.query("""SELECT * 
              FROM log_blackbook""")
    result = conn.store_result()

    print result.fetch_row()

  def complete(self):
    return False

if __name__=='__main__':
  luigi.run()

输出在这里(删除了相关的db返回,原因是):

DEBUG: Checking if Pipeline(date=2013-03-03) is complete
I'm about to yield!
INFO: Scheduled Pipeline(date=2013-03-03)
I'm about to yield!
DEBUG: Checking if bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03) is complete
BB Sanity check being asked for completeness:  False
INFO: Scheduled bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 5150] Running   bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
Entering run of bb sanity check
Are we in la-la land?
INFO: [pid 5150] Done      bbSanityCheck(conn=<_mysql.connection open to 'sas1.rad.wc.truecarcorp.com' at 223f050>, date=2013-03-03)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread

所以问题是:
1.)为什么“我要屈服”会被打印两次?
2.)为什么“你好世界”从未被印刷?
3.)什么是“1个可能由其他工作人员运行的挂起任务”?
我更喜欢超净输出,因为它更容易维护。我希望我能解决这些问题。
我还注意到需要“yield”或“returnitem,item2,item3”。我读过关于屈服的书,并且理解它。我不明白的是哪种习惯在这里被认为是优越的,或者他们之间的细微差别是我对这门语言还不熟悉的。

mbyulnm0

mbyulnm01#

我想你误解了路易吉的工作原理。
(1) 嗯。。我不确定。在我看来,这更像是在信息和调试中打印相同内容的问题
(2) 所以,你尝试运行依赖于bbsanitycheck运行的管道。complete()永远不会返回true,因为您从未在bbsanitycheck中将has\u run设置为true。因此管道任务永远不能运行和输出helloworld,因为它的依赖关系永远不会完成。
(3) 这可能是因为您有一个挂起的任务(实际上是管道)。但路易吉明白,它是不可能运行和关闭。
我个人不会使用has\u run作为检查任务是否已运行的方法,而是检查此作业的结果是否存在。也就是说,如果这个作业对数据库做了什么,那么complete()应该检查预期的内容是否存在。

相关问题