并发-Ruby异步方法调用问题

tf7tbtn2  于 2023-05-28  发布在  Ruby
关注(0)|答案(2)|浏览(123)

Ruby版本:ruby-3.2.1
我需要阅读一个包含50 k行的csv文件。然后,对于每一行,我需要逐个执行7个API,以便在第三方应用程序中创建一个条目。
我正在尝试使用并发ruby gem的异步特性。下面是一个相同的示例代码。

require 'csv'
require "json"
require 'byebug'
require 'concurrent'

class Processor
  include Concurrent::Async

  def consume(c_row)
    ## Inside consume methods 7 APIS are called in sequence for each row
    puts "processing!!!! -> #{c_row}"
    sleep 1 if c_row % 2
    sleep 5 if c_row % 3 
  end
end

class ImporterAsync
  def self.perform(master_file)
    begin
      ## CSV has 50k entries ##
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        Processor.new.async.consume(row)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end
  end
end

ImporterAsync.perform('master_file.csv')

当我使用async关键字Processor.new.async.consume(row)调用处理器时,代码没有响应
但是代码在没有async关键字的情况下执行得很好。Processor.new.consume(row)
我相信异步的方式将帮助我的代码更好地执行这种负载。我也愿意接受建议。
提前感谢- Ajith

ttp71kqs

ttp71kqs1#

你的程序在创建异步线程后立即退出。你需要等待他们,否则他们会终止与其余的程序。像这样使用async实际上会返回一个Concurrent::IVar,您可以使用它来跟踪#value上的每个工作。

class ImporterAsync
  def self.perform(master_file)
    futures = []
    begin
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        future = Processor.new.async.consume(row)
        futures.push(future)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end

    futures.each do |t|
      t.wait
      puts "finished (#{t.value.to_s[0...40].strip}) #{Time.now}"
    end
    puts "ALL DONE"
  end
end

这是你的类的一个修改版本,用于将每个IVar收集到一个数组中。开始后,他们都等待每一个完成。这意味着程序将在退出之前等待所有这些。(注意finished也可能意味着引发了异常,它并不意味着“好”的完成。

m3eecexj

m3eecexj2#

实际上,我参考了下面的链接来了解正在发生的事情。
Celluloid async inside ruby blocks does not work
但我仍然开放的建议。

相关问题