ruby 我如何使用sneakes(用于rabbitMQ)对一条消息进行最多5次失败尝试的重新排队?

pod7payv  于 2023-02-18  发布在  Ruby
关注(0)|答案(1)|浏览(139)

我看到旧的gem版本在2015年左右支持在Sneakers.configure参数中定义这类东西,但现在已经不是这样了。所以我试图找到替代方案,但运动鞋wiki没有任何细节,我也找不到任何方法来完成它通过寻找运动鞋的源代码。
我需要我的工作人员能够“知道”尝试发送消息失败的次数,以便选择是重新排队还是拒绝该消息
配置

Sneakers.configure(
  heartbeat: 60,
  amqp: "amqp://#{ENV['RABBITMQ_USER']}:#{ENV['RABBITMQ_PASS']}@#{ENV['RABBITMQ_HOST']}",
  daemonize: ENV['RACK_ENV'] == 'production',
  pid_path: './sneakers.pid',
  vhost: '/',
  exchange: 'sneakers',
  exchange_type: :direct,
  timeout_job_after: 2.minutes,
  workers: 1,
  properties: {
    connection_name: 'worker'
  }
)
Sneakers.logger.level = Logger::DEBUG

workers = Sneakers::Runner.new([Workers::BB])
workers.run

工人

module Workers
  ##
  class BB
    include Sneakers::Worker
    from_queue :backbone

    def work(msg)
      failed_atempts = 0 ## ! need to find out the failed attempt count

      ## ... handle message ...

      ack!
    rescue StandardError => e
      log :fatal, e.message
      log :fatal, e.backtrace
      
      requeue! if failed_attempts <= 5
      reject! if failed_attempts > 5
    end
  end
end
gwbalxhn

gwbalxhn1#

目前,这是我找到的唯一方法,我使用work_with_params代替work,然后使用publish手动重新排队消息,并手动配置自定义报头

class Workman
  include Sneakers::Worker

  from_queue :my_queue

  def work_with_params(msg, _delivery_info, metadata)
    max_retry = 5
    headers = metadata[:headers]
    retry_count = headers['retry_count'] || 0

    ## ...

    ack!
  rescue StandardError => e
    retry_count += 1
    headers['retry_count'] = retry_count

    if retry_count > max_retry
      ## ... do something to stash failed messages for later processing
      stash_failed_message(@queue.name, msg, metadata, e)
    end

    if retry_count <= max_retry
      publish msg, routing_key: @queue.name, persistent: true, headers: { retry_count: retry_count }
    end

    ack!
  end
end

相关问题