在pyspark中,我有一个超过400万行的Dataframe。
我使用withcolumn函数向dataframe添加一列。每行的列值在自定义项中定义。
udf包含对api的请求(使用python模块“requests”)。
这个步骤的处理非常慢,将列添加到请求的结果中。
例如,如果启动作业时使用返回“foo”的自定义项(而不是请求),则处理时间不超过5/6分钟(在此之前还有一些工作)。
然而,对于发送请求的udf,处理需要5到6个小时。
有没有一种有效的方法从spark作业发送http请求?
我当前使用的代码:
def get_driving_time(lat_soc, lon_soc, lat_agen, lon_agen):
url = MYURL + '{},{};{},{}'.format(lon_soc, lat_soc, lon_agen, lat_agen)
resp = requests.get(url, verify=False).json()
if resp['code'].lower() == 'ok':
return resp['routes'][0]['duration']
else:
log("ERROR", "API did not return a 'ok' code")
return "ERROR"
calculate_driving_time_udf = udf(get_driving_time)
data_with_distance_and_time = data_with_distance_final.withColumn("driving_time",
calculate_driving_time_udf(data_with_distance_final.lat_soc,
data_with_distance_final.lon_soc,
data_with_distance_final.lat_agen,
data_with_distance_final.lon_agen))
暂无答案!
目前还没有任何答案,快来回答吧!