我有2个Dataframe,想加入他们,想过滤数据,我想过滤
orgtypetoexclude与每个transactionid匹配的数据。
在一个单词中,我的transactionid是join contiions,orgtypetoexclude是exclude condition,这里分享一个简单的例子
import org.apache.spark.sql.functions.expr
import spark.implicits._
val jsonstr ="""{
"id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
"Transactions": [
{
"TransactionId": "USAL",
"OrgTypeToExclude": ["A","B"]
},
{
"TransactionId": "USMD",
"OrgTypeToExclude": ["E"]
},
{
"TransactionId": "USGA",
"OrgTypeToExclude": []
}
]
}"""
val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
val json = spark.read.json(Seq(jsonstr).toDS).select("Transactions.TransactionId","Transactions.OrgTypeToExclude")
df.printSchema()
json.printSchema()
df.join(json,$"code"<=> $"TransactionId".cast("string") && !exp("array_contains(OrgTypeToExclude, Alp)") ,"inner" ).show()
--Expecting output
id Code Alp
4 "USAL" "C"
2 "USMD" "B"
3 "USGA" "C"
谢谢,马诺。
2条答案
按热度按时间bpzcxfmw1#
Transactions
是数组类型&您正在访问TransactionId
&OrgTypeToExclude
所以你会得到多个数组。而不是你只是爆炸根水平
Transactions
array&提取OrgTypeToExclude
&TransactionId
下一步很容易。请检查下面的代码。
ppcbkaq52#
首先,您似乎忽略了一个事实,即事务也是一个数组,我们可以使用explode来处理它:
另外,array_contains想要一个值而不是一列作为它的第二个参数。我不知道有哪个版本支持引用列,因此我们将生成一个自定义项:
然后我们可以这样修改连接条件:
预期结果: