如何异步处理pysparkDataframe分区

h6my8fg2  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(332)

我最近开始玩pyspark,因为我要处理大量的数据。我现在的任务是通过调用我们正在运行的http服务来获取一些值。我的数据存储在pyspark数据框中,看起来像:

+--------------------+----------+
|                name|        id|
+--------------------+----------+
|xxxxxxxxxxxxxxxxx...|         1|
|yyyyyyyyyyyyyyyyy...|         2|
+--------------------+----------+

我正在使用自定义项调用 id 列并获取新值( new_id )并将其添加到现有的Dataframe中。网络延迟不是问题,因为所有框都在同一个vpn中。我的代码大致如下所示:

def update_df(df):
    udf_get_new_id = udf(get_new_id, ArrayType(StringType()))

    df = df.withColumn('new_id', udf_get_new_id(df.id)[0])
    # My request might fail so I want to log the status code of the request as well
    updated_df = df.withColumn('status_code', udf_get_new_id(df.id)[1])
    return updated_df

def get_new_id(id)
    url = SOME_HTTP_URL
    headers = {'content-type': 'application/json',
               'Connection': 'keep-alive',
               'Content - Length': '125',
               'cache - control': 'no - cache',
               'keep-alive': 'timeout = 5, max = 1000'
               }
    body = {'id': id}
    response_string = requests.post(url, data = json.dumps(body), headers = headers)
    status_code = response_string.status_code
    response_content = ast.literal_eval(response_string.content.decode())
    new_id = response_content['new_id']
    return [new_id, status_code]

我把这些函数称为:

updated_df = update_df(df)

start_time = timer()
updated_df.repartition(100).collect()
end_time = timer()
print(end_time - start_time)

更新后的数据应如下所示:

+--------------------+----------+----------+-----------+
|                name|        id|    new_id|status_code|
+--------------------+----------+----------+-----------+
|xxxxxxxxxxxxxxxxx...|         1|        XX|        200|
|yyyyyyyyyyyyyyyyy...|         2|        YY|        200|
+--------------------+----------+----------+-----------+

我在本例中调用的服务每秒可以处理数百万个请求,我希望尽可能多地利用它,我想知道是否有办法优化当前的代码。如果您感兴趣,这里是我能设法获得的最大处理能力 spark-submit 我用来运行代码的命令):

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit --conf spark.app.name=test_app --master yarn --deploy-mode client --num-executors 4 --executor-cores 4 --executor-memory 2G --driver-cores 4 --driver-memory 2G

我从我的朋友那里听说他们将要在scala中异步处理分区,我想知道这是我用pysparkDataframe所能做的最好的方法,还是有方法来优化它?任何反馈都非常感谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题