python 是否有一种方法,我们可以得到分页的回应从sftp?

iezvtpos  于 2023-05-16  发布在  Python
关注(0)|答案(1)|浏览(103)

我使用fspec,它使用paramiko的内置功能,但无法真正找到一种方法来分页响应。
有没有办法在这里实现这个功能?
这个用例就像每个目录都有100000个文件,我想在内存中单独列出所有这些文件是一个坏主意。
有一个sftp.listdir_iter,但是我们在fspec中有这个功能吗?

vnzz0bqm

vnzz0bqm1#

listdir_iter将提供一种更直接的方法来实现分页,因为它返回一个迭代器,允许您逐个检索项。

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp

        # Use Paramiko's listdir_iter method
        items_iter = sftp.listdir_iter(path)

        # Skip the items before the starting page
        for _ in range(start_page * page_size):
            try:
                next(items_iter)
            except StopIteration:
                break

        items = []
        for i, item in enumerate(items_iter):
            if i >= page_size:
                break
            items.append(item)

        return items

但是你也可以考虑listdir_attr,它一次加载所有项目,然后对列表进行切片以获得所需的页面:那会更快这意味着您可以尝试通过对返回的SFTPAttributes对象列表进行切片来实现分页。例如:

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp

        # Use Paramiko's listdir_attr method
        items = sftp.listdir_attr(path)

        # Slice the items list to implement pagination
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        # Extract filenames from the SFTPAttributes objects
        filenames = [item.filename for item in paginated_items]

        return filenames

您可以将其用作:

fs = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password")
paginated_response = fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)

这种方法比使用listdir_iter的方法稍微高效一些,因为它避免了逐个迭代项。
但是,在对列表进行切片之前,它仍然会在内存中加载 * 所有 * SFTPAttributes对象。除非您有非常大的文件数量和有限的内存资源,否则这种内存开销可能不是问题。
要在fsspec中使用listdir_iter,可以创建一个自定义的PaginatedSFTPFileSystem类,该类继承自SFTPFileSystem
自定义类通过self.ftp属性访问底层paramiko SFTP客户端,然后仍然直接使用listdir_iter方法。

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp  # Access the paramiko SFTP client

        # Use Paramiko's listdir_iter method
        items_iter = sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator
        # ... (rest of the implementation)

        return items

通过以这种方式访问paramiko SFTP客户端,您可以使用listdir_iter直接实现分页,即使它不是fsspec的一部分。
使用sshfs(使用asyncssh的SFTP协议的fspec实现),我没有看到类似SSHFS.listdir的方法。
但是sshfs也有很多其他基本的文件系统操作,比如mkdirtouchfind
因此,您可以尝试使用find方法,该方法继承自fsspec中的AbstractFileSystem类,用于分页:

import sshfs

class PaginatedSSHFS(sshfs.SSHFS):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def find_paginated(self, path, page_size=1000, start_page=0):
        if not await self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use SSHFS's find method
        items = await super().find(path, detail=False)

        # Implement pagination by slicing the items list
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        return paginated_items

您可以在项目中使用此自定义实现,如下所示:

import asyncio

async def main():
    fs = PaginatedSSHFS(host="your_host", username="your_username", password="your_password")
    paginated_response = await fs.find_paginated("/path/to/your/directory", page_size=1000, start_page=0)
    print(paginated_response)

asyncio.run(main())

此实现使用find方法,并将detail参数设置为False,以获取文件路径列表。
然后,它通过对项目列表进行切片来实现分页。
同样,这种方法在对列表进行切片之前将 * 所有 * 项加载到内存中,这对于非常大的目录可能效率低下。
你知道是否有一种方法可以直接添加一个sftp连接到PaginatedSFTPFileSystem,而不是有hostusernamepassword再次,因为我已经有一个fsspec.filesystem对象sftp连接创建,执行多个任务?
我想你可以将一个现有的SFTPFileSystem对象传递给你的自定义PaginatedSFTPFileSystem类,并使用它的底层sftp连接。
为此,可以修改自定义类,使其在初始化期间接受SFTPFileSystem对象,并使用其sftp属性列出目录项。

from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, sftp_fs, *args, **kwargs):
        self.sftp = sftp_fs.ftp
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the sftp connection from the existing SFTPFileSystem object
        items_iter = self.sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator
        # ... (rest of the implementation)

        return items

现在你可以创建一个SFTPFileSystem对象并将其传递给PaginatedSFTPFileSystem

sftp_fs = SFTPFileSystem(host="your_host", username="your_username", password="your_password")

# Pass the existing SFTPFileSystem object to the custom PaginatedSFTPFileSystem
paginated_fs = PaginatedSFTPFileSystem(sftp_fs)

paginated_response = paginated_fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)

这个自定义类现在将使用来自 existingSFTPFileSystem对象的sftp连接,无需再次提供hostusernamepassword
Corralien在评论中建议使用walk(path, maxdepth=None, topdown=True, **kwargs)
您可以将此方法用于自定义PaginatedSFTPFileSystem类,因为它继承自SFTPFileSystem,而SFTPFileSystem又继承自AbstractFileSystem
这意味着walk方法可用于您的自定义类。
但是,这可能不是最适合分页的选择,因为它返回嵌套结构中的文件和目录,这使得以直接的方式对结果进行分页变得更加困难。
如果只需要对顶层目录进行分页,可以修改自定义PaginatedSFTPFileSystem类,使其包含walk方法的自定义实现,并支持顶层目录的分页。

from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def walk_paginated(self, path, page_size=1000, start_page=0, maxdepth=None, topdown=True, **kwargs):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the original walk method to generate the nested structure
        walk_generator = super().walk(path, maxdepth=maxdepth, topdown=topdown, **kwargs)

        # Implement pagination for the top level only
        start_index = start_page * page_size
        end_index = start_index + page_size

        paginated_walk = []
        for idx, (root, directories, files) in enumerate(walk_generator):
            if start_index <= idx < end_index:
                paginated_walk.append((root, directories, files))
            if idx >= end_index:
                break

        return paginated_walk

与以下产品一起使用:

fs = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password")

paginated_response = fs.walk_paginated("/path/to/your/directory", page_size=1000, start_page=0)
for root, directories, files in paginated_response:
    print(f"Root: {root}")
    print(f"Directories: {directories}")
    print(f"Files: {files}\n")

同样,这只会对 * 顶级 * 目录和文件进行分页,而不会对子目录中的文件进行分页。
如果需要对所有级别的文件和目录进行分页,请考虑使用find方法或自定义listdir_paginated方法,如前面的示例所示。
正如mdurant在评论中所指出的:

fsspec缓存文件系统示例,因此如果您使用相同的参数调用SSHFileSystem(或派生)两次,则不会创建新的ftp会话,而是重用现有会话。

请参见示例/列表缓存。
根据您的用例,您可能需要传递skip_instance_cache=Trueuse_listings_cache=False
考虑一下,如果使用相同的参数创建PaginatedSFTPFileSystem示例,fsspec将返回缓存的SFTPFileSystem示例。
如果要强制创建新的FTP会话,可以在创建PaginatedSFTPFileSystem示例时传递一个唯一参数。
例如,您可以添加一个虚拟参数,每次要创建新的FTP会话时,该参数都采用唯一值:

# Create a new PaginatedSFTPFileSystem instance with a new FTP session
fs1 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", dummy_arg1="unique_value_1")

# Create another new PaginatedSFTPFileSystem instance with a new FTP session
fs2 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", dummy_arg2="unique_value_2")

在该示例中,fs1fs2将具有单独的FTP会话,尽管具有相同的主机、用户名和密码,因为唯一的伪参数强制fsspec创建新示例,而不是重用缓存的示例。

相关问题