spark中有没有有效的方法来查询api?

w8f9ii69  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(215)

在pyspark中,我有一个超过400万行的Dataframe。
我使用withcolumn函数向dataframe添加一列。每行的列值在自定义项中定义。
udf包含对api的请求(使用python模块“requests”)。
这个步骤的处理非常慢,将列添加到请求的结果中。
例如,如果启动作业时使用返回“foo”的自定义项(而不是请求),则处理时间不超过5/6分钟(在此之前还有一些工作)。
然而,对于发送请求的udf,处理需要5到6个小时。
有没有一种有效的方法从spark作业发送http请求?
我当前使用的代码:

def get_driving_time(lat_soc, lon_soc, lat_agen, lon_agen):
    url = MYURL + '{},{};{},{}'.format(lon_soc, lat_soc, lon_agen, lat_agen)
    resp = requests.get(url, verify=False).json()
    if resp['code'].lower() == 'ok':
        return resp['routes'][0]['duration']
    else:
        log("ERROR", "API did not return a 'ok' code")
        return "ERROR"

calculate_driving_time_udf = udf(get_driving_time)

data_with_distance_and_time = data_with_distance_final.withColumn("driving_time", 
     calculate_driving_time_udf(data_with_distance_final.lat_soc,
     data_with_distance_final.lon_soc,
     data_with_distance_final.lat_agen,
     data_with_distance_final.lon_agen))

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题