加入spark后有效计数记录

1cklez4t  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(377)

这就是我要做的。我需要获取一个数据集中存在的记录数,而不是另一个数据集中的记录数,然后再次与第三个数据集连接以获取其他一些列。

val tooCompare = dw
        .select(
          "loc",
          "id",
          "country",
          "region"
        ).dropDuplicates()

val previous = dw
        .select(
          "loc",
          "id",
          "country",
          "region"
        ).dropDuplicates()

val delta = tooCompare.exceptAll(previous).cache()

val records = delta
        .join(
          dw,//another dataset
          delta
            .col("loc").equalTo(dw.col("loc"))
            .and(delta.col("id").equalTo(dw.col("id")))
            .and(delta.col("country").equalTo(dw.col("country")))
            .and(delta.col("region").equalTo(dw.col("region")))
        )
        .drop(delta.col("loc"))
        .drop(delta.col("id"))
        .drop(delta.col("country"))
        .drop(delta.col("region"))
        .cache()
    }

 val recordsToSend = records.cache()
 val count = recordsToSend.select("loc").distinct().count()

有没有更有效的方法?我是新来的。我很确定我在这里遗漏了什么

6mw9ycah

6mw9ycah1#

我认为您粘贴的代码中可能存在一些错误,因为toocompare和previous是相同的,+第三个数据集联接引用了表上的deanon,但dw。。。。
对于这个示例答案,假设当前表名为“current”,previous名为“previous”,第三个表名为“extra”。然后:

val delta = current.join(
              previous, 
              Seq("loc","id","country","region"), 
              "leftanti"
            ).select("loc","id","country","region").distinct

val recordsToSend = delta
                    .join(
                      extra,
                      Seq("loc", "id", "country", "region")
                    )

val count = recordsToSend.select("loc").distinct().count()

这可能更有效,但我很感激你评论它是否真的是!
顺便说一句:请注意,我使用seq[string]作为连接参数(这要求两个表上的列名相同,并且不会产生两个列的副本)。但是,可以更简洁地编写原始连接逻辑,如下所示(使用我的命名约定):

val recordsToSend = delta
                    .join(
                      extra,
                      delta("loc") === extra("loc")
                        && delta("id") === extra("id")
                        && delta("country") === extra("country")
                        && delta("region") === extra("region")
                    )
                    .drop(delta("loc"))
                    .drop(delta("id"))
                    .drop(delta("country"))
                    .drop(delta("region"))

更好的办法是编写一个drop函数,允许您提供多个列,但我现在真的离题了;-)

ulmd4ohb

ulmd4ohb2#

我建议使用sql来提高可读性。
首先,创建有问题的Dataframe的临时视图。不知道你有什么样的Dataframe

dfToCompare.createOrReplaceTempView("toCompare")
previousDf.createOrReplaceTempView("previous")
anotherDataSet.createOrReplaceTempView("another")

然后可以在一条sql语句中继续执行所有操作

val records = spark.sql("""select loc, id, country,region
              from toCompare c
              inner join another a
               on a.loc = c.loc  
                and a.id = p.id
                and a.country = c.country
                and a.region = c.region
             where not exists (select null
                                from previous p
                                where p.loc = c.loc  
                                 and p.id = p.id
                                 and p.country = c.country
                                 and p.region = c.region""")

然后你可以像以前一样继续。。。

val recordsToSend = records.cache()
val count = recordsToSend.select("loc").distinct().count()

相关问题