使用Python从Azure Monitor获取特定警报

cl25kdpy  于 2023-06-24  发布在  Python
关注(0)|答案(2)|浏览(124)

我想使用Python(通过Azure函数)从Azure Monitor获取特定警报。Azure Monitor将为每个事件触发az函数。
目前我使用的是来自azure.mgmt.alertsmanagement.operations模块的get_all,这允许我获取所有警报。也已经测试了get_by_id,但我不得不指定alert_id,而我希望自动获取它。

import logging
import urllib3
import os
import json
import requests
from azure.identity import ClientSecretCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient

subscription_id =""
client_id =""
client_secret =""
tenant_id = ""

credential = ClientSecretCredential(
      tenant_id=tenant_id, 
      client_id=client_id, 
      client_secret=client_secret
)

print("===Auth Azure Monitor===")
client = AlertsManagementClient(
    credential,
    subscription_id
)

print("=== Get alert event from Az Monitor & Post it to monitoring platform === ")
headers = {'Authorization': f'authtoken {token}'}

for alert in client.alerts.get_all():
    if alert.name == "alert_rule_name" :
        attributes = {'CLASS': 'EVENT',
                      'severity': 'CRITICAL',
                      'msg': alert.name,
                      'lastModifiedDateTime': json.dumps(alert.properties.essentials.last_modified_date_time, indent=4, sort_keys=True, default=str)
                      }
        payload = [{'eventSourceHostName': alert.properties.essentials.target_resource_name, 'attributes': attributes}]
        print("JSON_PAYLOAD :", payload)
## Some code here to push the Alert to a monitoring platform ..

请在下面找到Azure Monitor使用get_all发送的json:

{'value': [{'properties': {'essentials': {
'severity': 'Sev2', 
'signalType': 'Metric', 
'alertState': 'New', 
'monitorCondition': 'Fired', 
'monitorService': 'Platform', 
'targetResource': '/subscriptions/sub_id/resourcegroups/rsg_name/providers/microsoft.compute/virtualmachines/vm_name', 
'targetResourceName': 'vm_name', 
'targetResourceGroup': 'rsg_name', 
'targetResourceType': 'virtualmachines', 
'sourceCreatedId': '5f33r_rsg_name_microsoft.insights_metricAlerts_alert_rule_name-1899618006', 
'alertRule': '/subscriptions/sub_id/resourceGroups/rsg_name/providers/microsoft.insights/metricAlerts/alert_rule_name', 
'startDateTime': '2023-05-09T13:32:28.1880147Z', 
'lastModifiedDateTime': '2023-05-09T13:32:28.1880147Z', 
'lastModifiedUserName': 'System', 
'actionStatus': {'isSuppressed': False}, 'description': ''}
}, 
'id': '/subscriptions/sub_id/providers/Microsoft.AlertsManagement/alerts/2222-5555-88888', 
'type': 'Microsoft.AlertsManagement/alerts', 
'name': 'alert_rule_name'},

如您所见,我正在通过[if www.example.com =="alert_rule_name"]进行过滤,但这不是我要查找的内容(我得到了一个事件列表)。alert.name == "alert_rule_name"] and this is not what I'm looking for (I got a list of Events).
当Azure Monitor调用我的函数时,是否有方法从有效负载中获取警报ID?这是为了使用此ID获取特定警报(事件)。
先谢谢你了

wbrvyc0a

wbrvyc0a1#

是否有方法在Azure Monitor
您可以使用下面的代码使用python获取带有payload的Alert id。
您需要在属性中添加**alert.id**,以获取您的特定警报的警报ID。

代码:

import os
import json
import requests
from azure.identity import DefaultAzureCredential
from azure.mgmt.alertsmanagement import AlertsManagementClient

subscription_id ="your subscription id"
client_id =""
client_secret =""
tenant_id = ""

credential = ClientSecretCredential(
      tenant_id=tenant_id, 
      client_id=client_id, 
      client_secret=client_secret
)

print("===Auth Azure Monitor===")
client = AlertsManagementClient(
    credential,
    subscription_id
)
print("=== Get alert event from Az Monitor & Post it to monitoring platform === ")

for alert in client.alerts.get_all():
    if alert.name == "Backup Failure" :
        attributes = {'CLASS': 'EVENT',
                      'severity': 'CRITICAL',
                      'msg': alert.name,
                      'id': alert.id,
                      'lastModifiedDateTime': json.dumps(alert.properties.essentials.last_modified_date_time, indent=4, sort_keys=True, default=str)
                      }
        
        payload = [{'eventSourceHostName': alert.properties.essentials.target_resource_name,'attributes': attributes}]
        print("JSON_PAYLOAD :", payload)

输出:

===Auth Azure Monitor===
=== Get alert event from Az Monitor & Post it to monitoring platform ===
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/3f481155-b808-a188-6exxxxxx', 'lastModifiedDateTime': '"2023-06-14 05:35:14.747028+00:00"'}}]
JSON_PAYLOAD : [{'eventSourceHostName': 'mm-automation-runas-account-2', 'attributes': {'CLASS': 'EVENT', 'severity': 'CRITICAL', 'msg': 'aa-test-1', 'id': '/subscriptions/bxxxxxf/resourcegroups/management_migration-resources/providers/microsoft.automation/automationaccounts/mm-automation-runas-account-2/providers/Microsoft.AlertsManagement/alerts/8cba3e70-c957-4xxxxxxxx', 'lastModifiedDateTime': '"2023-06-13 12:35:13.840749+00:00"'}}]

kmbjn2e3

kmbjn2e32#

Azure Monitor触发下面的Azure函数,该函数仅解析一个事件并将其转发到另一个目标,以便通知支持团队:

import azure.functions as func
import os
import json
import requests
import urllib3
import logging

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("===== Auth Monitoring Platform =====")
    urllib3.disable_warnings()
    creds = {'username': "user", 'password': "****", 'tenantName': '*'}
    logging.debug(f'Retrieving authorization token')
    resp = requests.post(f'https://{api_url}', json=creds, verify=False)
    if resp.status_code != 200:
            logging.error(f'{resp.headers}')
            logging.error(f'{resp.text}')
            exit(1)
            
    token = resp.json()["response"]["authToken"]

    logging.info("===== Get Alert JSON & Prepare Post to Monitor Platform =====")

    req_body = req.get_json()
    
    msg_detail = {'firedDateTime' : req_body['data']['essentials']['firedDateTime'], 
                   'operator':       req_body['data']['alertContext']['condition']['allOf'][0]['operator'],
                   'threshold':      req_body['data']['alertContext']['condition']['allOf'][0]['threshold'],
                   'metricValue':    req_body['data']['alertContext']['condition']['allOf'][0]['metricValue']
                   }
    headers = {'Authorization': f'authtoken {token}'}
    attributes = {  'CLASS':           'EVENT',
                    'severity':        'CRITICAL',
                    'msg':             req_body['data']['essentials']['alertRule'],
                    'msg_detail':     mc_long_msg,
                    'object':       req_body['data']['essentials']['configurationItems'][0],
                    'object_class': req_body['data']['alertContext']['condition']['allOf'][0]['metricNamespace'],
                    'mc_parameter':    req_body['data']['alertContext']['condition']['allOf'][0]['metricName']
                 }
    
    payload = [{'eventSource': req_body['data']['essentials']['configurationItems'][0], 'attributes': attributes}]
    params = {'param1':"value1" , 'param2':"value2"}
    logging.info(f'Event header: {headers}')
    logging.info(f'Event payload: {payload}')     
    
    response = requests.post(f'https://{api_url}',
                       headers=headers,
                       params=params,
                       json=payload,
                       verify=False
                )
    if response.status_code != 200:
        logging.error(f'{response.headers}')
        logging.error(f'{response.text}')
        exit(1)
    
    logging.debug(f'{response.headers}')
    logging.debug(f'{response.text}')
    logging.info(f'Payload: {payload}')
    
    return func.HttpResponse(
        "This HTTP triggered function executed successfully.",
        status_code=200
    )

相关问题