ruby:创建线程时GC未清理的对象

9q78igpj  于 2023-02-18  发布在  Ruby
关注(0)|答案(1)|浏览(163)

拥有线程的类不会被垃圾回收

我有一个ruby服务,需要将一个对象从S3流到其他地方,文件变得很大,我不想将它们存储为文件,所以我编写了一个简单的类来缓冲对象的部分,以便其他代码部分将其用作IO对象。
本质上它看起来是这样的,和完整的代码可用的要点链接如下:

class S3ObjectStream
  attr_reader :s3_client, :object, :size

  def initialize(bucket, key, part_size: 5 * 1024 * 1024, num_threads: 5)
    @s3_client     = Aws::S3::Client.new(...)
    @object        = Aws::S3::Object.new(bucket_name: bucket, key:, client: @s3_client)
    @io = StringIO.new
    @size = @object.content_length
    initialize_parts
    start_parts
    ObjectSpace.define_finalizer(self,
                                 self.class.method(:finalize).to_proc)
  end

  def self.finalize(id)
    puts "S3ObjectStream #{id} dying"
  end

  def read(size, out_buf=nil)
    # Simplified, checks if more mem needed from parts
    get_mem if need_more
    @io.read(size, out_buf)
  end

  def need_more
    #check byte ranges
  end

  def get_mem
    # Simplified...
    part = @parts.shift
    @io.rewind
    @io << part.data
    start_next_part
  end

  def initialize_parts
    @parts = []
    # Determine # of parts required
    # Create instances of them
    nparts.each do
     part = DataPart.new(...)
     @parts.push_back(part)
    end
  end

  def start_parts
     # Start downloading parts concurrently by num of threads or total parts
    # These vars are set in initialize_parts, not shown in simplified code
    num_to_start = [@num_parts, @num_threads].min
    @parts.each_with_index do |part, i|
      break if i == num_to_start

      part.start
    end
  end

  def start_next_part
    @parts.each do |part|
      next if part.started?

      part.start
      break
    end
  end
end

class DataPart
  def initialize(s3_object, start_byte, end_byte)
    @s3_object  = s3_object
    @start_byte = start_byte
    @end_byte   = end_byte
    @range      = "bytes=#{@start_byte}-#{@end_byte}"
    ObjectSpace.define_finalizer(self,
                                 self.class.method(:finalize).to_proc)
  end

  def self.finalize(id)
    puts "DataPart #{id} dying"
  end

  def start
    @thread = Thread.new do
      @part_data = @s3_object.get(range: @range).body.read
      nil # Don't want the thread to hold onto the string as Thread.value
    end
  end

  def data
    @thread.join
    @part_data
  end
end

我们遇到的问题是DataPart对象似乎没有被垃圾收集清理。我的理解是,一旦DataPart超出get_mem中的作用域(移出数组,然后离开方法的作用域),它应该是不可访问的,并标记为清理。
一开始我们遇到了内存问题(如下图所示),整个文件都被保存在内存中,在start中将nil添加到DataPart线程减少了内存使用,但我们仍然看到对象永远存在。
下面是此脚本

的内存使用情况图
将析构函数打印添加到对象显示,创建的所有DataPart对象直到程序退出才被销毁,即使拥有这些对象及其数组的S3ObjectStream正在按预期销毁。
gist showing test code and logs of objects being destroyed
当我们从start中删除线程并串行下载部分时,DataPart对象会在运行时GC运行期间被破坏,但这显然会给整个过程增加大量时间。
删除线程

后的内存使用情况图
我的问题是,是什么原因导致这些DataPart坚持包含线程?在线程对象和拥有DataPart之间是否存在我不理解的循环依赖关系?

idfiyjo8

idfiyjo81#

我宁愿假设@io中的StringIO对象在每次读取时都会变大,因为您在S3ObjectStream#get_mem中追加了数据,而不是对某些对象进行垃圾收集。
由于StringIO基本上只是一个普通的String,它有一个不同的接口,可以像IO对象一样工作,所以这里发生的情况是,您只是增加了底层字符串的大小,而不会再次释放读取的数据。请注意,对于StringIO对象,只是从它阅读数据并不会删除以前从String读取的数据;您可以随时在它上面调用rewind,以便重新从头读取所有内容。
为了避免这种情况,你应该尝试完全去掉@io对象,只使用一个简单的String对象。在get_mem中,你可以将数据附加到这个字符串。在read中,你可以使用String#byteslice来获取size字节的数据 * 并 * 从缓冲区中删除这个读取的数据。这样,你的缓冲区就不会无限增长。
这可能如下所示:

class S3ObjectStream
  def initialize(bucket, key, part_size: 5 * 1024 * 1024, num_threads: 5)
    # ...

    # a mutable string in binary encoding
    @buffer = +"".force_encoding(Encoding::BINARY)
  end

  def get_mem
    part = @parts.shift
    @buffer << part.data
  end

  def read(size, out_buf = nil)
    # Simplified, checks if more mem needed from parts
    get_mem if need_more

    data = @buffer.byteslice(0, size)
    if out_buf
       out_buf.replace data
       out_buf
    else
       data
    end
  end
end

out_buf在这个实现中或多或少是无用的,可能在任何方面都没有帮助,但可能也没有坏处。
注意这个构造和你之前的StringIO对象都不是线程安全的,如果你要从多个并发线程中追加和/或阅读@buffer,你需要添加适当的互斥锁。
除了@io问题之外,从简化的代码中还可以看出,您开始并行获取所有部件,每个部件都在自己的线程中。因此,每个DataPart对象将其读取的数据保存在@part_data变量的内存中。当您在开始时为数据并行初始化所有DataPart对象时,无论如何,你的内存都会增长到包含所有部分。因此,从@parts数组中部分获取数据部分并将其数据附加到缓冲区中的cobstructuon是相当没有意义的。
相反,您可能只需要在使用DataPart时获取几个DataPart(或者一次一个),并在读取数据时继续创建/获取其他DataPart。

相关问题