如何使用map在pyspark中进行restapi调用

iibxawm4  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(541)

我正在尝试使用map在 rdd 例如:

def put(params, payload):

  url = "https://{}/{}".format(server, params)
  headers = {
    'Content-Type': 'application/json' }

  response = requests.request("PUT", url, headers=headers, data = payload) 
  return response.status_code

df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()

但我得到一个错误:
org.apache.spark.api.python.pythonexception:'typeerror:()缺少1个必需的位置参数:“payload”
看起来lambda函数没有得到第二个参数 payload ,不知道为什么。有人能帮帮我吗?

zsbz8rwp

zsbz8rwp1#

下面这段代码就是罪魁祸首:

df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()

不能在lambda函数中解包Dataframe。这是正确的:

df.select("params", "payload").rdd.map(lambda row: put(row[0], row[1])).collect()
k3bvogb1

k3bvogb12#

为了向dataframe添加响应,您必须用udf注册put方法,并在dataframe的withcolumn方法中使用它。

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

putUdf = udf(put, StringType())

df = df.withColumn("response", putUdf(df.params, df.payload))

这将创建一个名为response的新列,并在其中填充put output。

相关问题