ruby 向ActiveJob传递成功和失败处理程序

2w2cym1i  于 2023-04-20  发布在  Ruby
关注(0)|答案(1)|浏览(98)

我有一个ActiveJob,它应该通过HTTP从外部系统加载一段数据。当这个作业完成时,我想排队第二个作业,它会进行一些后处理,然后将数据提交给不同的外部系统。
我不想让第一份工作知道第二份工作,因为
1.封装
1.可重用性
1.基本上这和第一份工作没关系
同样地,我不希望第一个作业关心数据加载失败时接下来会发生什么--也许用户得到通知,也许我们在超时后重试,也许我们只是记录它并放弃--同样,它可能会根据异常的细节而变化,并且作业不需要包括处理它的逻辑或到其他系统的连接。
在Java中(这是我最有经验的地方),我可以使用类似Guava的ListenableFuture来添加成功和失败回调:

MyDataLoader loader = new MyDataLoader(someDataSource)
ListenableFuture<Data> future = executor.submit(loader);
Futures.addCallback(future, new FutureCallback<Data>() {
    public void onSuccess(Data result) {
        processData(result);
    }
    public void onFailure(Throwable t) {
        handleFailure(t);
    }
});

然而,ActiveJob似乎并不提供这种外部回调机制--我从“Active Job Basics”的相关章节中可以看出,after_performrescue_from只能从作业类内部调用,而after_peform并不能区分成功和失败。
所以我能想到的最好的方法(我并不是说它很好)是将几个lambda传递到作业的perform方法中,如下所示:

class MyRecordLoader < ActiveJob::Base

  # Loads data expensively (hopefully on a background queue) and passes
  # the result, or any exception, to the appropriate specified lambda.
  #
  # @param data_source [String] the URL to load data from
  # @param on_success [-> (String)] A lambda that will be passed the record
  #   data, if it's loaded successfully
  # @param on_failure [-> (Exception)] A lambda that will be passed any
  #   exception, if there is one
  def perform(data_source, on_success, on_failure)
    begin
      result = load_data_expensively_from data_source
      on_success.call(result)
    rescue => exception
      on_failure.call(exception)
    end
  end

end

(Side注:我不知道yardoc的语法是什么来声明lambdas作为参数的。这看起来正确吗,或者,失败了,似乎合理?)
然后调用者必须将这些传入:

MyRecordLoader.perform_later(
  some_data_source,
  method(:process_data),
  method(:handle_failure)
)

这并不可怕,至少在调用端是这样,但看起来很笨拙,我不禁怀疑这其中有一个我没有找到的共同模式。我有点担心,作为一个Ruby/Rails新手,我只是让ActiveJob去做一些它一开始就不想做的事情。我发现的所有ActiveJob示例都是'解雇和忘记' -异步的。返回”一个结果似乎不是一个ActiveJob用例。
此外,我不清楚这是否适用于像Resque这样在单独进程中运行作业的后端。
什么是“Ruby方式”来做到这一点?

**更新:**作为dre-hh的hinted at,ActiveJob在这里并不是合适的工具。它也不可靠,而且对于这种情况来说过于复杂。我改用Concurrent Ruby,它更适合用例,而且,由于任务大多是IO绑定的,即使在MRI上也足够快,despite the GIL
**更新(2023-04-19):**对于遇到这个问题的任何人来说,我遇到了类似的要求,在这种情况下,我需要长时间运行的后台作业,这些作业可以在服务器重新部署或类似情况后重新启动,并且由于我们使用PostgreSQL作为数据库后端,我能够通过ActiveJob+GoodJob的批处理支持实现这一点。

xienkqul

xienkqul1#

ActiveJob不像future或promise那样是一个异步库。
它只是一个在后台执行任务的接口。当前线程/进程不会收到此操作的结果。
例如,当使用Sidekiq作为ActiveJob队列时,它会将perform方法的参数序列化到redis store中。另一个在Rails应用程序上下文中运行的守护进程将监视redis队列,并使用序列化的数据示例化您的worker。
所以传递回调函数可能是好的,但是为什么要把它们作为另一个类的方法呢?如果回调函数是动态的(在不同的调用中改变),传递回调函数是有意义的。但是,当你在调用类上实现它们时,考虑把这些方法移动到你的job worker类中。

相关问题