pyspark 将 Dataframe 与列表字段连接

taor4pac  于 2023-02-03  发布在  Spark
关注(0)|答案(1)|浏览(194)

我正在使用pyspark的结构化流,我正在从一个Kafka主题中阅读一个键,它是一个整数,以及一个值,它是一个逗号分隔的整数列表
我试图将这个 Dataframe 与从MongoDB中得到的另一个 Dataframe 连接起来。我也可以根据Kafka Dataframe 的值来做一个过滤器,这些值出现在MongoDB Dataframe 的“id”列中(尽管我不知道这个概念是否正确)
Kafka Dataframe :
| 键|价值|
| - ------|- ------|
| 1个|二九七|
MongoDB Dataframe
| 姓名|身份证|
| - ------|- ------|
| 营地_1|1个|
| 营地_2|九|
| 营地_3|五个|
| 营地_4|七|
| 营地_5|第二章|
所以,结果应该是
| 姓名|身份证|
| - ------|- ------|
| 营地_5|第二章|
| 营地_2|九|
| 营地_4|七|
我考虑使用join,因为我无法迭代Kafka Dataframe 的“value”字段中列表的值

shyt4zoc

shyt4zoc1#

您可以尝试先使用分解,然后再使用连接

from pyspark.sql.functions import explode

kafkaData = [("1", [2, 7, 9])]
kafkaDf = spark.createDataFrame(kafkaData, schema=["key", "Value"])

mongoData = [("Camp_1", 1), ("Camp_2", 9), ("Camp_3", 5), ("Camp_4", 7), ("Camp_5", 2)]
mongoDf = spark.createDataFrame(mongoData, schema=["name", "id"])

kafkaDfExploded = kafkaDf.select(explode("Value").alias("id"))
kafkaDfExploded.join(mongoDf, "id", "left").select("name", "id").show()

输出:

+------+---+
|  name| id|
+------+---+
|Camp_4|  7|
|Camp_2|  9|
|Camp_5|  2|
+------+---+

你可以添加order by,如果它与你相关,你也可以广播df之一,如果你知道它会很小。
如果你的列表2,7,9不是一个数组而是一个字符串,你可以先把它拆分,然后剩下的也是类似的

相关问题