我正在使用pyspark的结构化流,我正在从一个Kafka主题中阅读一个键,它是一个整数,以及一个值,它是一个逗号分隔的整数列表
我试图将这个 Dataframe 与从MongoDB中得到的另一个 Dataframe 连接起来。我也可以根据Kafka Dataframe 的值来做一个过滤器,这些值出现在MongoDB Dataframe 的“id”列中(尽管我不知道这个概念是否正确)
Kafka Dataframe :
| 键|价值|
| - ------|- ------|
| 1个|二九七|
MongoDB Dataframe
| 姓名|身份证|
| - ------|- ------|
| 营地_1|1个|
| 营地_2|九|
| 营地_3|五个|
| 营地_4|七|
| 营地_5|第二章|
所以,结果应该是
| 姓名|身份证|
| - ------|- ------|
| 营地_5|第二章|
| 营地_2|九|
| 营地_4|七|
我考虑使用join,因为我无法迭代Kafka Dataframe 的“value”字段中列表的值
1条答案
按热度按时间shyt4zoc1#
您可以尝试先使用分解,然后再使用连接
输出:
你可以添加order by,如果它与你相关,你也可以广播df之一,如果你知道它会很小。
如果你的列表2,7,9不是一个数组而是一个字符串,你可以先把它拆分,然后剩下的也是类似的