我有一个带有4列的sparkDataframe: location_string
, locality
, region
,和 country
. 我正在使用googlemap的geocodeapi来解析每个 location_string
然后用结果填充空值 locality
, region
以及 country
领域。
我已经将调用地理编码库的函数设置为一个udf,但是我面临的问题是,当我超过google的api策略的速率限制时,我最终会得到一个“overlimit”响应状态。
以下是sparkDataframe的示例:
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string |locality |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan |Tainan City |null |TWN |
|093 Cicero, IL |null |null |null |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429 |null |null |null |
|10210 Baltimore Avenue, College Park, MD, US 20740 |College Park|MD |null |
|12 Braintree - Braintree, MA, 02184 |null |null |null |
|1215 E.Main St. #1074 Carbondale, IL 62901, |null |null |null |
|18 Fairview Heights - Fairview Heights, IL, 62208 |null |null |null |
|21000 Hayden Dr, Woodhaven, MI, US 48183 |null |null |null |
|2257 N. Germantown Pkwy in Cordova, TN |null |null |null |
|2335 S. Towne Ave., Pomona, CA, US 91766 |Pomona |CA |null |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES |null |null |null |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null |null |null |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101 |St. Paul |MN |null |
|4604 Lowe Road, Louisville, KY, US 40220 |Louisville |KY |null |
|4691 Springboro Pike, Moraine, OH, US 45439 |null |null |null |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753 |null |null |null |
|5188 Commerce Dr., Baldwin Park, CA, US 91706 |Baldwin Park|CA |null |
|55445 |null |null |null |
|5695 Harvey St, Muskegon, MI 49444 |null |null |null |
|6464 Downing Street, Denver, CO, US 80229 |null |null |null |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
为了解决这个问题,我有这样一个函数:
def geocoder_decompose_location(location_string):
if not location_string:
return Row('nation', 'state', 'city')(None, None, None)
GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]
GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)
attempts = 0
success = False
while status != True and attempts < 5:
result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
attempts += 1
status = result.status
if status == 'OVER_QUERY_LIMIT':
time.sleep(2)
# retry
continue
success = True
if attempts == 5:
print('Daily Limit Reached')
return Row('nation', 'state', 'city')(result.country, result.state, result.city)
但它似乎没有像预期的那样在sparkDataframe上工作。任何指导都将不胜感激!
1条答案
按热度按时间neekobn81#
解决这个问题最简单的方法是用指数退避代替睡眠函数。使用。。。
time.sleep(math.exp(尝试次数)
这将使您的读取速率降到低于限制的位置。还可以通过添加.coalesce或.repartition(max\u parallelism)来控制sparks的最大并行性