在Pyspark中比较来自两个不同 Dataframe 的两个阵列

42fyovps  于 2023-01-08  发布在  Spark
关注(0)|答案(1)|浏览(423)

我有两个 Dataframe ecah有一个数组(字符串)列。
我试图创建一个新的数据框,只过滤行中的数组元素之一与其他匹配的行。

#first dataframe
main_df = spark.createDataFrame([('1', ['YYY', 'MZA']),
    ('2', ['XXX','YYY']),
    ('3',['QQQ']),
     ('4', ['RRR', 'ZZZ', 'BBB1'])],
    ('No', 'refer_array_col'))

#second dataframe
df = spark.createDataFrame([('1A',    '3412asd','value-1',    ['XXX', 'YYY', 'AAA']),
('2B',  '2345tyu','value-2',    ['DDD', 'YFFFYY', 'GGG', '1']),
('3C',  '9800bvd',  'value-3',  ['AAA']),
 ('3C', '9800bvd',  'value-1',  ['AAA', 'YYY', 'CCCC'])],
('ID',  'Company_Id',   'value' ,'array_column'))

df.show()

+---+----------+-------+--------------------+
| ID|Company_Id|  value|      array_column  |
+---+----------+-------+--------------------+
| 1A|   3412asd|value-1|   [XXX, YYY, AAA]  |
| 2B|   2345tyu|value-2|[DDD, YFFFYY, GGG, 1]|
| 3C|   9800bvd|value-3|             [AAA]   |
| 3C|   9800bvd|value-1|  [AAA, YYY, CCCC]   |
+---+----------+-------+---------------------+
    • 我尝试的代码:**
  • 主要思想是使用rdd. toLocalIterator(),因为在同一for循环中有一些其他函数依赖于此过滤器 *
for x in main_df.rdd.toLocalIterator:
    a = main_df["refer_array_col"]
     b = main_df["No"]
     some_x_filter = F.col('array_coulmn').isin(b)  
   

    final_df = df.filter(
       # filter 1
       some_x_filter &           
       # second filter is to compare 'a' with array_column - i tried using F.array_contains
       (F.array_contains(F.col('array_column'), F.lit(a)))     
)
  • some_x_filter也以类似的方式工作
  • some_x_filter正在比较字符串列数组中的字符串值。
  • 但是现在a包含一个字符串列表,我无法将其与array_column进行比较

在我的代码中,我得到了一个数组包含错误

    • 错误**
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList ['YYY', 'MZA']

谁能告诉我在第二个过滤器上我可以选择使用什么?

7d7tgy0s

7d7tgy0s1#

从我所理解的基础上,我们的谈话在评论。
本质上,您的需求是将数组列与Python列表进行比较。
因此,这将完成工作

df.withColumn("asArray", F.array(*[F.lit(x) for x in b]))

相关问题