ruby-on-rails: Only execute one instance of a duplicate job with Sidekiq?

wj8zmpe1  posted on 2023-10-21 in Ruby

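I have a background job that runs a map/reduce on MongoDB. When a user sends more data to a document, it kicks off the background job for that document. If the user sends several requests, several background jobs are started for the same document, but only one really needs to run. Is there a way to prevent multiple duplicate instances? I was thinking of creating a queue per document and making sure it is empty before submitting a new job. Or could I set a job ID equal to my document ID and check that none exists before submitting?
Also, I just found the sidekiq-unique-jobs gem, but its documentation is non-existent. Does it do what I want?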

epfja78i  #1

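My initial suggestion would be a mutex for this specific job. However, since you may have several application servers running sidekiq jobs, I would suggest doing it at the redis level.
For instance, use redis-semaphore inside your sidekiq worker definition. An untested example: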

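require 'redis-semaphore'

def perform
  # host: is passed through to the Redis client (early gem versions used connection:).
  # stale_client_timeout lets a lock held longer than 90 seconds be treated as stale
  # and released. set to what is reasonable for your worker.
  s = Redis::Semaphore.new(:map_reduce_semaphore, host: "localhost", stale_client_timeout: 90)

  # skip the work if another sidekiq worker already holds this semaphore.
  # locked? followed by lock is not atomic, so a second worker may occasionally wait and then run too.
  unless s.locked?

    # the block form releases the semaphore even if the map/reduce raises.
    s.lock { your_map_reduce() }
  end
end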

def your_map_reduce
  # ...
end
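
The semaphore above is shared by every document. Since the question asks for one running job per document, one variation is a lock keyed by document ID and taken atomically with the Redis SET command and its NX and EX options. This is only a sketch under stated assumptions: `DocumentLock` and `InMemoryStore` are hypothetical names, and `InMemoryStore` merely stands in for a Redis client so the example runs without a server. With redis-rb, the same `set(key, value, nx: true, ex: ttl)` and `del(key)` calls would go to a real connection.

```ruby
# Hypothetical stand-in for a Redis client; implements only the two calls used below.
class InMemoryStore
  def initialize
    @data = {}
  end

  # Mimics redis-rb's set(key, value, nx: true, ex: seconds):
  # returns true if the key was written, false if a live key already exists.
  def set(key, value, nx: false, ex: nil)
    entry = @data[key]
    expired = entry && entry[:expires_at] && entry[:expires_at] <= Time.now
    return false if nx && entry && !expired

    @data[key] = { value: value, expires_at: ex ? Time.now + ex : nil }
    true
  end

  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

# Hypothetical helper: runs the block only if no other job holds the lock for this document.
class DocumentLock
  def initialize(store, ttl: 90)
    @store = store
    @ttl = ttl # seconds before an abandoned lock expires
  end

  # Returns the block's result, or :skipped when the document is already locked.
  def with_lock(document_id)
    key = "map_reduce_lock:#{document_id}"
    return :skipped unless @store.set(key, "1", nx: true, ex: @ttl)

    begin
      yield
    ensure
      @store.del(key)
    end
  end
end

store = InMemoryStore.new
lock = DocumentLock.new(store)

outer = lock.with_lock(42) do
  # A second attempt for the same document while the first is running is skipped.
  inner = lock.with_lock(42) { :ran }
  [:ran, inner]
end
```

Inside a worker, `perform(document_id)` would wrap `your_map_reduce` in `with_lock(document_id)`. A production version would probably also store a unique token in the key so that a job only deletes a lock it still owns.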

xqk2d5yq  #2

https://github.com/krasnoukhov/sidekiq-middleware
UniqueJobs provides uniqueness for jobs.
Usage
Example worker:

class UniqueWorker
  include Sidekiq::Worker

  sidekiq_options({
    # Should be set to true (enables uniqueness for async jobs)
    # or :all (enables uniqueness for both async and scheduled jobs)
    unique: :all,

    # Unique expiration (optional, default is 30 minutes)
    # For scheduled jobs calculates automatically based on schedule time and expiration period
    expiration: 24 * 60 * 60
  })

  def perform
    # Your code goes here
  end
end

w8f9ii69  #4

You can do something like this, assuming all of your jobs are added to the enqueued set.

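require 'sidekiq/api'

class SidekiqUniqChecker
  # Enqueues Indexer only if no job with the same arguments is waiting in the queue.
  def self.perform_unique_async(action, model_name, id)
    key = "#{action}:#{model_name}:#{id}"
    queue = Sidekiq::Queue.new('elasticsearch')
    # Linear scan of the queue; jobs that are already running are not seen.
    return if queue.any? { |job| job.args.join(':') == key }

    Indexer.perform_async(action, model_name, id)
  end
end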

The code above is just an example, but you can adapt it to your needs.
Source
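
One possible tightening of the check is to compare the job class and the argument array directly instead of a joined string, because `['a:b', 'c']` and `['a', 'b:c']` join to the same key. The following is a minimal sketch on plain Ruby objects: `QueuedJob` and `already_enqueued?` are hypothetical names, and the struct only mimics the `klass` and `args` readers that entries of `Sidekiq::Queue` expose.

```ruby
# Hypothetical stand-in for an entry of Sidekiq::Queue; only klass and args are mimicked.
QueuedJob = Struct.new(:klass, :args)

# True if a job with the same class and exactly the same arguments is already queued.
def already_enqueued?(queue, klass, args)
  queue.any? { |job| job.klass == klass && job.args == args }
end

queue = [
  QueuedJob.new("Indexer", ["index", "Post", 1]),
  QueuedJob.new("Indexer", ["delete", "Post:Draft", 2])
]

duplicate = already_enqueued?(queue, "Indexer", ["index", "Post", 1])
different = already_enqueued?(queue, "Indexer", ["index", "Post", 3])
# Joined with ':' these arguments would look identical to the second queued job.
collision = already_enqueued?(queue, "Indexer", ["delete:Post", "Draft", 2])
```

Either way the scan is linear in the queue length, and another process can still enqueue the same job between the check and the `perform_async` call, so this reduces duplicates rather than ruling them out.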

alen0pnh  #5

Create this class and run it as a scheduled job (every minute); it scans the queues and deletes duplicates. This only works with Sidekiq.

Rake task

namespace :dev do
  task remove_duplicated_jobs: :environment do
    JobDuplicated.new.jobs.each(&:delete)
  end
end

/lib/job_duplicated.rb

require 'sidekiq/api'

class JobDuplicated
  # Returns the duplicates in each queue: every job after the first with the same name and arguments.
  def jobs
    results = []

    queues.each do |queue|
      jobs = {}

      # Scan every job in the queue
      queue.each do |job|
        # These keys assume ActiveJob-wrapped jobs
        job_name = job.item['wrapped']
        arguments = job.item['args'][0]['arguments']

        key = [job_name, arguments]

        # If a job with this name and arguments was already seen, it is a duplicate; otherwise remember it
        if jobs[key]
          results << job
        else
          jobs[key] = job.jid
        end
      end
    end

    results
  end

  private

  def queues
    Sidekiq::Queue.all
  end
end
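
The core of the scan, keeping the first job for each name-and-arguments pair and collecting the rest, can be tried out without Sidekiq. The sketch below works on plain hashes; the `duplicates` method and the sample data are hypothetical and only illustrate the grouping step.

```ruby
# Returns every job after the first one that shares the same name and arguments.
def duplicates(jobs)
  jobs
    .group_by { |job| [job[:name], job[:arguments]] }
    .values
    .flat_map { |group| group.drop(1) }
end

# Hypothetical sample data standing in for one queue's contents.
jobs = [
  { jid: "a1", name: "MapReduceJob", arguments: [10] },
  { jid: "b2", name: "MapReduceJob", arguments: [10] },
  { jid: "c3", name: "MapReduceJob", arguments: [11] },
  { jid: "d4", name: "MapReduceJob", arguments: [10] }
]

to_delete = duplicates(jobs).map { |job| job[:jid] }
```

Here the first job for document 10 and the only job for document 11 are kept, and the two later jobs for document 10 are the ones that would be deleted.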
