import org.apache.spark.sql.functions.col
case class Person(name: String, age: Int, personid : Int)
case class Profile(name: String, personid : Int , profileDescription: String)
val df1 = sqlContext.createDataFrame(
Person("Bindu",20, 2)
:: Person("Raphel",25, 5)
:: Person("Ram",40, 9):: Nil)
val df2 = sqlContext.createDataFrame(
Profile("Spark",2, "SparkSQLMaster")
:: Profile("Spark",5, "SparkGuru")
:: Profile("Spark",9, "DevHunter"):: Nil
)
// you can do alias to refer column name with aliases to increase readablity
val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")
val joined_df = df_asPerson.join(
df_asProfile
, col("dfperson.personid") === col("dfprofile.personid")
, "inner")
joined_df.select(
col("dfperson.name")
, col("dfperson.age")
, col("dfprofile.name")
, col("dfprofile.profileDescription"))
.show
// join type can be inner, left, right, fullouter
val mergedDf = df1.join(df2, Seq("keyCol"), "inner")
// keyCol can be multiple column names seperated by comma
val mergedDf = df1.join(df2, Seq("keyCol1", "keyCol2"), "left")
另一种方法
import spark.implicits._
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))
// to select specific columns as output
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName")).select($"d1.*", $"d2.anotherColName")
8条答案
按热度按时间5gfr0r5j1#
使用scala的别名方法(this is example given for older version of spark for spark 2.x see my other answer):
您可以使用案例类准备示例数据集...这是可选的,例如:你也可以从
hiveContext.sql
得到DataFrame
。我个人不喜欢的示例临时表方法...
The reason to use the
registerTempTable( tableName )
method for a DataFrame, is so that in addition to being able to use the Spark-provided methods of a DataFrame, you can also issue SQL queries via thesqlContext.sql( sqlQuery )
method, that use that DataFrame as an SQL table. The tableName parameter specifies the table name to use for that DataFrame in the SQL queries.如果你想知道更多关于连接的信息,请看这个漂亮的帖子:beyond-traditional-join-with-apache-spark
注:1)如**@RaphaelRoth**所述,
val resultDf = PersonDf.join(ProfileDf,Seq("personId"))
是一种很好的方法,因为如果您对同一个表使用内部连接,它不会有来自两端的重复列。2)在另一个答案中更新了Spark 2.x示例,其中包含spark 2.x支持的完整连接操作集(示例+结果)
提示:
此外,连接中的重要事项:broadcast function can help to give hint please see my answer
t3irkdon2#
您可以使用
或更短、更灵活(因为您可以轻松地指定多个要连接的列)
r6l8ljro3#
除了上面的答案,我还尝试使用spark 2.x here is my linked in article with full examples and explanation演示了所有带有相同case类的spark连接。
***所有联接类型:***默认值
inner
。必须是以下值之一:一米一米一英寸、一米二米一英寸、一米三米一英寸、一米四米一英寸、一米五米一英寸、一米六米一英寸、一米七米一英寸、一米八米一英寸、一米九米一英寸、一米十米一英寸、一米十一米一英寸。结果:
如上所述,这些是所有连接的维恩图。
piv4azn74#
单程
另一种方法
alen0pnh5#
在https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html中,使用
join
:使用给定列与另一个DataFrame进行内部等联接。
或
更新日期:
您还可以使用
df.registerTempTable("tableName")
将DFs
保存为临时表,并且可以使用sqlContext
编写sql查询。bejyjqdl6#
使用scala的内部连接
nvbavucw7#
发布一个基于java的解决方案,如果你的团队只使用java。关键字
inner
将确保只有匹配的行出现在最终的 Dataframe 中。2w3rbyxf8#
让我举一个例子来说明
1.创建emp数据框
1.创建部门数据框
现在,让我们将emp.emp_dept_id与dept.dept_id连接起来
结果如下
如果您正在查找python PySpark Join中的示例,也可以在Spark Join中找到完整的Scala示例