hbase spark rdd json列

yrdbyhpb  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(415)

我正在使用nerdammer hbase spark connector,在rdd将两个hbase表转换为dataframe并运行sql将它们连接起来时,我正在阅读这两个hbase表。
其中一个表中的一列有json对象,我需要在最终结果中提取特定的json属性值,这是怎么可能的。如果我在ardd的d列中有json数据,比如[{“foo”:“bar”,“baz”:“qux”}],我需要创建新的rdd或df,它的值只在这个列中是“baz”,这样当我加入时,我只得到这个属性的值。

val ARDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](ATableName)
        .select("A","B","C","D","E").inColumnFamily("pCF")

        val BRDD = sc.hbaseTable[(Option[String],Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](BTableName)
        .select("A","B","C","D","E","F").inColumnFamily("tCF")

    val ADF = sqlContext.createDataFrame(ARDD).registerTempTable("aDF")
    val BDF = sqlContext.createDataFrame(BRDD).registerTempTable("bDF")

val resultset = sqlContext.sql("SELECT aDF._1, bDF._2, bDF._3, bDF._4, bDF._5, bDF._6, bDF._3, aDF._1, aDF._2, bDF._1 FROM aDF, bDFWHERE aDF._5 = bDF._7").collect()

val joinedResult = resultset.foreach(println)
  println("Count " + joinedResult)
c8ib6hqw

c8ib6hqw1#

创建了一个自定义项来实现这一点,并在我的df中创建了一个新列,其中包含已解析的信息

import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions.udf
def udfScoreToCategory=udf((t: String) => {
   compact((parse(t.toString,true) \ "myField"))})

val abc=  myDF.withColumn("_p", udfScoreToCategory(myDF("_4"))).registerTempTable("odDF")

相关问题