import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp
# Use Paramiko's listdir_iter method
items_iter = sftp.listdir_iter(path)
# Skip the items before the starting page
for _ in range(start_page * page_size):
try:
next(items_iter)
except StopIteration:
break
items = []
for i, item in enumerate(items_iter):
if i >= page_size:
break
items.append(item)
return items
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp # Access the paramiko SFTP client
# Use Paramiko's listdir_iter method
items_iter = sftp.listdir_iter(path)
# Implement pagination by controlling the iterator
# ... (rest of the implementation)
return items
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, sftp_fs, *args, **kwargs):
self.sftp = sftp_fs.ftp
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
# Use the sftp connection from the existing SFTPFileSystem object
items_iter = self.sftp.listdir_iter(path)
# Implement pagination by controlling the iterator
# ... (rest of the implementation)
return items
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def walk_paginated(self, path, page_size=1000, start_page=0, maxdepth=None, topdown=True, **kwargs):
if not self.isdir(path):
raise ValueError("Path should be a directory")
# Use the original walk method to generate the nested structure
walk_generator = super().walk(path, maxdepth=maxdepth, topdown=topdown, **kwargs)
# Implement pagination for the top level only
start_index = start_page * page_size
end_index = start_index + page_size
paginated_walk = []
for idx, (root, directories, files) in enumerate(walk_generator):
if start_index <= idx < end_index:
paginated_walk.append((root, directories, files))
if idx >= end_index:
break
return paginated_walk
# Create a new PaginatedSFTPFileSystem instance with a new FTP session
fs1 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", dummy_arg1="unique_value_1")
# Create another new PaginatedSFTPFileSystem instance with a new FTP session
fs2 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", dummy_arg2="unique_value_2")
1条答案
按热度按时间vnzz0bqm1#
listdir_iter
将提供一种更直接的方法来实现分页,因为它返回一个迭代器,允许您逐个检索项。但是你也可以考虑
listdir_attr
,它一次加载所有项目,然后对列表进行切片以获得所需的页面:那会更快这意味着您可以尝试通过对返回的SFTPAttributes
对象列表进行切片来实现分页。例如:您可以将其用作:
这种方法比使用
listdir_iter
的方法稍微高效一些,因为它避免了逐个迭代项。但是,在对列表进行切片之前,它仍然会在内存中加载 * 所有 *
SFTPAttributes
对象。除非您有非常大的文件数量和有限的内存资源,否则这种内存开销可能不是问题。要在fsspec中使用
listdir_iter
,可以创建一个自定义的PaginatedSFTPFileSystem
类,该类继承自SFTPFileSystem
。自定义类通过
self.ftp
属性访问底层paramiko SFTP客户端,然后仍然直接使用listdir_iter
方法。通过以这种方式访问
paramiko
SFTP客户端,您可以使用listdir_iter
直接实现分页,即使它不是fsspec
的一部分。使用
sshfs
(使用asyncssh的SFTP协议的fspec实现),我没有看到类似SSHFS.listdir
的方法。但是
sshfs
也有很多其他基本的文件系统操作,比如mkdir
、touch
和find
。因此,您可以尝试使用
find
方法,该方法继承自fsspec
中的AbstractFileSystem
类,用于分页:您可以在项目中使用此自定义实现,如下所示:
此实现使用
find
方法,并将detail参数设置为False
,以获取文件路径列表。然后,它通过对项目列表进行切片来实现分页。
同样,这种方法在对列表进行切片之前将 * 所有 * 项加载到内存中,这对于非常大的目录可能效率低下。
你知道是否有一种方法可以直接添加一个sftp连接到
PaginatedSFTPFileSystem
,而不是有host
,username
,password
再次,因为我已经有一个fsspec.filesystem
对象sftp连接创建,执行多个任务?我想你可以将一个现有的
SFTPFileSystem
对象传递给你的自定义PaginatedSFTPFileSystem
类,并使用它的底层sftp连接。为此,可以修改自定义类,使其在初始化期间接受
SFTPFileSystem
对象,并使用其sftp
属性列出目录项。现在你可以创建一个
SFTPFileSystem
对象并将其传递给PaginatedSFTPFileSystem
:这个自定义类现在将使用来自 existing
SFTPFileSystem
对象的sftp连接,无需再次提供host
、username
和password
。Corralien在评论中建议使用
walk(path, maxdepth=None, topdown=True, **kwargs)
。您可以将此方法用于自定义
PaginatedSFTPFileSystem
类,因为它继承自SFTPFileSystem
,而SFTPFileSystem
又继承自AbstractFileSystem
。这意味着
walk
方法可用于您的自定义类。但是,这可能不是最适合分页的选择,因为它返回嵌套结构中的文件和目录,这使得以直接的方式对结果进行分页变得更加困难。
如果只需要对顶层目录进行分页,可以修改自定义
PaginatedSFTPFileSystem
类,使其包含walk方法的自定义实现,并支持顶层目录的分页。与以下产品一起使用:
同样,这只会对 * 顶级 * 目录和文件进行分页,而不会对子目录中的文件进行分页。
如果需要对所有级别的文件和目录进行分页,请考虑使用
find
方法或自定义listdir_paginated
方法,如前面的示例所示。正如mdurant在评论中所指出的:
fsspec缓存文件系统示例,因此如果您使用相同的参数调用SSHFileSystem(或派生)两次,则不会创建新的ftp会话,而是重用现有会话。
请参见示例/列表缓存。
根据您的用例,您可能需要传递
skip_instance_cache=True
或use_listings_cache=False
。考虑一下,如果使用相同的参数创建PaginatedSFTPFileSystem示例,fsspec将返回缓存的SFTPFileSystem示例。
如果要强制创建新的FTP会话,可以在创建
PaginatedSFTPFileSystem
示例时传递一个唯一参数。例如,您可以添加一个虚拟参数,每次要创建新的FTP会话时,该参数都采用唯一值:
在该示例中,
fs1
和fs2
将具有单独的FTP会话,尽管具有相同的主机、用户名和密码,因为唯一的伪参数强制fsspec
创建新示例,而不是重用缓存的示例。