I have a column in a PySpark DataFrame where each entry is a string.
Sample Input 1: aaabbcca
Sample Output 1: a3b2c2a1
Sample Input 2: aabbbccaa
Sample Output 2: a2b3c2a2
I am new to data engineering, so I wrote a simple Python program to do this, and it works fine -
name = input("enter the characters")
count = 1
strng = ""
for i in range(0, len(name) - 1):
    if name[i] == name[i + 1]:
        count = count + 1
    else:
        strng = strng + name[i]
        strng = strng + str(count)
        count = 1
    if i == len(name) - 2:
        if name[i] != name[i + 1]:
            strng = strng + name[i + 1] + "1"
        else:
            strng = strng + name[i] + str(count)
print(strng)
But when I try to apply this function to a column of the PySpark DataFrame, I get an error.
My DataFrame is -
+-----------+
| words|
+-----------+
| aaabbcca|
| aabbbccaa|
| abcd|
| dddeert|
|aaabbbacccd|
+-----------+
My expected output is -
+-----------+-----------+
| words|coded_words|
+-----------+-----------+
| aaabbcca| a3b2c2a1|
| aabbbccaa| a2b3c2a2|
| abcd| a1b1c1d1|
| dddeert| d3e2r1t1|
|aaabbbacccd| a3b3a1c3d1|
+-----------+-----------+
Here is my code -
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import StringType

if __name__ == "__main__":
    my_conf = SparkConf()
    my_conf.set("spark.app.name", "my 1st app")
    my_conf.set("spark.master", "local[*]")
    spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

    def code_func(name):
        count = 1
        strng = ""
        ln = length(name)
        for i in name:
            if name[i] == name[i + 1]:
                count = count + 1
            else:
                strng = strng + name[i]
                strng = strng + str(count)
                count = 1
            if i == len(name) - 2:
                if name[i] != name[i + 1]:
                    strng = strng + name[i + 1] + "1"
                else:
                    strng = strng + name[i] + str(count)
        return strng

    df = spark.read.format("csv").option("path", "C:/Users/hp/OneDrive/Desktop/ddd.txt").load().toDF("words")
    df2 = df.withColumn("coded_words", code_func(f.col("words")))
    df2.show()
I get the following error:
Traceback (most recent call last):
  File "C:\Users\hp\PycharmProjects\pysparkLearning\practice\test.py", line 33, in <module>
    df2 = df.withColumn("coded_words", code_func(f.col("words")))
  File "C:\Users\hp\PycharmProjects\pysparkLearning\practice\test.py", line 18, in code_func
    for i in name:
  File "C:\Users\hp\PycharmProjects\pysparkLearning\venv\lib\site-packages\pyspark\sql\column.py", line 344, in __iter__
    raise TypeError("Column is not iterable")
TypeError: Column is not iterable
How do I work with column values as strings, and how do I fix this particular problem?
2 Answers
mwkjh3gx1#
These are the two important parts of the traceback:
line 18, in code_func
    for i in name:
and
TypeError: Column is not iterable
When you run the PySpark version, a DataFrame Column object is passed to your function, and a Column is not iterable: you cannot loop over it the way you loop over a string.
Instead, you want the function to be called once for each item in the column, not once on the column itself.
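A quick way to see this from inside your own script (no new APIs, just inspecting what code_func actually receives) is a sketch like the following, run after the SparkSession is created:

import pyspark.sql.functions as f

col_expr = f.col("words")
print(type(col_expr))   # <class 'pyspark.sql.column.Column'>

# A Column is an expression describing the data; it does not hold
# the row values themselves, so Python cannot loop over it:
for ch in col_expr:     # raises TypeError: Column is not iterable
    pass

That is exactly the error in your traceback: the loop never sees the strings, only the Column expression. The next answer shows how to get Spark to call your function on each string.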
zsbz8rwp2#
You can restructure your function so that it takes the string in each row of the column as its input, and then apply it row by row by turning it into a udf, for example:
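Here is a minimal sketch of that, reusing the loop from your working console program. The only assumptions are that a SparkSession is already active and that each string, like the samples, has at least two characters (a one-character string would need an extra branch):

import pyspark.sql.functions as f
from pyspark.sql.types import StringType

def code_func(name):
    # Same run-length-encoding loop as the console program, but it now
    # receives the plain Python string Spark hands to the UDF per row.
    count = 1
    strng = ""
    for i in range(0, len(name) - 1):
        if name[i] == name[i + 1]:
            count = count + 1
        else:
            strng = strng + name[i] + str(count)
            count = 1
        if i == len(name) - 2:
            if name[i] != name[i + 1]:
                strng = strng + name[i + 1] + "1"
            else:
                strng = strng + name[i] + str(count)
    return strng

# Register the plain-Python function as a UDF that returns a string
code_udf = f.udf(code_func, StringType())

df2 = df.withColumn("coded_words", code_udf(f.col("words")))
df2.show()

With the udf wrapper, Spark calls code_func once per row and passes the cell's value as an ordinary Python string, so len(), indexing, and the loop all behave exactly as they did in your console version.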