如何使用PySpark在数据块中执行大型的框架查找

dm7nw8vv  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(145)

我们有一个从大型国家邮政地址数据库中加载的地址框架。该地址框架包含大约3000万行,每行是地理位置,建筑物,家庭,分机......的地址。大约有5到8个潜在的列可用于唯一标识特定地址。这些列包括BuildingNumber,BuildingName,StreetName,TownName,CityName,PostCode,此外,还有几个可供选择的列,如SiteName,FieldName,SubBuildingName用于BuildingName; Thoroughfare,StreetDescription用于StreetName; PostTown,Locality,AdministrativeArea用于TownName,PostCodeIn,PostCodeOut用于PostCode,ext.
我们将包含300到800个站点/建筑物数据的每日数据集摄取到一个网络框架中,然后我们需要尝试与以前的国家地址网络框架进行匹配。这个每日网络框架主要是半结构化的,具有基本的地址列,如BuildingNumber,BuildingName,Street,Town,PostCode,ext...
我必须构建一个PySpark数据管道来尝试获得尽可能接近的匹配,然后可以通过手动干预或其他子流程进行进一步分析。
目前,我一直在尝试使用一个复杂的连接操作来实现这一点,它看起来像下面这样:

sitedata_df.alias(“s”).join(nationaladdress_df.alias(“n”),
    ( (
        (col(“s.BuildingNumber”) == col(“n.BuildingNumber”)) |
        (col(“s.BuildingNumber”).isNull())
    ) &
    (
        (col(“s.BuildingName”) == col(“n.BuildingName”)) |
        (col(“s.BuildingName”) == col(“n.SiteName”)) |
        (col(“s.BuildingName”) == col(“n.FieldName”)) |
        (col(“s.BuildingName”) == col(“n.SubBuildingName”)) |
        (col(“s.BuildingName”).isNull())
    ) &
    (
        (col(“s.Street”) == col(“n.StreetName”)) |
        (col(“s.Street”) == col(“n.Thoroughfare”)) |
        (col(“s.Street”) == col(“n.StreetDescription”)) |
        (col(“s.Street”).isNull())
    ) &
    (
        (col(“s.Town”) == col(“n.TownName”)) |
        (col(“s.Town”) == col(“n.PostTown”)) |
        (col(“s.Town”) == col(“n.Locality”)) |
        (col(“s.Town”) == col(“n.AdministrativeArea”)) |
        (col(“s.Town”).isNull())
    ) &
    (
        (col(“s.PostCode”) == col(“n.PostCode”)) |
        (col(“s.PostCode”) == col(“n.PostCodeIn”)) |
        (col(“s.PostCode”) == col(“n.PostCodeOut”)) |
        (col(“s.PostCode”).isNull())
    )), “inner” )

字符串
请注意,为了简洁起见,我已经从上面的查询中删除了大量的其他可选OR条件,我需要在实际查询中包含这些条件。我已经在sitedata_df框架中使用大约50行的有限数据集测试了这个查询,尽管查询确实产生了一个合理的结果集,但我觉得它不会也不会在800行的数据量下表现良好。
我的问题是:

  • 是否有更好的方法来执行查找一个大的嵌套框架,而不是使用连接操作?
  • 有没有更好的方法来合并的逻辑条件,而不是使用或?
  • 我在一些博客文章中读到一个建议,使用像ElasticSearch与数据砖这样的东西来进行这些类型的查找。这或类似的技术是一个更可行的解决方案吗?

我知道这是一个很长的问题,但我想确保我的查询被完全理解。如果有什么不清楚,请让我知道,我会提供所需的信息。
非常感谢提前。

ujv3wf0j

ujv3wf0j1#

**广播小 Dataframe :**如果每个工作节点的内存都能容纳下较小的 Dataframe ,则可以将其广播到所有节点,以避免不必要的 Shuffle 。使用pyspark.sql.functions模块中的广播功能。

from pyspark.sql.functions import broadcast
large_df = large_df.join(broadcast(small_df), "common_column")

字符串

**持久化或缓存 Dataframe :**如果您要对生成的 Dataframe 执行多个操作,请考虑将 Dataframe 持久化或缓存到内存或磁盘中,这样可以提高后续操作的性能。

large_df.persist()
small_df.persist()

result_df = large_df.join(small_df, on="common_column", how="inner")

large_df.unpersist()
small_df.unpersist()

相关问题