如何在python中使用boto3查询cloudwatch日志

6qfn3psc  于 2023-01-24  发布在  Python
关注(0)|答案(4)|浏览(178)

我有一个lambda函数,可以将指标写入Cloudwatch。在写入指标的同时,它会在日志组中生成一些日志。

INFO:: username: simran+test@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

INFO:: username: simran+test2@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

INFO:: username: simran+test@example.com ClinicID: 7668 nodename: MacBook-Pro-2.local

INFO:: username: simran+test3@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local

我想查询过去x小时内的AWS日志,其中x可以是12到24小时之间的任何时间,基于任何参数。
例如:
1.查询最近5小时的Cloudwatch日志,其中ClinicID=7667

1.查询过去5小时的Cloudwatch日志,其中ClinicID=7667username='simran+test@example.com'

1.查询过去5小时的Cloudwatch日志,其中username='simran+test@example.com'
我在Python中使用boto3

91zkwejq

91zkwejq1#

您可以使用CloudWatch Logs Insights获取所需信息。
您可以使用start_queryget_query_results API:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html
要启动查询,您将使用(对于问题中的用例2,用例1和用例3类似):

import boto3
from datetime import datetime, timedelta
import time

client = boto3.client('logs')

query = "fields @timestamp, @message | parse @message \"username: * ClinicID: * nodename: *\" as username, ClinicID, nodename | filter ClinicID = 7667 and username='simran+test@example.com'"

log_group = '/aws/lambda/NAME_OF_YOUR_LAMBDA_FUNCTION'

start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int((datetime.today() - timedelta(hours=5)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query,
)

query_id = start_query_response['queryId']

response = None

while response == None or response['status'] == 'Running':
    print('Waiting for query to complete ...')
    time.sleep(1)
    response = client.get_query_results(
        queryId=query_id
    )

响应将包含以下格式的数据(加上一些元数据):

{
  'results': [
    [
      {
        'field': '@timestamp',
        'value': '2019-12-09 17:07:24.428'
      },
      {
        'field': '@message',
        'value': 'username: simran+test@example.com ClinicID: 7667 nodename: MacBook-Pro-2.local\n'
      },
      {
        'field': 'username',
        'value': 'simran+test@example.com'
      },
      {
        'field': 'ClinicID',
        'value': '7667'
      },
      {
        'field': 'nodename',
        'value': 'MacBook-Pro-2.local\n'
      }
    ]
  ]
}
mqkwyuun

mqkwyuun2#

你可以通过cloudWatchlogs客户端和一些编码来实现这一点。你也可以自定义条件或使用JSON模块来获得精确的结果。
编辑
你可以使用describe_log_streams来获取流,如果你只想要最新的流,只需设置limit 1,或者如果你想要多个流,使用for循环来迭代所有流,同时进行过滤,如下所述。

import boto3

    client = boto3.client('logs')

    ## For the latest
    stream_response = client.describe_log_streams(
        logGroupName="/aws/lambda/lambdaFnName", # Can be dynamic
        orderBy='LastEventTime',                 # For the latest events
        limit=1                                  # the last latest event, if you just want one
        )

    latestlogStreamName = stream_response["logStreams"]["logStreamName"]

    response = client.get_log_events(
        logGroupName="/aws/lambda/lambdaFnName",
        logStreamName=latestlogStreamName,
        startTime=12345678,
        endTime=12345678,
    )

    for event in response["events"]:
        if event["message"]["ClinicID"] == "7667":
            print(event["message"])
        elif event["message"]["username"] == "simran+test@example.com":
            print(event["message"])
        #.
        #.
        # more if or else conditions

    ## For more than one Streams, e.g. latest 5
    stream_response = client.describe_log_streams(
        logGroupName="/aws/lambda/lambdaFnName", # Can be dynamic
        orderBy='LastEventTime',                 # For the latest events
        limit=5
        )

    for log_stream in stream_response["logStreams"]:
        latestlogStreamName = log_stream["logStreamName"]

        response = client.get_log_events(
             logGroupName="/aws/lambda/lambdaFnName",
             logStreamName=latestlogStreamName,
             startTime=12345678,
             endTime=12345678,
        )
        ## For example, you want to search "ClinicID=7667", can be dynamic

        for event in response["events"]:
           if event["message"]["ClinicID"] == "7667":
             print(event["message"])
           elif event["message"]["username"] == "simran+test@example.com":
             print(event["message"])
           #.
           #.
           # more if or else conditions

告诉我进展如何。

pokxtpni

pokxtpni3#

我用的是awslogs。如果你安装了它,你就可以了。--watch会跟踪新的日志。

awslogs get /aws/lambda/log-group-1 --start="5h ago" --watch

您可以使用以下命令进行安装

pip install awslogs

要筛选,您可以执行以下操作:

awslogs get /aws/lambda/log-group-1  --filter-pattern '"ClinicID=7667"' --start "5h ago" --timestamp

它还支持多种过滤模式。

awslogs get /aws/lambda/log-group-1  --filter-pattern '"ClinicID=7667"' --filter-pattern '" username=simran+test@abc.com"' --start "5h ago" --timestamp

参考文献:
awslogs
awslogs . PyPI

xwbd5t1u

xwbd5t1u4#

最简单的方法是使用awswrangler:

import awswrangler as wr

# must define this for wrangler to work
boto3.setup_default_session(region_name=region)

df = wr.cloudwatch.read_logs(
    log_group_names=["loggroup"],
    start_time=from_timestamp,
    end_time=to_timestamp,
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)

你可以传递一个需要的日志组列表,开始和结束时间。输出是一个包含结果的Pandas数据框。
仅供参考,awswrangler使用boto 3命令,如@dejan answer

相关问题