scala 在钥匙上连接Spark Dataframe

cl25kdpy  于 2022-12-23  发布在  Scala
关注(0)|答案(8)|浏览(152)

我已经构建了两个 Dataframe ,如何连接多个Spark Dataframe ?
例如:
PersonDfProfileDf,公共列为personId as(key)。现在,我们如何将PersonDfProfileDf组合在一起呢?

5gfr0r5j

5gfr0r5j1#

使用scala的别名方法(this is example given for older version of spark for spark 2.x see my other answer):

您可以使用案例类准备示例数据集...这是可选的,例如:你也可以从hiveContext.sql得到DataFrame

import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid : Int)

case class Profile(name: String, personid  : Int , profileDescription: String)

    val df1 = sqlContext.createDataFrame(
   Person("Bindu",20,  2) 
:: Person("Raphel",25, 5) 
:: Person("Ram",40, 9):: Nil)

val df2 = sqlContext.createDataFrame(
Profile("Spark",2,  "SparkSQLMaster") 
:: Profile("Spark",5, "SparkGuru") 
:: Profile("Spark",9, "DevHunter"):: Nil
)

// you can do alias to refer column name with aliases to  increase readablity

val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")

val joined_df = df_asPerson.join(
    df_asProfile
, col("dfperson.personid") === col("dfprofile.personid")
, "inner")

joined_df.select(
  col("dfperson.name")
, col("dfperson.age")
, col("dfprofile.name")
, col("dfprofile.profileDescription"))
.show

我个人不喜欢的示例临时表方法...
The reason to use the registerTempTable( tableName ) method for a DataFrame, is so that in addition to being able to use the Spark-provided methods of a DataFrame, you can also issue SQL queries via the sqlContext.sql( sqlQuery ) method, that use that DataFrame as an SQL table. The tableName parameter specifies the table name to use for that DataFrame in the SQL queries.

df_asPerson.registerTempTable("dfperson");
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM  dfperson JOIN  dfprofile
                  ON dfperson.personid == dfprofile.personid""")

如果你想知道更多关于连接的信息,请看这个漂亮的帖子:beyond-traditional-join-with-apache-spark

注:1)如**@RaphaelRoth**所述,
val resultDf = PersonDf.join(ProfileDf,Seq("personId"))是一种很好的方法,因为如果您对同一个表使用内部连接,它不会有来自两端的重复列。
2)在另一个答案中更新了Spark 2.x示例,其中包含spark 2.x支持的完整连接操作集(示例+结果)

提示:

此外,连接中的重要事项:broadcast function can help to give hint please see my answer

t3irkdon

t3irkdon2#

您可以使用

val resultDf = PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId"))

或更短、更灵活(因为您可以轻松地指定多个要连接的列)

val resultDf = PersonDf.join(ProfileDf,Seq("personId"))
r6l8ljro

r6l8ljro3#

除了上面的答案,我还尝试使用spark 2.x here is my linked in article with full examples and explanation演示了所有带有相同case类的spark连接。

***所有联接类型:***默认值inner。必须是以下值之一:一米一米一英寸、一米二米一英寸、一米三米一英寸、一米四米一英寸、一米五米一英寸、一米六米一英寸、一米七米一英寸、一米八米一英寸、一米九米一英寸、一米十米一英寸、一米十一米一英寸。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

 /**
  * @author : Ram Ghadiyaram
  */
object SparkJoinTypesDemo extends App {
  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  case class Person(name: String, age: Int, personid: Int)
  case class Profile(profileName: String, personid: Int, profileDescription: String)
  /**
    * * @param joinType Type of join to perform. Default `inner`. Must be one of:
    * *                 `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,
    * *                 `right`, `right_outer`, `left_semi`, `left_anti`.
    */
  val joinTypes = Seq(
    "inner"
    , "outer"
    , "full"
    , "full_outer"
    , "left"
    , "left_outer"
    , "right"
    , "right_outer"
    , "left_semi"
    , "left_anti"
    //, "cross"
  )
  val df1 = spark.sqlContext.createDataFrame(
    Person("Nataraj", 45, 2)
      :: Person("Srinivas", 45, 5)
      :: Person("Ashik", 22, 9)
      :: Person("Deekshita", 22, 8)
      :: Person("Siddhika", 22, 4)
      :: Person("Madhu", 22, 3)
      :: Person("Meghna", 22, 2)
      :: Person("Snigdha", 22, 2)
      :: Person("Harshita", 22, 6)
      :: Person("Ravi", 42, 0)
      :: Person("Ram", 42, 9)
      :: Person("Chidananda Raju", 35, 9)
      :: Person("Sreekanth Doddy", 29, 9)
      :: Nil)
  val df2 = spark.sqlContext.createDataFrame(
    Profile("Spark", 2, "SparkSQLMaster")
      :: Profile("Spark", 5, "SparkGuru")
      :: Profile("Spark", 9, "DevHunter")
      :: Profile("Spark", 3, "Evangelist")
      :: Profile("Spark", 0, "Committer")
      :: Profile("Spark", 1, "All Rounder")
      :: Nil
  )
  val df_asPerson = df1.as("dfperson")
  val df_asProfile = df2.as("dfprofile")
  val joined_df = df_asPerson.join(
    df_asProfile
    , col("dfperson.personid") === col("dfprofile.personid")
    , "inner")

  println("First example inner join  ")

  // you can do alias to refer column name with aliases to  increase readability
  joined_df.select(
    col("dfperson.name")
    , col("dfperson.age")
    , col("dfprofile.profileName")
    , col("dfprofile.profileDescription"))
    .show
  println("all joins in a loop")
  joinTypes foreach { joinType =>
    println(s"${joinType.toUpperCase()} JOIN")
    df_asPerson.join(right = df_asProfile, usingColumns = Seq("personid"), joinType = joinType)
      .orderBy("personid")
      .show()
  }
  println(
    """
      |Till 1.x  cross join is :  df_asPerson.join(df_asProfile)
      |
      | Explicit Cross Join in 2.x :
      | http://blog.madhukaraphatak.com/migrating-to-spark-two-part-4/
      | Cartesian joins are very expensive without an extra filter that can be pushed down.
      |
      | cross join or cartesian product
      |
      |
    """.stripMargin)

  val crossJoinDf = df_asPerson.crossJoin(right = df_asProfile)
  crossJoinDf.show(200, false)
  println(crossJoinDf.explain())
  println(crossJoinDf.count)

  println("createOrReplaceTempView example ")
  println(
    """
      |Creates a local temporary view using the given name. The lifetime of this
      |   temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    """.stripMargin)



  df_asPerson.createOrReplaceTempView("dfperson");
  df_asProfile.createOrReplaceTempView("dfprofile")
  val sql =
    s"""
       |SELECT dfperson.name
       |, dfperson.age
       |, dfprofile.profileDescription
       |  FROM  dfperson JOIN  dfprofile
       | ON dfperson.personid == dfprofile.personid
    """.stripMargin
  println(s"createOrReplaceTempView  sql $sql")
  val sqldf = spark.sql(sql)
  sqldf.show

  println(
    """
      |
      |**** EXCEPT DEMO ***
      |
  """.stripMargin)
  println(" df_asPerson.except(df_asProfile) Except demo")
  df_asPerson.except(df_asProfile).show

  println(" df_asProfile.except(df_asPerson) Except demo")
  df_asProfile.except(df_asPerson).show
}

结果:

First example inner join  
+---------------+---+-----------+------------------+
|           name|age|profileName|profileDescription|
+---------------+---+-----------+------------------+
|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       Srinivas| 45|      Spark|         SparkGuru|
|          Ashik| 22|      Spark|         DevHunter|
|          Madhu| 22|      Spark|        Evangelist|
|         Meghna| 22|      Spark|    SparkSQLMaster|
|        Snigdha| 22|      Spark|    SparkSQLMaster|
|           Ravi| 42|      Spark|         Committer|
|            Ram| 42|      Spark|         DevHunter|
|Chidananda Raju| 35|      Spark|         DevHunter|
|Sreekanth Doddy| 29|      Spark|         DevHunter|
+---------------+---+-----------+------------------+

all joins in a loop
INNER JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       9|            Ram| 42|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

FULL JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

FULL_OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

LEFT JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       4|       Siddhika| 22|       null|              null|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       6|       Harshita| 22|       null|              null|
|       8|      Deekshita| 22|       null|              null|
|       9|            Ram| 42|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

LEFT_OUTER JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       4|       Siddhika| 22|       null|              null|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       6|       Harshita| 22|       null|              null|
|       8|      Deekshita| 22|       null|              null|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|            Ram| 42|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

RIGHT JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|          Ashik|  22|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

RIGHT_OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

LEFT_SEMI JOIN
+--------+---------------+---+
|personid|           name|age|
+--------+---------------+---+
|       0|           Ravi| 42|
|       2|        Nataraj| 45|
|       2|         Meghna| 22|
|       2|        Snigdha| 22|
|       3|          Madhu| 22|
|       5|       Srinivas| 45|
|       9|Chidananda Raju| 35|
|       9|Sreekanth Doddy| 29|
|       9|            Ram| 42|
|       9|          Ashik| 22|
+--------+---------------+---+

LEFT_ANTI JOIN
+--------+---------+---+
|personid|     name|age|
+--------+---------+---+
|       4| Siddhika| 22|
|       6| Harshita| 22|
|       8|Deekshita| 22|
+--------+---------+---+

Till 1.x  Cross join is :  `df_asPerson.join(df_asProfile)`

 Explicit Cross Join in 2.x :
 http://blog.madhukaraphatak.com/migrating-to-spark-two-part-4/
 Cartesian joins are very expensive without an extra filter that can be pushed down.

 Cross join or Cartesian product


+---------------+---+--------+-----------+--------+------------------+
|name           |age|personid|profileName|personid|profileDescription|
+---------------+---+--------+-----------+--------+------------------+
|Nataraj        |45 |2       |Spark      |2       |SparkSQLMaster    |
|Nataraj        |45 |2       |Spark      |5       |SparkGuru         |
|Nataraj        |45 |2       |Spark      |9       |DevHunter         |
|Nataraj        |45 |2       |Spark      |3       |Evangelist        |
|Nataraj        |45 |2       |Spark      |0       |Committer         |
|Nataraj        |45 |2       |Spark      |1       |All Rounder       |
|Srinivas       |45 |5       |Spark      |2       |SparkSQLMaster    |
|Srinivas       |45 |5       |Spark      |5       |SparkGuru         |
|Srinivas       |45 |5       |Spark      |9       |DevHunter         |
|Srinivas       |45 |5       |Spark      |3       |Evangelist        |
|Srinivas       |45 |5       |Spark      |0       |Committer         |
|Srinivas       |45 |5       |Spark      |1       |All Rounder       |
|Ashik          |22 |9       |Spark      |2       |SparkSQLMaster    |
|Ashik          |22 |9       |Spark      |5       |SparkGuru         |
|Ashik          |22 |9       |Spark      |9       |DevHunter         |
|Ashik          |22 |9       |Spark      |3       |Evangelist        |
|Ashik          |22 |9       |Spark      |0       |Committer         |
|Ashik          |22 |9       |Spark      |1       |All Rounder       |
|Deekshita      |22 |8       |Spark      |2       |SparkSQLMaster    |
|Deekshita      |22 |8       |Spark      |5       |SparkGuru         |
|Deekshita      |22 |8       |Spark      |9       |DevHunter         |
|Deekshita      |22 |8       |Spark      |3       |Evangelist        |
|Deekshita      |22 |8       |Spark      |0       |Committer         |
|Deekshita      |22 |8       |Spark      |1       |All Rounder       |
|Siddhika       |22 |4       |Spark      |2       |SparkSQLMaster    |
|Siddhika       |22 |4       |Spark      |5       |SparkGuru         |
|Siddhika       |22 |4       |Spark      |9       |DevHunter         |
|Siddhika       |22 |4       |Spark      |3       |Evangelist        |
|Siddhika       |22 |4       |Spark      |0       |Committer         |
|Siddhika       |22 |4       |Spark      |1       |All Rounder       |
|Madhu          |22 |3       |Spark      |2       |SparkSQLMaster    |
|Madhu          |22 |3       |Spark      |5       |SparkGuru         |
|Madhu          |22 |3       |Spark      |9       |DevHunter         |
|Madhu          |22 |3       |Spark      |3       |Evangelist        |
|Madhu          |22 |3       |Spark      |0       |Committer         |
|Madhu          |22 |3       |Spark      |1       |All Rounder       |
|Meghna         |22 |2       |Spark      |2       |SparkSQLMaster    |
|Meghna         |22 |2       |Spark      |5       |SparkGuru         |
|Meghna         |22 |2       |Spark      |9       |DevHunter         |
|Meghna         |22 |2       |Spark      |3       |Evangelist        |
|Meghna         |22 |2       |Spark      |0       |Committer         |
|Meghna         |22 |2       |Spark      |1       |All Rounder       |
|Snigdha        |22 |2       |Spark      |2       |SparkSQLMaster    |
|Snigdha        |22 |2       |Spark      |5       |SparkGuru         |
|Snigdha        |22 |2       |Spark      |9       |DevHunter         |
|Snigdha        |22 |2       |Spark      |3       |Evangelist        |
|Snigdha        |22 |2       |Spark      |0       |Committer         |
|Snigdha        |22 |2       |Spark      |1       |All Rounder       |
|Harshita       |22 |6       |Spark      |2       |SparkSQLMaster    |
|Harshita       |22 |6       |Spark      |5       |SparkGuru         |
|Harshita       |22 |6       |Spark      |9       |DevHunter         |
|Harshita       |22 |6       |Spark      |3       |Evangelist        |
|Harshita       |22 |6       |Spark      |0       |Committer         |
|Harshita       |22 |6       |Spark      |1       |All Rounder       |
|Ravi           |42 |0       |Spark      |2       |SparkSQLMaster    |
|Ravi           |42 |0       |Spark      |5       |SparkGuru         |
|Ravi           |42 |0       |Spark      |9       |DevHunter         |
|Ravi           |42 |0       |Spark      |3       |Evangelist        |
|Ravi           |42 |0       |Spark      |0       |Committer         |
|Ravi           |42 |0       |Spark      |1       |All Rounder       |
|Ram            |42 |9       |Spark      |2       |SparkSQLMaster    |
|Ram            |42 |9       |Spark      |5       |SparkGuru         |
|Ram            |42 |9       |Spark      |9       |DevHunter         |
|Ram            |42 |9       |Spark      |3       |Evangelist        |
|Ram            |42 |9       |Spark      |0       |Committer         |
|Ram            |42 |9       |Spark      |1       |All Rounder       |
|Chidananda Raju|35 |9       |Spark      |2       |SparkSQLMaster    |
|Chidananda Raju|35 |9       |Spark      |5       |SparkGuru         |
|Chidananda Raju|35 |9       |Spark      |9       |DevHunter         |
|Chidananda Raju|35 |9       |Spark      |3       |Evangelist        |
|Chidananda Raju|35 |9       |Spark      |0       |Committer         |
|Chidananda Raju|35 |9       |Spark      |1       |All Rounder       |
|Sreekanth Doddy|29 |9       |Spark      |2       |SparkSQLMaster    |
|Sreekanth Doddy|29 |9       |Spark      |5       |SparkGuru         |
|Sreekanth Doddy|29 |9       |Spark      |9       |DevHunter         |
|Sreekanth Doddy|29 |9       |Spark      |3       |Evangelist        |
|Sreekanth Doddy|29 |9       |Spark      |0       |Committer         |
|Sreekanth Doddy|29 |9       |Spark      |1       |All Rounder       |
+---------------+---+--------+-----------+--------+------------------+

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross
:- LocalTableScan [name#0, age#1, personid#2]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [profileName#7, personid#8, profileDescription#9]
()
78
createOrReplaceTempView example 

Creates a local temporary view using the given name. The lifetime of this
   temporary view is tied to the [[SparkSession]] that was used to create this Dataset.

createOrReplaceTempView  sql 
SELECT dfperson.name
, dfperson.age
, dfprofile.profileDescription
  FROM  dfperson JOIN  dfprofile
 ON dfperson.personid == dfprofile.personid

+---------------+---+------------------+
|           name|age|profileDescription|
+---------------+---+------------------+
|        Nataraj| 45|    SparkSQLMaster|
|       Srinivas| 45|         SparkGuru|
|          Ashik| 22|         DevHunter|
|          Madhu| 22|        Evangelist|
|         Meghna| 22|    SparkSQLMaster|
|        Snigdha| 22|    SparkSQLMaster|
|           Ravi| 42|         Committer|
|            Ram| 42|         DevHunter|
|Chidananda Raju| 35|         DevHunter|
|Sreekanth Doddy| 29|         DevHunter|
+---------------+---+------------------+


**** EXCEPT DEMO ***

 df_asPerson.except(df_asProfile) Except demo
+---------------+---+--------+
|           name|age|personid|
+---------------+---+--------+
|          Ashik| 22|       9|
|       Harshita| 22|       6|
|          Madhu| 22|       3|
|            Ram| 42|       9|
|           Ravi| 42|       0|
|Chidananda Raju| 35|       9|
|       Siddhika| 22|       4|
|       Srinivas| 45|       5|
|Sreekanth Doddy| 29|       9|
|      Deekshita| 22|       8|
|         Meghna| 22|       2|
|        Snigdha| 22|       2|
|        Nataraj| 45|       2|
+---------------+---+--------+

 df_asProfile.except(df_asPerson) Except demo
+-----------+--------+------------------+
|profileName|personid|profileDescription|
+-----------+--------+------------------+
|      Spark|       5|         SparkGuru|
|      Spark|       9|         DevHunter|
|      Spark|       2|    SparkSQLMaster|
|      Spark|       3|        Evangelist|
|      Spark|       0|         Committer|
|      Spark|       1|       All Rounder|
+-----------+--------+------------------+

如上所述,这些是所有连接的维恩图。

piv4azn7

piv4azn74#

单程

// join type can be inner, left, right, fullouter
val mergedDf = df1.join(df2, Seq("keyCol"), "inner")
// keyCol can be multiple column names seperated by comma
val mergedDf = df1.join(df2, Seq("keyCol1", "keyCol2"), "left")

另一种方法

import spark.implicits._ 
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))
// to select specific columns as output
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName")).select($"d1.*", $"d2.anotherColName")
alen0pnh

alen0pnh5#

https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html中,使用join
使用给定列与另一个DataFrame进行内部等联接。

PersonDf.join(ProfileDf,$"personId")

PersonDf.join(ProfileDf,PersonDf("personId") === ProfileDf("personId"))

更新日期:

您还可以使用df.registerTempTable("tableName")DFs保存为临时表,并且可以使用sqlContext编写sql查询。

bejyjqdl

bejyjqdl6#

使用scala的内部连接

val joinedDataFrame = PersonDf.join(ProfileDf ,"personId")
joinedDataFrame.show
nvbavucw

nvbavucw7#

发布一个基于java的解决方案,如果你的团队只使用java。关键字inner将确保只有匹配的行出现在最终的 Dataframe 中。

Dataset<Row> joined = PersonDf.join(ProfileDf, 
                    PersonDf.col("personId").equalTo(ProfileDf.col("personId")),
                    "inner");
            joined.show();
2w3rbyxf

2w3rbyxf8#

让我举一个例子来说明
1.创建emp数据框

import spark.sqlContext.implicits._
    val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
        (2,"Rose",1,"2010","20","M",4000),
        (3,"Williams",1,"2010","10","M",1000),
        (4,"Jones",2,"2005","10","F",2000),
        (5,"Brown",2,"2010","40","",-1),
          (6,"Brown",2,"2010","50","",-1)
      )
    val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
           "emp_dept_id","gender","salary")
    
    val empDF = emp.toDF(empColumns:_*)

1.创建部门数据框

val dept = Seq(("Finance",10),
        ("Marketing",20),
        ("Sales",30),
        ("IT",40)
      )
    
    val deptColumns = Seq("dept_name","dept_id")
    val deptDF = dept.toDF(deptColumns:_*)

现在,让我们将emp.emp_dept_id与dept.dept_id连接起来

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
        .show(false)

结果如下

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

如果您正在查找python PySpark Join中的示例,也可以在Spark Join中找到完整的Scala示例

相关问题