azure 如何调用databricks rest API列出作业运行

xmakbtuz  于 2023-10-22  发布在  其他
关注(0)|答案(2)|浏览(142)

我目前正在开发一个Python脚本来检索昨天执行的所有作业的完整列表。然而,我遇到了一个脚本的分页机制使用令牌的问题。尽管我尝试循环遍历分页过程,结果输出保持不变。

代码如下

import requests
import pandas as pd
import math
import datetime
import json

def fetch_and_process_job_runs(base_uri, api_token, params):
    endpoint = '/api/2.1/jobs/runs/list'
    headers = {'Authorization': f'Bearer {api_token}'}
    
    all_data = []  # To store all the data from multiple pages
    
    while True:
#         print(params)
        response = requests.get(base_uri + endpoint, headers=headers, params=params)
        response_json = response.json()
        
        data = []
        for run in response_json["runs"]:
            start_time_ms = run["start_time"]
            start_time_seconds = start_time_ms / 1000
            start_time_readable = datetime.datetime.fromtimestamp(start_time_seconds).strftime('%Y-%m-%d %H:%M:%S')
            data.append({
                "job_id": run["job_id"],
                "creator_user_name": run["creator_user_name"],
                "run_name": run["run_name"],
                "run_page_url": run["run_page_url"],
                "run_id": run["run_id"],
                "execution_duration_in_mins": math.ceil(int(run.get('execution_duration')) / (1000 * 60)),
                "result_state": run["state"].get("result_state"),
                "start_time": start_time_readable
            })
        
        all_data.extend(data)
        df = pd.DataFrame(all_data)
        print(df)
        
        if response_json.get("has_more") == True:
            next_page_token = response_json.get("next_page_token")
            params['next_page_token'] = next_page_token
        else:
            break
    
    df = pd.DataFrame(all_data)
    return df

# Replace with your actual values
now = datetime.datetime.utcnow()
yesterday = now - datetime.timedelta(days=1)
start_time_from = int(yesterday.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()) * 1000
start_time_to = int(yesterday.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp()) * 1000
        
params = {
#      "start_time_from": start_time_from,
#      "start_time_to": start_time_to,
     "expand_tasks": True
}
baseURI = 'https://adb-xxxxxxxxxxxxxx.azuredatabricks.net'
apiToken = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'

result_df = fetch_and_process_job_runs(baseURI, apiToken, params)
print(result_df)

请帮帮我

wgx48brx

wgx48brx1#

我注意到next_token的值在API响应中没有改变,然后发现代码中有一个非常小的错误。在请求中传递的参数是page_token,而不是next_page_token
根据https://docs.databricks.com/api/workspace/jobs/list的文档,

page_token string

使用上一个请求返回的next_page_tokenprev_page_token分别列出下一页或上一页作业。
因此,params['next_page_token']需要更改为params['page_token']

flvlnr44

flvlnr442#

最好的方法是使用Databricks SDK--它将隐藏API细节,如果将来发生更改,那么您的代码将不需要更改。它很简单:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

job_list = w.jobs.list(expand_tasks=False)

加上它automatically works with different authentication methods等等。不同的工具。

相关问题