拥有线程的类不会被垃圾回收
我有一个ruby服务,需要将一个对象从S3流到其他地方,文件变得很大,我不想将它们存储为文件,所以我编写了一个简单的类来缓冲对象的部分,以便其他代码部分将其用作IO对象。
本质上它看起来是这样的,和完整的代码可用的要点链接如下:
class S3ObjectStream
attr_reader :s3_client, :object, :size
def initialize(bucket, key, part_size: 5 * 1024 * 1024, num_threads: 5)
@s3_client = Aws::S3::Client.new(...)
@object = Aws::S3::Object.new(bucket_name: bucket, key:, client: @s3_client)
@io = StringIO.new
@size = @object.content_length
initialize_parts
start_parts
ObjectSpace.define_finalizer(self,
self.class.method(:finalize).to_proc)
end
def self.finalize(id)
puts "S3ObjectStream #{id} dying"
end
def read(size, out_buf=nil)
# Simplified, checks if more mem needed from parts
get_mem if need_more
@io.read(size, out_buf)
end
def need_more
#check byte ranges
end
def get_mem
# Simplified...
part = @parts.shift
@io.rewind
@io << part.data
start_next_part
end
def initialize_parts
@parts = []
# Determine # of parts required
# Create instances of them
nparts.each do
part = DataPart.new(...)
@parts.push_back(part)
end
end
def start_parts
# Start downloading parts concurrently by num of threads or total parts
# These vars are set in initialize_parts, not shown in simplified code
num_to_start = [@num_parts, @num_threads].min
@parts.each_with_index do |part, i|
break if i == num_to_start
part.start
end
end
def start_next_part
@parts.each do |part|
next if part.started?
part.start
break
end
end
end
class DataPart
def initialize(s3_object, start_byte, end_byte)
@s3_object = s3_object
@start_byte = start_byte
@end_byte = end_byte
@range = "bytes=#{@start_byte}-#{@end_byte}"
ObjectSpace.define_finalizer(self,
self.class.method(:finalize).to_proc)
end
def self.finalize(id)
puts "DataPart #{id} dying"
end
def start
@thread = Thread.new do
@part_data = @s3_object.get(range: @range).body.read
nil # Don't want the thread to hold onto the string as Thread.value
end
end
def data
@thread.join
@part_data
end
end
我们遇到的问题是DataPart对象似乎没有被垃圾收集清理。我的理解是,一旦DataPart超出get_mem
中的作用域(移出数组,然后离开方法的作用域),它应该是不可访问的,并标记为清理。
一开始我们遇到了内存问题(如下图所示),整个文件都被保存在内存中,在start
中将nil
添加到DataPart
线程减少了内存使用,但我们仍然看到对象永远存在。
下面是此脚本
的内存使用情况图
将析构函数打印添加到对象显示,创建的所有DataPart
对象直到程序退出才被销毁,即使拥有这些对象及其数组的S3ObjectStream
正在按预期销毁。
gist showing test code and logs of objects being destroyed
当我们从start
中删除线程并串行下载部分时,DataPart
对象会在运行时GC运行期间被破坏,但这显然会给整个过程增加大量时间。
删除线程
后的内存使用情况图
我的问题是,是什么原因导致这些DataPart
坚持包含线程?在线程对象和拥有DataPart
之间是否存在我不理解的循环依赖关系?
1条答案
按热度按时间idfiyjo81#
我宁愿假设
@io
中的StringIO
对象在每次读取时都会变大,因为您在S3ObjectStream#get_mem
中追加了数据,而不是对某些对象进行垃圾收集。由于
StringIO
基本上只是一个普通的String,它有一个不同的接口,可以像IO
对象一样工作,所以这里发生的情况是,您只是增加了底层字符串的大小,而不会再次释放读取的数据。请注意,对于StringIO
对象,只是从它阅读数据并不会删除以前从String读取的数据;您可以随时在它上面调用rewind
,以便重新从头读取所有内容。为了避免这种情况,你应该尝试完全去掉
@io
对象,只使用一个简单的String对象。在get_mem
中,你可以将数据附加到这个字符串。在read
中,你可以使用String#byteslice
来获取size
字节的数据 * 并 * 从缓冲区中删除这个读取的数据。这样,你的缓冲区就不会无限增长。这可能如下所示:
out_buf
在这个实现中或多或少是无用的,可能在任何方面都没有帮助,但可能也没有坏处。注意这个构造和你之前的
StringIO
对象都不是线程安全的,如果你要从多个并发线程中追加和/或阅读@buffer
,你需要添加适当的互斥锁。除了
@io
问题之外,从简化的代码中还可以看出,您开始并行获取所有部件,每个部件都在自己的线程中。因此,每个DataPart
对象将其读取的数据保存在@part_data
变量的内存中。当您在开始时为数据并行初始化所有DataPart对象时,无论如何,你的内存都会增长到包含所有部分。因此,从@parts
数组中部分获取数据部分并将其数据附加到缓冲区中的cobstructuon是相当没有意义的。相反,您可能只需要在使用DataPart时获取几个DataPart(或者一次一个),并在读取数据时继续创建/获取其他DataPart。