python-3.x 如果满足条件,则修改标志状态

yhuiod9q  于 2023-03-24  发布在  Python
关注(0)|答案(2)|浏览(111)

下面是API微服务的JSON输出。

{
       "case": "2nmi",
       "id": "6.00c",
       "key": "subN",
       "Brand":[
         {
             "state": "Active",
             "name": "Apple",
             "date": "2021-01-20T08:35:33.382532",
             "Loc": "GA",
          },
          {
             "state": "Disabled",
             "name": "HP",
             "date": "2018-01-09T08:25:90.382",
             "Loc": "TX",
          },
          
          {
             "state": "Active",
             "name": "Acer",
             "date": "2022-01-2T8:35:03.5432",
             "Loc": "IO"
          },
          {
             "state": "Booted",
             "name": "Toshiba",
             "date": "2023-09-29T9:5:03.3298",
             "Loc": "TX"
          }
       ],
       "DHL_ID":"a3288ec45c82"
    }

列出持有笔记本电脑品牌

my_list = ["apple", "hp"]

我正在尝试编写一个脚本,以检查my_list列表中的项目是否可作为API微服务的流输出的一部分(JSON),如果是,验证是否,这些项目(apple/hp)在10分钟的超时内处于Active状态。如果单个项目(applehp)在10分钟内未处于Active状态,则脚本失败。
状态(Sales['state'])可以像Active、Booted、Disabled和Purged一样变化,并且my_list可以具有任意数量的条目。
不确定的逻辑,我是否应该使用多个标志来检查列表中的单个项目的状态或相应地操纵单个标志。

def validate_status():
    all_brand_status = False # flag to check if all the Brand's state are Active.
    Sales = func_streaming_logs_json(args) # JSON output is captured from the microservice using this function.
    for sale in Sales['Brand']:
        if sale['name'].lower() in my_list and sale['state'] == "Active":
                all_brand_status = True
            else:
                print(f"{sale['name']} status: {sale['state']}") 
    return all_brand_status

# Timeout Function
def timeOut(T): 
    start_time = datetime.now()
    stop_time = start_time + timedelta(minutes=T)
    while True:
        success = validate_status()
        if success:
            print("say script is successful blah blah") 
            break
        start_time = datetime.now()
        if start_time > stop_time:
            print("Timeout")
            sys.exit(1)

if __name__ == '__main__':
    timeOut(10)
iklwldmw

iklwldmw1#

只要my_list中的Sales中的最后一个品牌是Active,您的代码将返回True。您应该首先设置all_brand_status = True,然后如果my_list中的任何品牌不活动,则将其设置为False

def validate_status():
    all_brand_status = True # flag to check if all the Brand's state are Active.
    Sales = func_streaming_logs_json(args) # JSON output is captured from the microservice using this function.
    for sale in Sales['Brand']:
        if sale['name'].lower() in my_list and sale['state'] != "Active":
            all_brand_status = False
            print(f"{sale['name']} status: {sale['state']}") 
    return all_brand_status

注意,你也可以通过操作逻辑来使用all

def validate_status():
    Sales = func_streaming_logs_json(args) # JSON output is captured from the microservice using this function.
    all_brand_status = all(sale['state'] == 'Active' for sale in Sales if sale['name'].lower() in my_list)
    return all_brand_status
eoigrqb6

eoigrqb62#

如果你计算活跃的品牌数量(在你的列表中),那么如果计数等于 my_list 的长度,则满足你的条件

from datetime import datetime, timedelta

SAMPLE = {
       "case": "2nmi",
       "id": "6.00c",
       "key": "subN",
       "Brand":[
         {
             "state": "Active",
             "name": "Apple",
             "date": "2021-01-20T08:35:33.382532",
             "Loc": "GA",
          },
          {
             "state": "Disabled",
             "name": "HP",
             "date": "2018-01-09T08:25:90.382",
             "Loc": "TX",
          },
          
          {
             "state": "Active",
             "name": "Acer",
             "date": "2022-01-2T8:35:03.5432",
             "Loc": "IO"
          },
          {
             "state": "Booted",
             "name": "Toshiba",
             "date": "2023-09-29T9:5:03.3298",
             "Loc": "TX"
          }
       ],
       "DHL_ID":"a3288ec45c82"
    }

ARGS = None
BRANDS = {'apple', 'hp'}

def func_streaming_logs_json(args=None):
    return SAMPLE

def validate_status():
    sales = func_streaming_logs_json(ARGS)
    assert isinstance(sales, dict)
    check = len(BRANDS)
    assert check > 0
    for item in sales.get('Brand', []):
        name = item.get('name', '')
        state = item.get('state', '')
        if name.lower() in BRANDS and state == 'Active':
            check -= 1
            if check == 0:
                return True
        else:
            print(f'{name} status: {state}')
    return False

def process(minutes):
    stop = datetime.now() + timedelta(minutes=minutes)
    while datetime.now() < stop:
        if validate_status():
            print('Success')
            break

if __name__ == '__main__':
    process(10)

注:

这里显示的代码会“重击”你的CPU,永远不会结束。你需要通过替换func_streaming_logs_json()的真实的实现来调整它

相关问题