PySpark:列不可迭代

bqf10yzr  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(122)

我有如下Spark Dataframe :

from pyspark.sql import SparkSession, functions as F

df = spark.sql("SELECT transaction_id, transaction_label, module_name, length(transaction_label) as length FROM all_trans")
df.show()

+---------------+-----------------+-----------+------+
| transaction_id|transaction_label|module_name|length|
+---------------+-----------------+-----------+------+
|0P2117292543428|              EDU|        mcc|     3|
| 0P211729824944|              EDU|        mcc|     3|
|  0P31172950208|              EDU|        mcc|     3|
|0P2117294027213|       FUN0402007|      regex|    10|
|0P2117294027213|            FUN04|        mcc|     5|
|0P2117293581427|       FUN0402007|      regex|    10|
|0P2117293581427|            FUN04|        mcc|     5|
|0P2117292967336|       FUN0402007|      regex|    10|
|0P2117292967336|            FUN04|        mcc|     5|
|0P2117292659416|       FUN0402007|      regex|    10|
|0P2117292659416|            FUN04|        mcc|     5|
|0P2117293159304|       FUN0402007|      regex|    10|
|0P2117293159304|            FUN04|        mcc|     5|
|0P2117293237687|       FUN0402007|      regex|    10|
|0P2117293237687|            FUN04|        mcc|     5|
|0P2117293548610|       FUN0402007|      regex|    10|
|0P2117293548610|            FUN04|        mcc|     5|
|0P2117293678239|       FUN0402007|      regex|    10|
|0P2117293678239|            FUN04|        mcc|     5|
|0P2117293840924|       FUN0402007|      regex|    10|
+---------------+-----------------+-----------+------+

我想比较相同transaction_idtransaction_label与不同module_nametransaction_label
我试探着:

df = (df.filter("module_name = 'mcc'").alias('m')
    .join(df.filter("module_name = 'regex'").alias('r'), 'transaction_id')
    .withColumn('check', F.col('m.transaction_label') == F.substring('r.transaction_label', 1, F.col('m.length')))
)
df.show()

其产生了:
TypeError:列不可迭代

htrmnn0y

htrmnn0y1#

substring中的第三个参数需要一个数字,但您提供的却是一个列。
在使用substring时切换到SQL. SQL可以处理这种情况。

df = (df.filter("module_name = 'mcc'").alias('m')
    .join(df.filter("module_name = 'regex'").alias('r'), 'transaction_id')
    .withColumn('check', F.col('m.transaction_label') == F.expr("substring(r.transaction_label, 1, m.length)"))
)

相关问题