I have a Spark DataFrame sdf with GPS points as shown below:
import pandas as pd

d = {'user': ['A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'C', 'C', 'A', 'A'],
'lat': [37.75243634842733, 37.75344580658182, 37.75405656449232, 37.753649393112181,37.75409897804892, 37.753937806404586, 37.72767062183685, 37.72710631810977, 37.72605407110467, 37.71141865080228, 37.712199505873926, 37.713285899241896, 37.71428740401767, 37.712810604103346, 37.75405656449232, 37.753649393112181],
'lon': [-122.41924881935118, -122.42006421089171, -122.419216632843, -122.41784334182738, -122.4169099330902, -122.41549372673035, -122.3878937959671, -122.3884356021881, -122.38841414451599, -122.44688630104064, -122.44474053382874, -122.44361400604248, -122.44260549545288, -122.44156479835509, -122.4169099330902, -122.41549372673035],
'date': ['2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-04', '2018-02-04'],
'radius': [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]}
pdf = pd.DataFrame(data=d)
sdf = spark.createDataFrame(pdf)
+----+------------------+-------------------+----------+------+
|user| lat| lon| date|radius|
+----+------------------+-------------------+----------+------+
| A| 37.75243634842733|-122.41924881935118|2018-02-03| 10|
| A| 37.75344580658182|-122.42006421089171|2018-02-03| 10|
| A| 37.75405656449232| -122.419216632843|2018-02-03| 10|
| A|37.753649393112184|-122.41784334182738|2018-02-03| 10|
| A| 37.75409897804892| -122.4169099330902|2018-02-03| 10|
| A|37.753937806404586|-122.41549372673035|2018-02-03| 10|
| B| 37.72767062183685| -122.3878937959671|2018-02-03| 10|
| B| 37.72710631810977| -122.3884356021881|2018-02-03| 10|
| B| 37.72605407110467|-122.38841414451599|2018-02-03| 10|
| C| 37.71141865080228|-122.44688630104064|2018-02-03| 10|
| C|37.712199505873926|-122.44474053382874|2018-02-03| 10|
| C|37.713285899241896|-122.44361400604248|2018-02-03| 10|
| C| 37.71428740401767|-122.44260549545288|2018-02-03| 10|
| C|37.712810604103346|-122.44156479835509|2018-02-03| 10|
| A| 37.75405656449232| -122.4169099330902|2018-02-04| 10|
| A|37.753649393112184|-122.41549372673035|2018-02-04| 10|
+----+------------------+-------------------+----------+------+
Since the Spark DataFrame contains distinct GPS traces generated by different users on different dates, I want to write a function that loops over this df and feeds the matching set of coordinates to an (OSRM) request per date and per user, one group at a time instead of all at once.
from typing import Dict

import pyspark.sql.functions as F
import requests

# Format coordinates into a concatenated string formatted for the OSRM server.
def format_coords(df):
    coords = df.agg(F.concat_ws(';', F.collect_list(F.format_string('%f,%f', 'lon', 'lat')))).head()[0]
    return coords

# Format a dictionary of additional OSRM request options into a query string.
def format_options(options: Dict[str, str]) -> str:
    return "&".join(f"{k}={v}" for k, v in options.items())

# Format radiuses into a concatenated string formatted for the OSRM server.
def format_radiuses(df):
    radiuses = "&radiuses=" + df.agg(F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius')))).head()[0]
    return radiuses

# Make the request.
def make_request(coords, radiuses, options):
    coords = format_coords(coords)
    radiuses = format_radiuses(radiuses)
    options = format_options(options) if options else ""
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{options}{radiuses}"
    r = requests.get(url)
    return r.json()
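These helpers behave as expected when handed a single pre-filtered trip, e.g. (a minimal sketch against the helpers above; the literal filter values are only for illustration):

# Hypothetical single-group call: pre-filter one (user, date) trip by hand.
trip = sdf.filter((F.col('user') == 'A') & (F.col('date') == '2018-02-03'))
# make_request() derives both the coordinate and the radius strings from a
# DataFrame, so the same filtered frame is passed for both arguments.
result = make_request(coords=trip,
                      radiuses=trip,
                      options={'overview': 'full',
                               'geometries': 'polyline6',
                               'annotations': 'nodes'})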
Unfortunately, running the code below returns TypeError: 'GroupedData' object is not iterable. What am I missing?
output = {}
for trip, g in sdf.groupBy('date', 'user'):
    output[trip] = make_request(coords=sdf[['lat', 'lon']],
                                radiuses=sdf[['radius']],
                                options={'overview': 'full',
                                         'geometries': 'polyline6',
                                         'annotations': 'nodes'})
1 Answer
Unlike iterating over a pandas groupby, PySpark's groupBy returns a GroupedData object that only exists to have an aggregation applied to it, so it cannot be looped over directly. You can instead aggregate the strings after the groupBy and collect the per-group results:
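A minimal sketch of that approach, reusing format_options from the question (the column aliases and the (date, user) dict key are my own choices):

import pyspark.sql.functions as F
import requests

# Build the per-trip coordinate and radius strings in a single aggregation,
# then collect the now-small grouped result to the driver and issue one OSRM
# match request per (date, user) trip.
grouped = (
    sdf.groupBy('date', 'user')
       .agg(F.concat_ws(';', F.collect_list(F.format_string('%f,%f', 'lon', 'lat'))).alias('coords'),
            F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius'))).alias('radiuses'))
)

output = {}
for row in grouped.collect():
    coords = row['coords']
    radiuses = '&radiuses=' + row['radiuses']  # same format the question's URL expects
    options = format_options({'overview': 'full',
                              'geometries': 'polyline6',
                              'annotations': 'nodes'})
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{options}{radiuses}"
    output[(row['date'], row['user'])] = requests.get(url).json()

Note that collect_list gives no ordering guarantee after a shuffle; if point order matters for map matching, carry a timestamp column and sort each group by it (for example with sort_array over collected (timestamp, point) structs) before building the strings.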