我最近开始玩pyspark,因为我要处理大量的数据。我现在的任务是通过调用我们正在运行的http服务来获取一些值。我的数据存储在pyspark数据框中,看起来像:
+--------------------+----------+
| name| id|
+--------------------+----------+
|xxxxxxxxxxxxxxxxx...| 1|
|yyyyyyyyyyyyyyyyy...| 2|
+--------------------+----------+
我正在使用自定义项调用 id
列并获取新值( new_id
)并将其添加到现有的Dataframe中。网络延迟不是问题,因为所有框都在同一个vpn中。我的代码大致如下所示:
def update_df(df):
udf_get_new_id = udf(get_new_id, ArrayType(StringType()))
df = df.withColumn('new_id', udf_get_new_id(df.id)[0])
# My request might fail so I want to log the status code of the request as well
updated_df = df.withColumn('status_code', udf_get_new_id(df.id)[1])
return updated_df
def get_new_id(id)
url = SOME_HTTP_URL
headers = {'content-type': 'application/json',
'Connection': 'keep-alive',
'Content - Length': '125',
'cache - control': 'no - cache',
'keep-alive': 'timeout = 5, max = 1000'
}
body = {'id': id}
response_string = requests.post(url, data = json.dumps(body), headers = headers)
status_code = response_string.status_code
response_content = ast.literal_eval(response_string.content.decode())
new_id = response_content['new_id']
return [new_id, status_code]
我把这些函数称为:
updated_df = update_df(df)
start_time = timer()
updated_df.repartition(100).collect()
end_time = timer()
print(end_time - start_time)
更新后的数据应如下所示:
+--------------------+----------+----------+-----------+
| name| id| new_id|status_code|
+--------------------+----------+----------+-----------+
|xxxxxxxxxxxxxxxxx...| 1| XX| 200|
|yyyyyyyyyyyyyyyyy...| 2| YY| 200|
+--------------------+----------+----------+-----------+
我在本例中调用的服务每秒可以处理数百万个请求,我希望尽可能多地利用它,我想知道是否有办法优化当前的代码。如果您感兴趣,这里是我能设法获得的最大处理能力 spark-submit
我用来运行代码的命令):
/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit --conf spark.app.name=test_app --master yarn --deploy-mode client --num-executors 4 --executor-cores 4 --executor-memory 2G --driver-cores 4 --driver-memory 2G
我从我的朋友那里听说他们将要在scala中异步处理分区,我想知道这是我用pysparkDataframe所能做的最好的方法,还是有方法来优化它?任何反馈都非常感谢。
暂无答案!
目前还没有任何答案,快来回答吧!