如何使用Ruby Fibers避免阻塞IO

f4t66c6m  于 12个月前  发布在  Ruby
关注(0)|答案(5)|浏览(106)

我需要上传一个目录中的一堆文件到S3。由于上传所需的90%以上的时间都花在等待http请求完成上,所以我想以某种方式一次执行其中的几个。
纤维能帮我解决这个问题吗?它们被描述为解决这类问题的一种方法,但我想不出在http调用阻塞时可以做任何工作的任何方法。
有没有办法不用线程就能解决这个问题?

pdkcd3nj

pdkcd3nj1#

我不太了解1.9中的纤程,但是1.8.6中的常规线程可以解决这个问题。尝试使用队列http://ruby-doc.org/stdlib/libdoc/thread/rdoc/classes/Queue.html
查看文档中的示例,您的消费者是执行上传的部分。它“消费”一个URL和一个文件,并上传数据。生产者是你的程序的一部分,保持工作,并找到新的文件上传。
如果你想一次上传多个文件,只需为每个文件启动一个新的线程:

t = Thread.new do
  upload_file(param1, param2)
end
@all_threads << t

然后,稍后在你的“生产者”代码中(记住,它不一定要在自己的线程中,它可以是主程序):

@all_threads.each do |t|
  t.join if t.alive?
end

队列可以是@member_variable或$global。

ikfrs5lh

ikfrs5lh2#

回答您的实际问题:

  • 纤维可以帮助我在这一切?*

不他们不能。Jörg W Mittag解释了为什么最好。
不,您不能使用Fibers进行并发。纤程并不是一个并发结构,而是一个控制流结构,就像CNET。这就是光纤的全部意义:它们从不并行,它们是合作的,它们是决定性的。光纤是协程。(事实上,我一直不明白为什么它们不简单地称为协程。
Ruby中唯一的并发结构是线程。
当他说Ruby中唯一的并发结构是线程时,请记住Ruby有许多不同的实现,它们的线程实现也各不相同。Jörg再次为这些差异提供了一个很好的答案;并正确地得出结论,只有像JRuby(使用Map到本机线程的JVM线程)或fork进程这样的东西才能实现真正的并行性。

  • 任何方法我可以解决这个问题没有线程?*

除了分叉你的进程,我还建议你看看EventMachine和类似em-http-request的东西。它是一个事件驱动的、非阻塞的、基于reactor pattern的HTTP客户端,是异步的,不会引起线程的开销。

8i9zcol2

8i9zcol23#

Aaron Patterson(@tenderlove)使用了一个几乎与您的示例完全相同的示例来准确描述为什么您可以 * 并且应该 * 使用线程来实现您的情况中的并发性。
大多数I/O库现在都足够聪明,可以在执行IO时释放GVL(全局VM锁,或大多数人称为GIL或全局解释器锁)。在C中有一个简单的函数调用来实现这一点。您不需要担心C代码,但对您来说,这意味着大多数值得使用的IO库将释放GVL并允许其他线程执行,而执行IO的线程等待数据返回。
如果我刚才说的令人困惑,你不必太担心。你需要知道的主要事情是,如果你正在使用一个体面的库来做你的HTTP请求(或任何其他I/O操作)。数据库,进程间通信,等等),Ruby解释器(MRI)足够聪明,能够释放解释器上的锁,并允许其他线程执行,而一个线程等待IO返回。如果下一个线程有自己的IO要获取,Ruby解释器将做同样的事情(假设IO库是为了利用Ruby的这个特性而构建的,我相信现在大多数都是这样)。
所以,总结一下我所说的,使用线程!您应该看到性能优势。如果没有,请检查您的http库是否使用了C中的rb_thread_blocking_region()函数,如果没有,请找出原因。也许有一个很好的理由,也许你需要考虑使用一个更好的图书馆。
Aaron Patterson视频的链接在这里:http://www.youtube.com/watch?v=kufXhNkm5WU
这是值得一看,即使只是为了笑,因为亚伦帕特森是互联网上最有趣的人之一。

44u64gxh

44u64gxh4#

您可以使用单独的进程来实现这一点,而不是线程:

#!/usr/bin/env ruby

$stderr.sync = true

# Number of children to use for uploading
MAX_CHILDREN = 5

# Hash of PIDs for children that are working along with which file
# they're working on.
@child_pids = {}

# Keep track of uploads that failed
@failed_files = []

# Get the list of files to upload as arguments to the program
@files = ARGV

### Wait for a child to finish, adding the file to the list of those
### that failed if the child indicates there was a problem.
def wait_for_child
    $stderr.puts "    waiting for a child to finish..."
    pid, status = Process.waitpid2( 0 )
    file = @child_pids.delete( pid )
    @failed_files << file unless status.success?
end

### Here's where you'd put the particulars of what gets uploaded and
### how. I'm just sleeping for the file size in bytes * milliseconds
### to simulate the upload, then returning either +true+ or +false+
### based on a random factor.
def upload( file )
    bytes = File.size( file )
    sleep( bytes * 0.00001 )
    return rand( 100 ) > 5
end

### Start a child uploading the specified +file+.
def start_child( file )
    if pid = Process.fork
        $stderr.puts "%s: uploaded started by child %d" % [ file, pid ]
        @child_pids[ pid ] = file
    else
        if upload( file )
            $stderr.puts "%s: done." % [ file ]
            exit 0 # success
        else
            $stderr.puts "%s: failed." % [ file ]
            exit 255
        end
    end
end

until @files.empty?

    # If there are already the maximum number of children running, wait 
    # for one to finish
    wait_for_child() if @child_pids.length >= MAX_CHILDREN

    # Start a new child working on the next file
    start_child( @files.shift )

end

# Now we're just waiting on the final few uploads to finish
wait_for_child() until @child_pids.empty?

if @failed_files.empty?
    exit 0
else
    $stderr.puts "Some files failed to upload:",
        @failed_files.collect {|file| "  #{file}" }
    exit 255
end
qv7cva1a

qv7cva1a5#

现在是2023年,情况发生了变化。您可以使用Fibers来解决这个问题!
您需要:

用法示例:

require "fiber_scheduler"
require "open-uri"

FiberScheduler do
  Fiber.schedule do
    URI.open("https://httpbin.org/delay/2")
  end

  Fiber.schedule do
    URI.open("https://httpbin.org/delay/2")
  end
end

就是这样!请注意,FiberBlock中的两个纤程将同时执行。
如果你需要返回一个值,你可以这样做:

def async_request(url)
  response = nil

  FiberScheduler do
      Fiber.schedule do
        response = URI.open(url)
      end
    
      Fiber.schedule do
        until response
          print "not ready yet, waiting, or doing some stuff"
          sleep 5
        end
      end
    end

   response
end

response = async_request("https://httpbin.org/delay/2")

相关问题