DataFrame self-join condition check

uxh89sit · asked on 2021-05-29 · Spark

df1 = spark.createDataFrame([(1, [4, 2]), (4, [3, 2])], ["col2", "col4"])

+----+------+
|col2|  col4|
+----+------+
|   1|[4, 2]|
|   4|[3, 2]|
+----+------+

df = spark.createDataFrame([("a", 1, 10), ("a", 2, 20), ("a", 3, 30),
                            ("b", 4, 40), ("b", 5, 40), ("b", 1, 40)],
                           ["col1", "col2", "col3"])

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   1|  10|
|   a|   2|  20|
|   a|   3|  30|
|   b|   4|  40|
|   b|   5|  40|
|   b|   1|  40|
+----+----+----+

Join df with df1 on col2 and, where they match, check whether col4 is in the col2 group for that col1. The output I expect is below. Can someone show me how to do this self-join in PySpark (check col4 isin col2, group by col1)?
Expected output:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   1|  10|
+----+----+----+

8i9zcol2 · Answer 1

You need array_contains here; it returns true or false depending on whether the array column contains the value being matched.

from pyspark.sql import functions as F

df = df.join(df1, "col2", "left")

# true when the col4 array contains that row's col2 value; null when col4 is null
df = df.withColumn("is_available", F.expr("array_contains(col4, col2)"))
df = df.filter(F.col("is_available") == True)  # in case you need only the matched rows
df.show()
+----+----+----+---------+------------+
|col2|col1|col3|     col4|is_available|
+----+----+----+---------+------------+
|   1|   a|  10|[4, 2, 1]|        true|
|   1|   b|  40|[4, 2, 1]|        true|
+----+----+----+---------+------------+

------ Observations on your question ------
Based on the given data, the join alone will not give the expected result:

from pyspark.sql import functions as F

df = df.join(df1, "col2", "left")
df.show()
+----+----+----+------+
|col2|col1|col3|  col4|
+----+----+----+------+
|   5|   b|  40|  null|
|   1|   a|  10|[4, 2]|
|   1|   b|  40|[4, 2]|
|   3|   a|  30|  null|
|   2|   a|  20|  null|
|   4|   b|  40|[3, 2]|
+----+----+----+------+

Now, if you look at the values in col2 and col4, you will see that after the join no row has its col2 value (1) present in [4, 2], so array_contains never returns true. That is why, to produce the output above, df1 was created with 1 added to the array:

df1 = spark.createDataFrame([(1, [4, 2, 1]), (4, [3, 2])], ["col2", "col4"])  # note the added 1
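
Re-running the same join and filter with this corrected df1 reproduces the matched rows shown above (a minimal sketch, assuming the original df from the question and an active SparkSession named spark):

from pyspark.sql import functions as F

df1 = spark.createDataFrame([(1, [4, 2, 1]), (4, [3, 2])], ["col2", "col4"])

result = (df.join(df1, "col2", "left")
            .withColumn("is_available", F.expr("array_contains(col4, col2)"))
            .filter(F.col("is_available")))  # drops both false and null rows
result.show()

This yields the two col2 = 1 rows from the output above, since [4, 2, 1] contains 1 while [3, 2] does not contain 4.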

0ve6wy6x · Answer 2

import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._ // assumes an active SparkSession named spark

val df1 = Seq((1, List(4, 2)), (4, List(3, 2))).toDF("col2", "col4")
val df = Seq(("a", 1, 10), ("a", 2, 20), ("a", 3, 30),
  ("b", 4, 40), ("b", 5, 40), ("b", 1, 40)).toDF("col1", "col2", "col3")

// Step 1: inner join on col2 keeps only the df rows whose col2 appears in df1
val res1DF = df1.join(df, df1.col("col2") === df.col("col2"), "inner")
  .select(
    df.col("col1"),
    df.col("col2"),
    df.col("col3")
  )

res1DF.show(false)
//  +----+----+----+
//  |col1|col2|col3|
//  +----+----+----+
//  |a   |1   |10  |
//  |b   |4   |40  |
//  |b   |1   |40  |
//  +----+----+----+

// Step 2: explode col4 into one row per array element
val df11 = df1.withColumn("col41", explode(col("col4")))

// keep the res1DF rows whose col2 matches one of the exploded col4 values
val res2DF = res1DF.join(df11, df11.col("col41") === res1DF.col("col2"), "inner")
  .select(
    res1DF.col("col1"),
    res1DF.col("col2"),
    res1DF.col("col3")
  )
res2DF.show(false)
//  +----+----+----+
//  |col1|col2|col3|
//  +----+----+----+
//  |b   |4   |40  |
//  +----+----+----+
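
Since the question asks for PySpark, here is a rough equivalent of the two-step approach above (a sketch, assuming the original df and df1 from the question; the aliases d1, d, r, and e are introduced here only to disambiguate the col2 columns that appear on both sides of each join):

from pyspark.sql import functions as F

# Step 1: inner join on col2 keeps only the df rows whose col2 appears in df1
res1_df = (df1.alias("d1")
    .join(df.alias("d"), F.col("d1.col2") == F.col("d.col2"), "inner")
    .select("d.col1", "d.col2", "d.col3"))

# Step 2: explode col4 into one row per array element, then keep the
# res1_df rows whose col2 matches one of the exploded values
df11 = df1.withColumn("col41", F.explode("col4"))
res2_df = (res1_df.alias("r")
    .join(df11.alias("e"), F.col("e.col41") == F.col("r.col2"), "inner")
    .select("r.col1", "r.col2", "r.col3"))
res2_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   b|   4|  40|
+----+----+----+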
