pyspark: how to call an API/web service without hitting the rate limit?

zaq34kh6  posted on 2021-05-27  in Spark

I have a Spark DataFrame with 4 columns: location_string, locality, region, and country. I'm using Google Maps' Geocoding API to parse each location_string and fill in the null locality, region, and country fields with the results.
I've set up the function that calls the geocoding library as a UDF, but the problem I'm running into is that once I exceed the rate limit of Google's API policy, I end up getting an 'OVER_QUERY_LIMIT' response status.
Here is a sample of the Spark DataFrame:

+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string                                                                                         |locality    |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan                                                                             |Tainan City |null  |TWN    |
|093 Cicero, IL                                                                                          |null        |null  |null   |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429                                                            |null        |null  |null   |
|10210  Baltimore Avenue, College Park, MD, US 20740                                                     |College Park|MD    |null   |
|12 Braintree - Braintree, MA, 02184                                                                     |null        |null  |null   |
|1215 E.Main St. #1074 Carbondale, IL 62901,                                                             |null        |null  |null   |
|18 Fairview Heights - Fairview Heights, IL, 62208                                                       |null        |null  |null   |
|21000 Hayden Dr, Woodhaven, MI, US 48183                                                                |null        |null  |null   |
|2257 N. Germantown Pkwy in Cordova, TN                                                                  |null        |null  |null   |
|2335 S. Towne Ave., Pomona, CA, US 91766                                                                |Pomona      |CA    |null   |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES             |null        |null  |null   |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null        |null  |null   |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101                                                        |St. Paul    |MN    |null   |
|4604 Lowe Road, Louisville, KY, US 40220                                                                |Louisville  |KY    |null   |
|4691 Springboro Pike, Moraine, OH, US 45439                                                             |null        |null  |null   |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753                                                             |null        |null  |null   |
|5188 Commerce Dr., Baldwin Park, CA, US 91706                                                           |Baldwin Park|CA    |null   |
|55445                                                                                                   |null        |null  |null   |
|5695 Harvey St, Muskegon, MI 49444                                                                      |null        |null  |null   |
|6464 Downing Street, Denver, CO, US 80229                                                               |null        |null  |null   |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+

To handle this, I have the following function:

import random
import time

import geocoder
from pyspark.sql import Row

def geocoder_decompose_location(location_string):
    if not location_string:
        return Row('nation', 'state', 'city')(None, None, None)

    GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]

    # spread requests across several API keys
    GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)

    attempts = 0
    success = False
    while not success and attempts < 5:
        result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
        attempts += 1
        if result.status == 'OVER_QUERY_LIMIT':
            # back off briefly, then retry
            time.sleep(2)
            continue
        success = True

    if not success:
        print('Daily Limit Reached')

    return Row('nation', 'state', 'city')(result.country, result.state, result.city)
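
For reference, a minimal sketch of how a function like this can be registered and applied as a UDF (the DataFrame name df is an assumption; the struct fields match the Row above):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# return type matching the Row('nation', 'state', 'city') tuples above
geocode_schema = StructType([
    StructField('nation', StringType(), True),
    StructField('state', StringType(), True),
    StructField('city', StringType(), True),
])

geocode_udf = F.udf(geocoder_decompose_location, geocode_schema)

# parse each location_string, then fill only the null fields from the result
df = (df
      .withColumn('geo', geocode_udf(F.col('location_string')))
      .withColumn('country', F.coalesce(F.col('country'), F.col('geo.nation')))
      .withColumn('region', F.coalesce(F.col('region'), F.col('geo.state')))
      .withColumn('locality', F.coalesce(F.col('locality'), F.col('geo.city')))
      .drop('geo'))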

But it doesn't seem to work as expected on the Spark DataFrame. Any guidance would be greatly appreciated!

neekobn8  1#

The simplest way to solve this is to replace the flat sleep with exponential backoff, i.e. use

time.sleep(math.exp(attempts))

This will bring your request rate down below the limit. You can also control Spark's maximum parallelism by adding .coalesce or .repartition(max_parallelism) before applying the UDF.
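
A minimal sketch of both suggestions applied to the question's code (max_parallelism is an assumed value you would tune to your quota):

import math
import time

# inside the retry loop, replace the flat 2-second sleep:
if result.status == 'OVER_QUERY_LIMIT':
    # exp(1) ~ 2.7s, exp(2) ~ 7.4s, exp(3) ~ 20.1s, ...
    time.sleep(math.exp(attempts))
    continue

# before applying the UDF, cap the number of partitions so at most
# max_parallelism tasks hit the API concurrently:
max_parallelism = 4  # assumption: tune to what your quota tolerates
df = df.repartition(max_parallelism)  # then apply the UDF as before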
