I am trying to add another column to a large dataset that stores the ID of the closest POI from a smaller CSV. I call two functions: the first computes the distance between any two points; the second uses it to return the ID of the POI closest to a given location.
My problem is that when the withColumn transformation is invoked, I get a pickling error like this:
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o41.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
Sample datasets:
First file:
+-------+--------------------+-------+--------+------------+--------+---------+----------+
| _ID| TimeSt|Country|Province| City|Latitude|Longitude|ClosestPOI|
+-------+--------------------+-------+--------+------------+--------+---------+----------+
|4516516|2017-06-21 00:00:...| CA| ON| Waterloo|43.49347|-80.49123|POID |
|4516547|2017-06-21 18:00:...| CA| ON| London|42.93990|-81.27090|POID |
|4516550|2017-06-21 15:00:...| CA| ON| Guelph|43.57760|-80.22010|POID |
|4516600|2017-06-21 15:00:...| CA| ON| Stratford|43.37160|-80.97730|POID |
|4516613|2017-06-21 15:00:...| CA| ON| Stratford|43.37160|-80.97730|POID |
The entire second file:
+-----+----------+------------+
|POIID| Latitude| Longitude|
+-----+----------+------------+
| POI1| 53.546167| -113.485734|
| POI2| 53.546167| -113.485734|
| POI3| 45.521629| -73.566024|
| POI4| 45.224830| -63.232729|
+-----+----------+------------+
Here is the code I am using:
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.appName("EQProject").getOrCreate()

def haversine(lat1, lon1, lat2, lon2):
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # Radius of earth in kilometers. Use 3956 for miles
    return c * r

def ClosestPoi(rlat, rlon):
    plat = float(dfPoi.collect()[1][1])
    plon = float(dfPoi.collect()[1][2])
    mindist = haversine(float(rlat), float(rlon), plat, plon)
    for p in dfPoi.collect():
        plon = float(p.Longitude)
        plat = float(p.Latitude)
        rlon = float(rlon)
        rlat = float(rlat)
        rdist = haversine(rlat, rlon, plat, plon)
        if rdist <= mindist:
            mindist = rdist
            cpoi = p.PoiID
    return cpoi

ClosestPoint = udf(ClosestPoi)

df = spark.read.csv("DataSample.csv", header=True)
dfPoi = spark.read.csv("POIList.csv", header=True)
df = df.toDF(*['ID', 'Time', 'Country', 'Province', 'City', 'Latitude', 'Longitude'])
dfPoi = dfPoi.toDF(*['PoiID', 'Latitude', 'Longitude'])
df = df.dropDuplicates()
df2 = df.withColumn('CPoi', ClosestPoint(df.Latitude, df.Longitude))
spark.stop()
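For reference on why this fails: `udf(ClosestPoi)` must pickle the function together with its closure, and the closure references `dfPoi`, a DataFrame backed by a Py4J gateway object that cannot be pickled (hence the `__getstate__` error). A minimal pure-Python sketch of the same closest-POI logic, with the small POI table first materialised into a plain list on the driver (values copied from the POI table above) so that the closure only captures picklable data:

```python
from math import radians, sin, cos, asin, sqrt

def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points."""
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * asin(sqrt(a)) * 6371  # Earth radius in km

# Collect the small POI table to the driver ONCE, outside the UDF, e.g.:
#   pois = [(r.PoiID, float(r.Latitude), float(r.Longitude))
#           for r in dfPoi.collect()]
# Hard-coded here with the values from the question's POIList.csv:
pois = [("POI1", 53.546167, -113.485734),
        ("POI2", 53.546167, -113.485734),
        ("POI3", 45.521629, -73.566024),
        ("POI4", 45.224830, -63.232729)]

def closest_poi(rlat, rlon):
    """Return the ID of the POI nearest to (rlat, rlon)."""
    rlat, rlon = float(rlat), float(rlon)
    return min(pois, key=lambda p: haversine(rlat, rlon, p[1], p[2]))[0]

# Because closest_poi now closes over a plain list, it can be
# registered as a UDF without the pickling error:
#   ClosestPoint = udf(closest_poi)
#   df2 = df.withColumn('CPoi', ClosestPoint(df.Latitude, df.Longitude))
```

This is a sketch, not a tested answer: it assumes the POI table is small enough to collect to the driver, and the `pois` list and `closest_poi` names are illustrative, not from the original code.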