我有一个df1
Spark Dataframe
id transactions
1 [1, 2, 3, 5]
2 [1, 2, 3, 6]
3 [1, 2, 9, 8]
4 [1, 2, 5, 6]
root
|-- id: int (nullable = true)
|-- transactions: array (nullable = false)
|-- element: int(containsNull = true)
None
我有一个df2
Spark Dataframe
items cost
[1] 1.0
[2] 1.0
[2, 1] 2.0
[6, 1] 2.0
root
|-- items: array (nullable = false)
|-- element: int (containsNull = true)
|-- cost: int (nullable = true)
None
我想检查项目列中的所有数组元素是否都在交易列中。
第一行([1, 2, 3, 5]
)包含来自items列的[1],[2],[2, 1]
。因此,我需要总结其相应的成本:1.0 + 1.0 + 2.0 = 4.0
我想要的输出是
id transactions score
1 [1, 2, 3, 5] 4.0
2 [1, 2, 3, 6] 6.0
3 [1, 2, 9, 8] 4.0
4 [1, 2, 5, 6] 6.0
我尝试使用collect()
/toLocalIterator
的循环,但似乎效率不高。我会有大量的数据。
我认为创建一个这样的UDF将解决这个问题。但它抛出一个错误。
from pyspark.sql.functions import udf
def containsAll(x, y):
result = all(elem in x for elem in y)
if result:
print("Yes, transactions contains all items")
else :
print("No")
contains_udf = udf(containsAll)
dataFrame.withColumn("result", contains_udf(df2.items, df1.transactions)).show()
还有别的路吗
2条答案
按热度按时间llew8vvj1#
2.4之前的有效udf(注意它必须返回一些东西
在2.4或更高版本中,不需要udf:
用途:
结果:
如果
df2
很小,最好将其用作局部变量:最后才有可能爆款加入
然后将结果与原始的
df1
连接起来,但这不是一个简单的解决方案,需要多次 Shuffle 。它可能仍然优于笛卡尔积(
crossJoin
),但它将取决于实际数据。yeotifhr2#
**Spark 3.0+**多了一个使用
forall
的选项Spark 3.1+ -
F.forall('look_for', lambda x: F.array_contains('look_in', x))
的替代语法将其与选项(Spark 2.4中的
array_intersect
)进行比较在某些情况下,从数组内部删除空值可能很有用,使用Spark 3.4+中的
array_compact
最简单。