redis 快速添加多个项目(1000/秒)到一个sidekiq队列?

niwlg2el  于 2023-11-16  发布在  Redis
关注(0)|答案(3)|浏览(121)

我知道sidekiq有一个push_bulk选项,但我目前受到redis延迟的限制,所以通过push_bulk传递多个项目仍然不够快(大约只有50/s)。
我试着增加redis连接的数量,如下所示:

redis_conn = proc {
  Redis.new({ :url => Rails.configuration.redis.url })
}

Sidekiq.configure_client do |config|
  Sidekiq.configure_client do |config|
    config.redis = ConnectionPool.new(size: 50, &redis_conn)
  end
  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end

字符串
然后启动单独的线程(Thread.new)来对不同的对象执行_code.c。有趣的是,任何不是第一个线程的线程都不会被扔到sidekiq队列中,就像它们被完全忽略一样。
有人知道更好的方法吗?
编辑:下面是我尝试的push_bulk方法,它实际上更慢:

user_ids = User.need_scraping.pluck(:id)
  bar = ProgressBar.new(user_ids.count)
  user_ids.in_groups_of(10000, false).each do |user_id_group|
    Sidekiq::Client.push_bulk(
      'args'  => user_id_group.map{ |user_id| [user_id] },
      'class' => ScrapeUser,
      'queue' => 'scrape_user',
      'retry' => true
    )
  end


谢谢你,谢谢

sxpgvts3

sxpgvts31#

你确实想使用push_bulk。你受到延迟/往返时间的限制,将元素写入支持sidekiq的redis队列。
你正在使用多线程/连接来克服一个缓慢的网络,当你真的应该删除额外的网络往返。
假设您尝试创建20 k个UserWorker作业,这些作业需要一个user_id
您可以通过以下方式将单个作业入队:

UserWorker.perform_async(user_id)

字符串
.Map到:

Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )


因此,20 k user_ids的push_bulk版本是:

# This example takes 20k user_ids in an array, chunks them into groups of 1000 ids,
# and batch sends them to redis as a group.

User.need_scraping.select('id').find_in_batches do |user_group|

  sidekiq_items = user_group.map {|user| { 'class' => UserWorker, 'args' => [user.id] } }
  Sidekiq::Client.push_bulk(sidekiq_items)
end


这将20 k redis调用转换为20个redis调用,平均往返时间为5 ms(乐观),即1秒对100秒。您的里程可能会有所不同。

**编辑:**评论者似乎对Sidekiq/Redis客户端批量排队数据的行为感到困惑。

Sidekiq::Client.push_bulk()方法接受一个要加密的作业数组。它将这些作业转换为Sidekiq作业有效负载哈希,然后调用SideKiq::Client.raw_push()将这些有效负载交付给redis。参见来源:https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158
SideKiq::Client.raw_push()获取一个Sidekiq哈希负载列表,将其转换为JSON,然后执行一个结合了两个redis命令的redis MULTI命令。(redis SADD),然后将所有作业有效负载推送到目标队列redis列表对象(redis LPUSH)。这是一个单独的redis命令,在一个单独的redis原子组中一起执行。
如果这仍然很慢,你可能有其他问题(缓慢的网络,过载的redis服务器等)。

mrphzbgm

mrphzbgm2#

@温菲尔德的答案是正确的,他对延迟的看法也是绝对正确的。然而,正确的语法实际上是这样的:

User.need_scraping.select('id').find_in_batches do |user_group|
  Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } })
end

字符串
也许它在最新的Sidekiq中改变了(我懒得检查),但这是现在正确的语法。

ss2ws0br

ss2ws0br3#

根据@michael-y的回答,现在有一个perform_bulk方法可以在sidekiq作业类上调用。
下面是一个示例,建立在这个页面上的例子:

User.need_scraping.select('id').find_in_batches do |user_group|
  args = user_group.map { |user| [user.id] } # must be an array of arrays
  UserWorker.perform_bulk(args)
end

字符串
另请参阅:

  • perform_bulk源代码(它基本上只调用push_bulk
  • push_bulk

相关问题