我正在尝试使用map在 rdd
例如:
def put(params, payload):
url = "https://{}/{}".format(server, params)
headers = {
'Content-Type': 'application/json' }
response = requests.request("PUT", url, headers=headers, data = payload)
return response.status_code
df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()
但我得到一个错误:
org.apache.spark.api.python.pythonexception:'typeerror:()缺少1个必需的位置参数:“payload”
看起来lambda函数没有得到第二个参数 payload
,不知道为什么。有人能帮帮我吗?
2条答案
按热度按时间zsbz8rwp1#
下面这段代码就是罪魁祸首:
不能在lambda函数中解包Dataframe。这是正确的:
k3bvogb12#
为了向dataframe添加响应,您必须用udf注册put方法,并在dataframe的withcolumn方法中使用它。
这将创建一个名为response的新列,并在其中填充put output。