How do I combine multithreading with the boto3 S3 paginator?

s71maibg  asked on 2022-10-06  in ElasticSearch

My logs in S3 are about 200 GB per day. I wrote a script that reads from S3 with a boto3 paginator and writes the data to OpenSearch. The problem is that the script is far too slow to index everything: indexing one day of logs takes about 4 days. My script currently looks like the following. I'm not sure how to combine multithreading with the paginator, nor how to make it thread-safe. Any suggestions for faster alternatives would also be appreciated.

import sys
import json
import logging
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Fill these in for your environment
globalVars = {}
globalVars['Environment'] = ""
globalVars['awsRegion'] = ""
globalVars['Bucket'] = ""
globalVars['osIndexPrefix'] = ""
index_name = ''
globalVars['osHosts'] = ""

# Initialize Logger

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def indexDocElement(obj_list):
    try:
        # A fresh client is built on every call: thread-safe, but it adds
        # connection overhead per batch
        credentials = boto3.Session().get_credentials()
        auth = AWSV4SignerAuth(credentials, globalVars['awsRegion'])

        client = OpenSearch(
            hosts=[{'host': globalVars['osHosts'], 'port': 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )

        client.bulk(body=obj_list, index=index_name)

    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index bulk batch')

def get_s3_object(s3, log_date):
    try:
        paginator = s3.get_paginator('list_objects_v2')
        operation_parameters = {'Bucket': globalVars['Bucket'],
                                'Prefix': 'eks/'+log_date,
                                'PaginationConfig': {'PageSize': 100}}
        page_iterator = paginator.paginate(**operation_parameters)

        for page in page_iterator:
            if page['KeyCount'] > 0:
                for item in page['Contents']:
                    yield item

    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error(
            'ERROR: Unable to GET object:{0} from S3 Bucket:{1}. Verify object exists.'.format('eks/'+log_date, globalVars['Bucket']))

if __name__ == '__main__':

    s3 = boto3.client('s3')

    log_date = sys.argv[1]
    print("nDate:", log_date)

    index_string = '{"index": {}}'

    for i in get_s3_object(s3, log_date):
        # Skip "directory" placeholder keys (key ends with a slash)
        if i['Key'].split('/')[-1] == '':
            print(len(i['Key'].split('/')))
            continue
        obj = s3.get_object(Bucket=globalVars['Bucket'], Key=i['Key'])
        j = obj['Body'].read().decode('utf-8')
        obj_list = j.split('\n')
        intermediate_item_list = []
        for line in obj_list:
            if line == '':
                continue
            doc = json.loads(line)
            doc['@timestamp'] = doc['date']
            del doc['kubernetes']['labels']
            intermediate_item_list.append(json.dumps(doc))

        # Interleave the bulk action line with each document (NDJSON body)
        item_list = []
        for item in intermediate_item_list:
            item_list.append(index_string)
            item_list.append(item)

        item_list = '\n'.join(item_list)
        indexDocElement(item_list)

kyxcudwk1#

I found a reference on GitHub from here. He first builds a list of the keys, then applies multithreading to download the keys from that list. I tried this in my script and was able to cut the time needed from 4 days down to ~6 hours.
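A minimal sketch of that approach, building on the script in the question (the process_key helper and the MAX_WORKERS value are my own names, not from the linked repo; tune the worker count to your network and cluster):

import concurrent.futures

MAX_WORKERS = 16  # assumption: adjust for your network and OpenSearch cluster

def process_key(key):
    # Download one S3 object, build an NDJSON bulk body, and index it
    obj = s3.get_object(Bucket=globalVars['Bucket'], Key=key)
    body = obj['Body'].read().decode('utf-8')
    item_list = []
    for line in body.split('\n'):
        if line == '':
            continue
        doc = json.loads(line)
        doc['@timestamp'] = doc['date']
        del doc['kubernetes']['labels']
        item_list.append('{"index": {}}')
        item_list.append(json.dumps(doc))
    if item_list:
        indexDocElement('\n'.join(item_list))

# Step 1: walk the paginator once and collect all keys up front;
# listing is fast, the per-object download + indexing is the slow part
keys = [item['Key'] for item in get_s3_object(s3, log_date)
        if not item['Key'].endswith('/')]

# Step 2: fan the per-key work out over a thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = [executor.submit(process_key, key) for key in keys]
    for future in concurrent.futures.as_completed(futures):
        try:
            future.result()
        except Exception as e:
            logger.error('ERROR: worker failed: {0}'.format(str(e)))

On thread safety: boto3's low-level clients are documented as thread-safe, so the single s3 client can be shared across the pool, but boto3 Session objects are not thread-safe and should stay on one thread. Since indexDocElement builds a fresh OpenSearch client on each call, it is already safe to call from multiple threads, at the cost of some connection overhead per batch.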
