Extract emails into a new column of a PySpark DataFrame

plupiseo, posted on 2022-11-01 in Spark

I have a df with a column that contains emails along with extra information I don't want. Here are some examples:

Email_Col
"Snow, John" <john.snow@stackoverflow.com>, "Stark, Arya" <starkarya@got.com>
"YourBoss" <yourbosss1@yourcurrentcompany.net>
"test1 <emailtest@tester.com>", "test2 <emailtest2@tester.com>", "test3" <emailtest3@tester.com>

I need to clean that column or create a new column containing only the emails. Here is the expected output, an array column:

New_Email_Col
[john.snow@stackoverflow.com, starkarya@got.com]
[yourbosss1@yourcurrentcompany.net]
[emailtest@tester.com, emailtest2@tester.com, emailtest3@tester.com]

My code:

import re

def extract(col):
    for row in col:
        all_matches = re.findall(r'\w+.\w+@\w+.\w+', row)
    return all_matches

extract_udf = udf(lambda col: extract(col), ArrayType(StringType()))

df = df.withColumn(('emails'), extract_udf(col('to')))

My error:
PythonException: 'TypeError: expected string or bytes-like object', from , line 4. Full traceback below:

Answer #1 (wwtsj6pe)

Please don't use UDFs: they are slow, and nowadays they are unnecessary in the vast majority of cases.

F.expr("regexp_extract_all(Email_Col, '(?<=<).*?(?=>)', 0)")

regexp_extract_all is available in Spark 3.1+ as a SQL function, which is why it is called through F.expr here.
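
To see what the pattern matches, the same regular expression can be tried with Python's re module outside Spark. This is only a sketch: Spark evaluates the pattern with Java's regex engine, and the assumption here is that both engines treat this particular lookbehind/lookahead pattern the same way. The names ANGLE_BRACKET_CONTENT, samples, and extracted are illustrative and not part of the answer.

```python
import re

# Same pattern as in the Spark expression: a lazy match between '<' and '>'.
# (?<=<) is a lookbehind for '<' and (?=>) is a lookahead for '>';
# neither bracket becomes part of the match.
ANGLE_BRACKET_CONTENT = re.compile(r"(?<=<).*?(?=>)")

# The three example rows from the question.
samples = [
    '"Snow, John" <john.snow@stackoverflow.com>, "Stark, Arya" <starkarya@got.com>',
    '"YourBoss" <yourbosss1@yourcurrentcompany.net>',
    '"test1 <emailtest@tester.com>", "test2 <emailtest2@tester.com>", "test3" <emailtest3@tester.com>',
]

# One list of matches per input string, analogous to the array column.
extracted = [ANGLE_BRACKET_CONTENT.findall(s) for s in samples]
for emails in extracted:
    print(emails)
```

Note that the pattern returns whatever sits between angle brackets, so it relies on every address in the column being wrapped in < and >, as in the examples.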
Full example:

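from pyspark.sql import SparkSession, functions as F

# Reuse the active session if one exists, otherwise create one
spark = SparkSession.builder.getOrCreate()
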
df = spark.createDataFrame(
    [('''"Snow, John" <john.snow@stackoverflow.com>, "Stark, Arya" <starkarya@got.com>''',),
     ('''"YourBoss" <yourbosss1@yourcurrentcompany.net>''',),
     ('''"test1 <emailtest@tester.com>", "test2 <emailtest2@tester.com>", "test3" <emailtest3@tester.com>''',)],
    ['Email_Col'])

df = df.withColumn('Email_Col', F.expr("regexp_extract_all(Email_Col, '(?<=<).*?(?=>)', 0)"))

df.show(truncate=0)

# +--------------------------------------------------------------------+
# |Email_Col                                                           |
# +--------------------------------------------------------------------+
# |[john.snow@stackoverflow.com, starkarya@got.com]                    |
# |[yourbosss1@yourcurrentcompany.net]                                 |
# |[emailtest@tester.com, emailtest2@tester.com, emailtest3@tester.com]|
# +--------------------------------------------------------------------+

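To add a separate new column instead (applied to the original string column, not the overwritten one), do the following: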

df = df.withColumn('New_Email_Col', F.expr("regexp_extract_all(Email_Col, '(?<=<).*?(?=>)', 0)"))
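
If a UDF cannot be avoided (for example on a Spark version older than 3.1), the question's extract function would need some changes first. As written, it loops over the value it receives, calls re.findall once per element, and returns only the result of the last iteration; its pattern also leaves the dots unescaped. The traceback is not shown, so the exact cause of the TypeError is not confirmed here. The following is a minimal sketch in plain Python, assuming each cell is a single string or null; EMAIL_PATTERN and extract_emails are hypothetical names, and the pattern is a simplified illustration, not a full email validator.

```python
import re

# Hypothetical pattern for illustration: the dot before the domain suffix is
# escaped, and dots, '+' and '-' are allowed in the local part.
# It is not a complete RFC 5322 validator.
EMAIL_PATTERN = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def extract_emails(value):
    """Return all email-like substrings of one cell value (a str or None)."""
    if value is None:  # null cells reach a Python UDF as None
        return []
    # Search the whole string once instead of looping over it.
    return EMAIL_PATTERN.findall(value)


row = '"Snow, John" <john.snow@stackoverflow.com>, "Stark, Arya" <starkarya@got.com>'
print(extract_emails(row))
print(extract_emails(None))
```

Such a function could then be wrapped with udf(extract_emails, ArrayType(StringType())) from pyspark.sql.functions and pyspark.sql.types and applied to the string column. The built-in regexp_extract_all shown above remains the preferable route where it is available.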
