Add a column containing the result of applying a UDF to two DataFrames

h43kikqp posted on 2021-07-12 in Spark

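I'm trying to add another column to a large dataset. The new column should store the ID of the closest POI (point of interest) taken from a smaller CSV. I call two functions: the first calculates the distance between any two points, and the second returns the ID of the POI closest to that location.
My problem is that when the withColumn transformation is called, I get a pickling error like this: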
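For reference, the distance function (haversine in the code below) implements the haversine great-circle formula. With latitudes φ and longitudes λ in radians and the Earth radius R = 6371 km used in the code, the distance d between two points is:

$$
a = \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \, \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right),
\qquad
d = 2R \arcsin\!\left(\sqrt{a}\right)
$$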

_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o41.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist

Sample datasets:
First file:

+-------+--------------------+-------+--------+------------+--------+---------+----------+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|ClosestPOI|
+-------+--------------------+-------+--------+------------+--------+---------+----------+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123|POID      |
|4516547|2017-06-21 18:00:...|     CA|      ON|      London|42.93990|-81.27090|POID      |
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph|43.57760|-80.22010|POID      |
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford|43.37160|-80.97730|POID      |
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford|43.37160|-80.97730|POID      |

The entire second file:

+-----+----------+------------+
|POIID|  Latitude|   Longitude|
+-----+----------+------------+
| POI1| 53.546167| -113.485734|
| POI2| 53.546167| -113.485734|
| POI3| 45.521629|  -73.566024|
| POI4| 45.224830|  -63.232729|
+-----+----------+------------+
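Since the second file has only four rows, the nearest-POI search can run on plain Python data. The sketch below hard-codes those four rows for illustration (in practice they would come from dfPoi.collect()) and replaces the manual loop with min(). The names POIS and closest_poi are hypothetical. Note that POI1 and POI2 share the same coordinates: min() keeps the first of tied POIs, whereas the loop in the question, which compares with <=, keeps the last.

```python
import math

# The four rows of the second file, as shown in the table above.
POIS = [
    ("POI1", 53.546167, -113.485734),
    ("POI2", 53.546167, -113.485734),
    ("POI3", 45.521629, -73.566024),
    ("POI4", 45.224830, -63.232729),
]


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers (Earth radius 6371 km)."""
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371 * math.asin(math.sqrt(a))


def closest_poi(lat, lon, pois=POIS):
    """Return the ID of the POI nearest to (lat, lon); ties go to the first POI listed."""
    return min(pois, key=lambda p: haversine(lat, lon, p[1], p[2]))[0]


print(closest_poi(43.49347, -80.49123))  # Waterloo row → POI3
```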

Here is the code I'm using:

from math import radians, sin, cos, asin, sqrt

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.appName("EQProject").getOrCreate()

def haversine(lat1, lon1, lat2, lon2):
    # Great-circle distance between two points, in kilometers
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # Radius of earth in kilometers. Use 3956 for miles
    return c * r

def ClosestPoi(rlat, rlon):
    # CSV columns are read as strings, so convert before doing any math
    rlat = float(rlat)
    rlon = float(rlon)
    mindist = float("inf")
    cpoi = None
    # dfPoi is the POI DataFrame defined below; the UDF reads it as a global
    for p in dfPoi.collect():
        rdist = haversine(rlat, rlon, float(p.Latitude), float(p.Longitude))
        if rdist <= mindist:
            mindist = rdist
            cpoi = p.PoiID
    return cpoi

ClosestPoint = udf(ClosestPoi)  # default return type is StringType
df = spark.read.csv("DataSample.csv", header=True)
dfPoi = spark.read.csv("POIList.csv", header=True)
df = df.toDF(*['ID', 'Time', 'Country', 'Province', 'City', 'Latitude', 'Longitude'])

dfPoi = dfPoi.toDF(*['PoiID', 'Latitude', 'Longitude'])
df = df.dropDuplicates()
# This line raises the PicklingError shown above
df2 = df.withColumn('CPoi', ClosestPoint(df.Latitude, df.Longitude))
spark.stop()
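The error most likely comes from the UDF referencing dfPoi: PySpark must pickle the UDF together with everything it refers to, and a DataFrame wraps a JVM object reached through Py4J, which cannot be pickled (hence the __getstate__ call failing on the Java side). A DataFrame also cannot be collected from inside an executor. A common workaround is to collect the small POI table on the driver once, turn it into plain Python tuples, and let the UDF close over that list. The sketch below assumes the column names from the question (PoiID, Latitude, Longitude); make_closest_poi and add_closest_poi_column are hypothetical helper names. The PySpark imports sit inside add_closest_poi_column so the pure-Python part can be exercised without Spark installed.

```python
import math


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers (Earth radius 6371 km)."""
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371 * math.asin(math.sqrt(a))


def make_closest_poi(pois):
    """Return a nearest-POI function that closes over plain Python data only.

    pois: non-empty list of (poi_id, lat, lon) tuples, e.g. built from dfPoi.collect().
    """
    def closest_poi(lat, lon):
        if lat is None or lon is None:
            return None  # missing coordinates in the input row
        lat, lon = float(lat), float(lon)  # CSV columns arrive as strings
        return min(pois, key=lambda p: haversine(lat, lon, p[1], p[2]))[0]
    return closest_poi


def add_closest_poi_column(df, dfPoi):
    """Add a 'CPoi' column to df (PySpark is only needed when this is called)."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # Collect the small POI table once, on the driver, into picklable tuples.
    pois = [(r.PoiID, float(r.Latitude), float(r.Longitude))
            for r in dfPoi.collect()]
    # The UDF now captures only a Python list, never a DataFrame.
    closest_udf = udf(make_closest_poi(pois), StringType())
    return df.withColumn("CPoi", closest_udf(df.Latitude, df.Longitude))
```

With the question's variables this would be df2 = add_closest_poi_column(df, dfPoi). Because withColumn is lazy, nothing is computed until an action such as df2.show() runs, so call one before spark.stop(). For a much larger POI table, the list could be shipped once per executor with spark.sparkContext.broadcast instead of being captured directly in the closure.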