My daily logs in S3 come to roughly 200 GB. I wrote a script that reads the objects from S3 with a boto3 paginator and writes them to OpenSearch. The problem is that the script is far too slow to index all of the data: indexing one day of logs from S3 takes about four days. My script currently looks like the following. I am not sure how to combine multithreading with the paginator, or how to make it thread-safe. Any suggestions for faster alternatives would also be appreciated.
import sys
import json
import logging

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Connection and bucket settings (filled in at deployment time)
globalVars = {}
globalVars['Environment'] = ""
globalVars['awsRegion'] = ""
globalVars['Bucket'] = ""
globalVars['osIndexPrefix'] = ""
index_name = ''
globalVars['osHosts'] = ""

# Initialize Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def indexDocElement(obj_list):
    """Send one newline-delimited bulk body to OpenSearch."""
    try:
        credentials = boto3.Session().get_credentials()
        auth = AWSV4SignerAuth(credentials, globalVars['awsRegion'])
        client = OpenSearch(
            hosts=[{'host': globalVars['osHosts'], 'port': 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        client.bulk(body=obj_list, index=index_name)
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line')


def get_s3_object(s3, log_date):
    """Yield object summaries under the eks/<log_date> prefix, 100 keys per page."""
    try:
        paginator = s3.get_paginator('list_objects_v2')
        operation_parameters = {'Bucket': globalVars['Bucket'],
                                'Prefix': 'eks/' + log_date,
                                'PaginationConfig': {'PageSize': 100}}
        page_iterator = paginator.paginate(**operation_parameters)
        for page in page_iterator:
            if page['KeyCount'] > 0:
                for item in page['Contents']:
                    yield item
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error(
            'ERROR: Unable to GET object:{0} from S3 Bucket:{1}. Verify object exists.'.format(
                'eks/' + log_date, globalVars['Bucket']))


if __name__ == '__main__':
    s3 = boto3.client('s3')
    log_date = sys.argv[1]
    print("\nDate:", log_date)
    index_string = '{"index": {}}'

    for i in get_s3_object(s3, log_date):
        # Skip "directory" placeholder keys that end with a slash
        if i['Key'].split('/')[-1] == '':
            print(len(i['Key'].split('/')))
            continue

        # Download the object and split it into individual JSON log lines
        obj = s3.get_object(Bucket=globalVars['Bucket'], Key=i['Key'])
        j = obj['Body'].read().decode('utf-8')
        obj_list = j.split('\n')

        # Reshape each log line: copy date into @timestamp and drop the labels block
        intermediate_item_list = []
        for obj in obj_list:
            if obj == '':
                continue
            obj = json.loads(obj)
            obj['@timestamp'] = obj['date']
            del obj['kubernetes']['labels']
            obj = json.dumps(obj)
            intermediate_item_list.append(obj)

        # Interleave the bulk action line with each document and send one bulk request
        item_list = []
        for item in intermediate_item_list:
            item_list.append(index_string)
            item_list.append(item)
        item_list = '\n'.join(item_list)
        indexDocElement(item_list)
1 Answer
I found a reference on GitHub from here. The author first builds the list of keys, and then applies multithreading to download the keys from that list. I tried this in my script and was able to cut the time needed from 4 days down to ~6 hours.
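For reference, a minimal sketch of that pattern, under some assumptions: the paginator runs single-threaded in the main thread to collect the full key list (so the paginator itself is never shared between threads), and only the per-key download/transform/index work is fanned out to a thread pool. The names process_key and MAX_WORKERS are mine, and the sketch reuses globalVars, logger, index_string, get_s3_object and indexDocElement from the question's script; it is not the code from the linked GitHub repository.

import sys
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3

MAX_WORKERS = 16  # assumption: tune to your network bandwidth and cluster capacity


def process_key(key):
    """Download one S3 object, build a bulk body, and index it.

    Each worker creates its own client from a fresh Session, because boto3
    Sessions are not thread-safe and should not be shared across threads.
    """
    s3 = boto3.session.Session().client('s3')
    body = s3.get_object(Bucket=globalVars['Bucket'], Key=key)['Body'].read().decode('utf-8')

    lines = []
    for line in body.split('\n'):
        if line == '':
            continue
        doc = json.loads(line)
        doc['@timestamp'] = doc['date']
        del doc['kubernetes']['labels']
        lines.append(index_string)
        lines.append(json.dumps(doc))

    if lines:
        indexDocElement('\n'.join(lines))  # bulk helper from the question's script
    return key


if __name__ == '__main__':
    s3 = boto3.client('s3')
    log_date = sys.argv[1]

    # Step 1: collect all keys up front, single-threaded, with the existing paginator.
    keys = [item['Key'] for item in get_s3_object(s3, log_date)
            if not item['Key'].endswith('/')]

    # Step 2: download and index the keys in parallel.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(process_key, key) for key in keys]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                logger.error('ERROR processing key: {0}'.format(exc))

The worker count and the size of each bulk request are the main knobs: a moderate number of threads with bulk bodies of a few megabytes usually outperforms many tiny requests, but the right values depend on your OpenSearch cluster, so it is worth measuring rather than assuming.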