csv: AWS Lambda reads a zip file, performs validation, and if validation passes, extracts it to an S3 bucket

6g8kf2rb  asked on 2022-12-06  in  Other

I have a requirement where a zip file arrives in an S3 bucket, and I need to write a Lambda in Python that reads the zip file, performs some validation, and extracts it to another S3 bucket.
The zip file contains the following:

a.csv b.csv c.csv trigger_file.txt

trigger_file.txt -- contains the file names in the zip and their record counts (e.g.: a.csv:120, b.csv:10, c.csv:50)
So, using the Lambda, I need to read the trigger file, check whether the number of files in the zip folder equals the number of files mentioned in the trigger file, and if the check passes, extract them to the S3 bucket.
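For reference, a trigger file in that format could be parsed into a name-to-count mapping along these lines (a minimal sketch; `parse_trigger_file` is a hypothetical helper name, not from the question):

```python
import io

def parse_trigger_file(fileobj):
    """Parse lines like 'a.csv:120' into a dict {'a.csv': 120}."""
    counts = {}
    for raw in fileobj:
        line = raw.decode('utf8').strip()
        if not line:
            continue
        # split each line on the first ':' into name and record count
        name, _, count = line.partition(':')
        counts[name.strip()] = int(count.strip())
    return counts

# Example with the contents from the question
sample = io.BytesIO(b"a.csv:120\nb.csv:10\nc.csv:50\n")
print(parse_trigger_file(sample))  # {'a.csv': 120, 'b.csv': 10, 'c.csv': 50}
```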
Below is the code I have prepared:

def write_to_s3(config_dict):
    inp_bucket = config_dict["inp_bucket"]
    inp_key = config_dict["inp_key"]
    out_bucket = config_dict["out_bucket"]
    des_key = config_dict["des_key"]
    processed_key = config_dict["processed_key"]

    obj = S3_CLIENT.get_object(Bucket=inp_bucket, Key=inp_key)
    putObjects = []
    # read the object body into an in-memory buffer; it must stay open
    # for the ZipFile below, so no `with` block here
    tf = io.BytesIO(obj["Body"].read())
    tf.seek(0)  # rewind the buffer

    # Read the file as a zipfile perform transformations and process the members
    with zipfile.ZipFile(tf, mode='r') as zipf:
        for file in zipf.infolist():
            fileName = file.filename
            print("file name before while loop :",fileName)
            try:
                found = False
                while not found :
                    if fileName == "Trigger_file.txt" :
                        with zipf.open(fileName , 'r') as thefile:
                            my_list = [i.decode('utf8').split(' ') for i in thefile]
                            my_list = str(my_list)[1:-1]
                            print("my_list :",my_list)
                            print("fileName :",fileName)
                            found = True
                            break
                            thefile.close()
                    else:
                        print("Trigger file not found ,try again")
            except Exception as exp_handler:
                    raise exp_handler

            if 'csv' in fileName :
                try:
                    if fileName in my_list:
                        print("Validation Success, all files in Trigger file are present, proceed with extraction")
                    else:
                        print("Validation Failed")
                except Exception as exp_handler:
                    raise exp_handler

    # *****FUNCTION TO UNZIP ********

def lambda_handler(event, context):
    try:
        inp_bucket = event['Records'][0]['s3']['bucket']['name']
        inp_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        config_dict = build_conf_obj(os.environ['config_bucket'],os.environ['config_file'], os.environ['param_name'])
        write_to_s3(config_dict)
    except Exception as exp_handler:
        print("ERROR")

Everything works fine; the only problem I'm facing is the validation part. I think the while loop is wrong, because it ends up in an infinite loop.
Expected behavior:
Search the zip for trigger_file.txt; if found, break out of the loop, perform the validation, and extract to the S3 folder. If not found, keep searching until the end of the entries.
Wrong output (timeout):

Response:
{
  "errorMessage": "2020-06-16T20:09:06.168Z 39253b98-db87-4e65-b288-b585d268ac5f Task timed out after 60.06 seconds"
}

Request ID:
"39253b98-db87-4e65-b288-b585d268ac5f"

Function Logs:
 again
Trigger file not found ,try again
Trigger file not found ,try again
Trigger file not found ,try again
Trigger file not found ,try again
Trigger file not found ,trEND RequestId: 39253b98-db87-4e65-b288-b585d268ac5f
REPORT RequestId: 39253b98-db87-4e65-b288-b585d268ac5f  Duration: 60060.06 ms   Billed Duration: 60000 ms   Memory Size: 3008 MB    Max Memory Used: 83 MB  Init Duration: 389.65 ms    
2020-06-16T20:09:06.168Z 39253

uhry853o1#

In the while loop of the following code, if fileName is not "Trigger_file.txt", it falls into an infinite loop.

found = False
while not found:
    if fileName == "Trigger_file.txt":
        with zipf.open(fileName , 'r') as thefile:
            my_list = [i.decode('utf8').split(' ') for i in thefile]
            my_list = str(my_list)[1:-1]
            print("my_list :",my_list)
            print("fileName :",fileName)
            found = True
            break
            thefile.close()
    else:
        print("Trigger file not found ,try again")

I think that part of the write_to_s3 function code can be replaced with the following:

def write_to_s3(config_dict):

    ######################
    #### Do something ####
    ######################    

    # Read the file as a zipfile perform transformations and process the members
    with zipfile.ZipFile(tf, mode='r') as zipf:
        found = False
        for file in zipf.infolist():
            fileName = file.filename
            if fileName == "Trigger_file.txt":
                with zipf.open(fileName, 'r') as thefile:
                    my_list = [i.decode('utf8').split(' ') for i in thefile]
                    my_list = str(my_list)[1:-1]
                    print("my_list :", my_list)
                    print("fileName :", fileName)
                    found = True
                    thefile.close()
                    break

        if found is False:
            print("Trigger file not found ,try again")
            return

        for file in zipf.infolist():
            fileName = file.filename
            if 'csv' in fileName:
                if fileName not in my_list:
                    print("Validation Failed")
                    return

        print("Validation Success, all files in Trigger file are present, proceed with extraction")

    # *****FUNCTION TO UNZIP ********
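The `# *****FUNCTION TO UNZIP ********` placeholder is left open in the question. One way to fill it is to stream each zip member into the destination bucket. A minimal sketch, assuming the `S3_CLIENT`, `out_bucket`, and `des_key` names from the question (`extract_zip_to_s3` is a hypothetical helper name):

```python
import io
import zipfile

def extract_zip_to_s3(zipf, s3_client, out_bucket, des_key):
    """Upload every member of an open ZipFile to s3://<out_bucket>/<des_key>/<name>."""
    for member in zipf.infolist():
        if member.is_dir():
            continue  # skip directory entries
        with zipf.open(member, 'r') as f_in:
            # upload_fileobj takes (Fileobj, Bucket, Key)
            s3_client.upload_fileobj(
                io.BytesIO(f_in.read()),
                out_bucket,
                f"{des_key}/{member.filename}",
            )
```

It would be called right after the "Validation Success" branch, while `zipf` is still open, e.g. `extract_zip_to_s3(zipf, S3_CLIENT, out_bucket, des_key)`.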

anauzrmj2#

Read the zipped file into a buffer, use the zipfile library to extract all the file names, and then add an if statement in the for loop: `if file in zipped.namelist():` (without the extension).

zipped_file = s3_resource.Object(bucket_name=sourcebucketname, key=filekey)
buffer = BytesIO(zipped_file.get()["Body"].read())
zipped = zipfile.ZipFile(buffer)

for file in zipped.namelist():
    logger.info(f'current file in zipfile: {file}')
    final_file_path = file + '.extension'

    with zipped.open(file, "r") as f_in:
        content = f_in.read()
        destinationbucket.upload_fileobj(io.BytesIO(content),
                                         final_file_path,
                                         ExtraArgs={"ContentType": "text/plain"})

There is also a tutorial here: https://betterprogramming.pub/unzip-and-gzip-incoming-s3-files-with-aws-lambda-f7bccf0099c9
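The "without the extension" comparison mentioned above could be done with `os.path.splitext`, for example (a sketch; `stems_of` is a hypothetical helper name, not from the answer):

```python
import os
import zipfile
from io import BytesIO

def stems_of(names):
    """Return the set of file names with their extensions stripped."""
    return {os.path.splitext(n)[0] for n in names}

# Build a small in-memory zip to demonstrate the membership check
buf = BytesIO()
with zipfile.ZipFile(buf, 'w') as z:
    for name in ('a.csv', 'b.csv', 'trigger_file.txt'):
        z.writestr(name, 'dummy')
buf.seek(0)

zipped = zipfile.ZipFile(buf)
print(sorted(stems_of(zipped.namelist())))  # ['a', 'b', 'trigger_file']
```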
