Spark: join condition with an array (nullable)

azpvetkf  posted on 2021-05-27  in Spark

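I have two DataFrames that I want to join, and while joining I want to filter out the rows whose Alp value appears in the OrgTypeToExclude list of the matching TransactionId.
In short, TransactionId is the join condition and OrgTypeToExclude is the exclusion condition. Here is a simple example: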

import org.apache.spark.sql.functions.expr
import spark.implicits._
val jsonstr ="""{

  "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
  "Transactions": [
    {
      "TransactionId": "USAL",
      "OrgTypeToExclude": ["A","B"]
    },
    {
      "TransactionId": "USMD",
      "OrgTypeToExclude": ["E"]
    },
    {
      "TransactionId": "USGA",
      "OrgTypeToExclude": []
    }
    ]   
}"""
val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
val json = spark.read.json(Seq(jsonstr).toDS).select("Transactions.TransactionId","Transactions.OrgTypeToExclude")

df.printSchema()
json.printSchema()
df.join(json, $"code" <=> $"TransactionId".cast("string") && !expr("array_contains(OrgTypeToExclude, Alp)"), "inner").show()

Expected output:
 id  code    Alp 
 4   "USAL"  "C"
 2   "USMD"  "B"
 3   "USGA"  "C"

Thanks, Mano.


bpzcxfmw1#

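Transactions is an array type, and you are accessing TransactionId and OrgTypeToExclude through it, so each of them comes back as an array covering all transactions instead of one value per row.
Instead, explode the root-level Transactions array first and then extract OrgTypeToExclude and TransactionId; after that the join is straightforward.
Please check the code below.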

scala> val jsonstr ="""{
     |
     |   "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
     |   "Transactions": [
     |     {
     |       "TransactionId": "USAL",
     |       "OrgTypeToExclude": ["A","B"]
     |     },
     |     {
     |       "TransactionId": "USMD",
     |       "OrgTypeToExclude": ["E"]
     |     },
     |     {
     |       "TransactionId": "USGA",
     |       "OrgTypeToExclude": []
     |     }
     |     ]
     | }"""
jsonstr: String =
{

  "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
  "Transactions": [
    {
      "TransactionId": "USAL",
      "OrgTypeToExclude": ["A","B"]
    },
    {
      "TransactionId": "USMD",
      "OrgTypeToExclude": ["E"]
    },
    {
      "TransactionId": "USGA",
      "OrgTypeToExclude": []
    }
    ]
}

scala> val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
df: org.apache.spark.sql.DataFrame = [id: int, code: string ... 1 more field]

scala> val json = spark.read.json(Seq(jsonstr).toDS).select(explode($"Transactions").as("Transactions")).select($"Transactions.*")
json: org.apache.spark.sql.DataFrame = [OrgTypeToExclude: array<string>, TransactionId: string]

scala> df.show(false)
+---+----+---+
|id |code|Alp|
+---+----+---+
|1  |USAL|A  |
|4  |USAL|C  |
|2  |USMD|B  |
|5  |USMD|E  |
|3  |USGA|C  |
+---+----+---+

scala> json.show(false)
+----------------+-------------+
|OrgTypeToExclude|TransactionId|
+----------------+-------------+
|[A, B]          |USAL         |
|[E]             |USMD         |
|[]              |USGA         |
+----------------+-------------+

scala> df.join(json,(df("code") === json("TransactionId") && !array_contains(json("OrgTypeToExclude"),df("Alp"))),"inner").select("id","code","alp").show(false)
+---+----+---+
|id |code|alp|
+---+----+---+
|4  |USAL|C  |
|2  |USMD|B  |
|3  |USGA|C  |
+---+----+---+
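
To see why the original select produced arrays, it can help to compare the two schemas side by side. The following is a minimal sketch, assuming a local SparkSession and a shortened, single-line version of the JSON from the question (the app name and the trimmed JSON are illustrative only). In spark-shell, `spark` and its implicits already exist, so the session setup lines can be skipped.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Assumption: a throwaway local session, only needed outside spark-shell
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
import spark.implicits._

// Shortened version of the question's JSON, kept on one line for brevity
val jsonstr =
  """{"id": "x", "Transactions": [{"TransactionId": "USAL", "OrgTypeToExclude": ["A","B"]}, {"TransactionId": "USGA", "OrgTypeToExclude": []}]}"""

val raw = spark.read.json(Seq(jsonstr).toDS)

// Selecting fields through the array keeps a single row and collects
// each field across all transactions into an array
raw.select("Transactions.TransactionId", "Transactions.OrgTypeToExclude").printSchema()

// Exploding first produces one row per transaction with plain fields
raw.select(explode($"Transactions").as("t")).select($"t.*").printSchema()
```

The first schema should show TransactionId as an array of strings and OrgTypeToExclude as an array of arrays, which cannot be compared with `code` row by row; the second should show a string and an array of strings, which is the shape the join needs.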


ppcbkaq52#

First, you seem to have overlooked the fact that Transactions is itself an array; we can use explode to handle it:

val json = spark.read.json(Seq(jsonstr).toDS)
  .select(explode($"Transactions").as("t")) // deal with Transactions array first
  .select($"t.TransactionId", $"t.OrgTypeToExclude")

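Also, array_contains wants a value rather than a column as its second argument. I don't know of any version that supports a column reference there, so we define a UDF: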

val arr_con = udf { (a: Seq[String], v: String) => a.contains(v) }

Then we can change the join condition like this:

df.join(json, $"code" <=> $"TransactionId" && ! arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()

Expected result:

scala> df.join(json, $"code" <=> $"TransactionId" && ! arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()
+---+----+---+-------------+----------------+
| id|code|Alp|TransactionId|OrgTypeToExclude|
+---+----+---+-------------+----------------+
|  4|USAL|  C|         USAL|          [A, B]|
|  2|USMD|  B|         USMD|             [E]|
|  3|USGA|  C|         USGA|              []|
+---+----+---+-------------+----------------+
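
Since the title mentions a nullable array, one more case may be worth covering: an OrgTypeToExclude that is null rather than empty. array_contains returns null for a null array, so the negated condition is null as well and the row silently drops out of an inner join, while a UDF that calls `a.contains(v)` would fail on a null `a`. The sketch below is one possible way to treat null as "nothing to exclude". It assumes a local SparkSession, and the `rules` DataFrame with a null list for USGA is a hypothetical variant of the data above, not something from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, expr, lit}

// Assumption: a throwaway local session, only needed outside spark-shell
val spark = SparkSession.builder().master("local[*]").appName("null-safe-exclude").getOrCreate()
import spark.implicits._

val df = Seq((1, "USAL", "A"), (4, "USAL", "C"), (2, "USMD", "B"), (5, "USMD", "E"), (3, "USGA", "C"))
  .toDF("id", "code", "Alp")

// Hypothetical lookup table: USGA carries a null exclusion list instead of []
val rules = Seq[(String, Option[Seq[String]])](
  ("USAL", Some(Seq("A", "B"))),
  ("USMD", Some(Seq("E"))),
  ("USGA", None)
).toDF("TransactionId", "OrgTypeToExclude")

// SQL form of array_contains, as in the question; coalesce turns a null
// result (null array or null Alp) into false, i.e. "not excluded"
val excluded = coalesce(expr("array_contains(OrgTypeToExclude, Alp)"), lit(false))

df.join(rules, $"code" <=> $"TransactionId" && !excluded, "inner")
  .select("id", "code", "Alp")
  .show(false)
```

With this condition the USGA row should be kept even though its list is null. Whether null should mean "exclude nothing" or "drop the row" is a design choice that depends on the data; swapping `lit(false)` for `lit(true)` gives the opposite behaviour.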
