lookup()函数

w6mmgewl  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(439)

我有两个数据集,一个存储在hive中(较小的一个用作查找表),另一个来自spark流。
现在我的要求是对这两个数据集执行一些操作。
例如:
数据集1:(存储在配置单元中)

id     name  
101    steve
102    david

数据集2:(来自spark流媒体)

id   deprt   address

101   E01    NewYork
102   E02    London

每当我得到 101 E01 NewYork 通过流式处理,我希望合并两个数据集并返回如下结果:

id  name  dept  address
101 steve E01  NewYork

前面,我已经用广播变量完成了这种类型的任务,我正试图通过使用配置单元查找表来实现这一点。
有谁能建议我怎么做吗?
提前谢谢。

wxclj1h5

wxclj1h51#

假设您的配置单元查找表很小,因为您已经将它用作广播变量,您可以通过读取配置单元表来创建dataframe,您可以使用它执行查找。

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val lookupDF = hiveContext.sql("select * from your_hive_table").cache() //cache lookup data

ds.transform {
    rdd => 
        val df = rdd.toDF("c1","c2","c3")
        df.join(lookupDF, lookupDF("col") === df("col")).select("co1","col2","col3").rdd
}

相关问题