pyspark 如何使用文件的部分名称从胶合中的S3桶读取文件

krcsximq  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(177)

我试图从胶水s3桶的文件基于关键字搜索文件名。例如,读取一个文件,如果文件名包含“文件”。这是我目前使用的代码从s3桶读取给定的文件。

File1_node = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            "s3:// env-files/data/material/filename1.csv"
        ],
        "recurse": True,
    },
    transformation_ctx=" File1_node",
)

File1= File1_node.toDF()

我想通过使用关键字搜索来动态读取文件。例如,如果文件名包含“file”,并且有一个文件名为“filename 1”,则应该读取该文件。如果有多个文件包含相同的关键字,则将它们全部追加。如果有任何方法可以这样做,请让我知道。谢谢!

k10s72fa

k10s72fa1#

您可以使用boto 3 s3 list_objects_v2()来完成此操作。

import boto3
from typing import List

s3_client = boto3.client('s3')

def get_all_filepaths(filename_filter: str, bucket: str, prefix: str) -> List[str]:
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return [key['Key'] for key in response['Contents'] if filename_filter in key['Key']]

File1_node = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": get_all_filepaths(filename_filter, bucket, prefix),
        "recurse": True,
    },
    transformation_ctx=" File1_node",
)

File1= File1_node.toDF()

使用这个函数,你可以得到一个符合条件的路径列表。我还没有运行这个函数,但是我认为你也应该添加s3://,请检查一下。另外,如果有超过1000个对象,你将不得不实现逻辑来继续使用NextContinuationToken从函数调用的响应中获取数据。
希望这对你有帮助!

相关问题