curl 列出UAC / StoneBranch中工作流的所有依赖项

nfg76nw0  于 2022-11-13  发布在  其他
关注(0)|答案(1)|浏览(125)

我在UAC中有一个工作流,其中包含另一个工作流列表,每个工作流都包含一系列任务。
我想调用一个API来列出所有的工作流和任务以及它们的依赖关系。可以吗?
我只提取到了第一层的信息。
工作流程的结构:

INITIAL_WF:
  |_Inside_WF_1
  |    |
  |    |_ task1
  |    |
  |    |_task2
  |
  |_Inside_WF_2
       |
       |_task1
       |
       |_task2
       |
       |_Inside_WF_2_1
           |           
           |_ task1

API调用:

curl --request GET http://UAC_HOST/uc/resources/workflow/vertices?workflowname=INITIAL_WF --header "Content-Type: application/json" --header "Authorization: {TOKEN}"

此命令将仅给予Inside_WF_1和Inside_WF_2的信息。

ycggw6v2

ycggw6v21#

在UAC UI中右键单击“Workflow Task Commands”(工作流任务命令)下,您可以选择“View Tree”(视图树)以显示以下内容:
查看树报告:

但是,目前无法通过单个API调用打印或访问此文件。这正在考虑在未来版本中使用。
同时,我有一个Python脚本,它可以执行等效的操作:

python .\Controller_WorkflowTree.py --controllerurl https://dev-uac:8499/uc --username ccocksedge --password sbus77 --workflow '#app_collections ${ao_schedule_date}'
    WORKFLOW = #app_collections ${ao_schedule_date}
     Task Name                                          Task Alias                                         Task Type
     Pull_Todays_Inputs                                 None                                               Stored Procedure
     Check_Space_B4_Load                                None                                               System Monitor
     Daily_Returns                                      None                                               Workflow
      Task Name                                          Task Alias                                         Task Type
      Daily_Calc_Returns                                 None                                               Windows
      Report_Inputs                                      None                                               Windows
      Report_Vendors                                     None                                               Windows
      Report_Returns                                     None                                               Windows
     Weekly_Returns                                     None                                               Workflow
      Task Name                                          Task Alias                                         Task Type
      Weekly_Calc_Returns                                None                                               Windows
      Report_Inputs                                      None                                               Windows
      Report_Vendors                                     None                                               Windows
      Report_Returns                                     None                                               Windows
     Monthly_Returns                                    None                                               Workflow
      Task Name                                          Task Alias                                         Task Type
      Monthly_Calc_Returns                               None                                               Windows
      Report_Inputs                                      None                                               Windows
      Report_Returns                                     None                                               Windows
      Report_Vendors                                     None                                               Windows
     Disk_Clean_Up                                      None                                               Windows
     List_Receipts                                      None                                               Windows
     Load_Results                                       None                                               Stored Procedure
     Detect_Vendor_Feed                                 None                                               Remote File Monitor
     Download_Vendor_Feeds                              None                                               File Transfer

Here is the code:

    #!/opt/universal/python/bin/python3
# --
#         Origins: Stonebranch
#          Author: Colin Cocksedge colin.cocksedge@stonebranch.com
#            Date: 22-APR-2020
#
#    Copyright (c) Stonebranch, 2020.  All rights reserved.
#
#         Purpose: Workflow Tree Report
#
version = "1.0"
#           Version History:    1.0     CCO     22-APR-2020         Initial Version   
# --

# -- Main Logic Function
def main():
    ScriptSetup()                                                   # -- Import Required Modules, Setup Logging Format, Set Variables
    indent = 0                                                      # -- Set base indent for formatted tree report     
    print("WORKFLOW = {0}".format(args.workflow))
    WorkflowReport(args.workflow,indent)                            # -- Workflow Tree Report
# --

# -- Import Required Modules, Setup Logging Format, Set Variables 
def ScriptSetup():
    # -- Import required python modules 
    import argparse, logging, sys, urllib
    # -- Import requests module, if error then install requests
    try:
        import requests
    except:
        try:
            from pip import main as pipmain
        except:
            from pip._internal import main as pipmain
        pipmain(
            [
                "install",
                "--trusted-host=pypi.python.org",
                "--trusted-host=pypi.org",
                "--trusted-host=files.pythonhosted.org",
                "requests",
            ]
        )
    global argparse, logging, sys, requests, urllib
    # -- Set Variables 
    parser=argparse.ArgumentParser(description='Purpose : Workflow Tree Report')
    # ## --> 
    parser.add_argument("--controllerurl", default="http://localhost:8080/opswise")
    parser.add_argument("--username", default="ops.admin")
    parser.add_argument("--password", default="secret")
    parser.add_argument("--workflow", default="My_Workflow")
    # ## -->
    parser.add_argument("--loglevel", default="None")
    global args
    args=parser.parse_args()
    # -- Setup Logging
    numeric_level = getattr(logging, args.loglevel.upper(), None)
    logging.basicConfig(format='%(asctime)-15s - %(levelname)-8s - %(message)s', level=numeric_level)
    # -- Print Paramater Values
    logging.debug("Executing version {0} with the following parameters : {1}".format(version, args))
    # -- Strip Trailing / from Remote Controller URL to Avoid 404 Resource Not Found
    args.controllerurl = args.controllerurl.rstrip("/")
    # -- Ignore Https Warnings
    import urllib3
    urllib3.disable_warnings()
# --

# ## --> Functions Go Here

# -- Workflow Tasks 
def WorkflowReport(workflow,indent):
    WorkflowTaskList = GetWorkflowTasks(workflow) # Find All Tasks in Workflow
    print("{0} {1:50} {2:50} {3:30}".format(" "*indent,"Task Name","Task Alias","Task Type"))                       
    for task in WorkflowTaskList:
        # For Each Workflow Task
        taskalias = task['alias']
        taskname = task['task']['value']
        # Get the Task Type
        tasktype = GetTaskType(taskname)  
        print("{0} {1:50} {2:50} {3}".format(" "*indent,taskname,str(taskalias),tasktype))
        # If Task Type = Workflow Then process the subworkflow's tasks
        if tasktype == "Workflow":
            indent = indent + 1 # increment subworkflows tree report level
            WorkflowReport(taskname,indent)
            indent = indent - 1 # decrement tree report level
            
# --
# -- Get All Tasks in Workflow
def GetWorkflowTasks(workflowname):
    logging.debug("Call resources/workflow/vertices API")
    payload = {'workflowname':workflowname}
    query = urllib.parse.urlencode(payload)
    ListWorkflowTasks = apicall(args.username, args.password, "get", "/workflow/vertices?", query, "")
    WorkflowTasks = ListWorkflowTasks.json()
    return(WorkflowTasks)
# -- 
# -- Get Task Type
def GetTaskType(taskname):
    logging.debug("Call resources/task/list API")
    data = {
        "name": taskname
    }
    ListTask = apicall(args.username, args.password, "post", "/task/list", "None", data)
    tasklist = ListTask.json()
    tasktype = tasklist[0]['type']
    return(tasktype)
# --
# -- Call UAC API
def apicall(username, password, method, resource, query, jsondata): 
    if str(query) == "None":
        query=""
    headers = {
        'content-type': 'application/json',
        'Accept': 'application/json'
    }
    uri="{0}/resources{1}{2}".format(args.controllerurl, resource, query)
    try:
        if str(method) == "get":
            response = requests.get(uri, headers=headers, auth=(username, password), verify=False)
        if str(method) == "post":
            response = requests.post(uri, headers=headers, json=jsondata, auth=(username, password), verify=False)
    except:
        logging.error("Error Calling {0} API {1}".format(args.controllerurl, sys.exc_info()))
        sys.exit(1)
    if(response.ok):
        pass
    else:
        logging.critical("{0} Error Code : {1}".format(uri, response.status_code))
        logging.critical("Failed with reason : {}".format(response.text))
        sys.exit(1)
    return(response)
# --
# -->

# -- Execute
main()

相关问题