产品描述:
我有一个数据集,其中包含一列“笔记”与电子邮件内容。我在PySpark中尝试了一个Python脚本,使用regexp_replace()函数将电子邮件字符串中的特殊字符替换为一行,以避免CSV分隔符,新行等。
# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','/[^0-9]+,/"|\n|\t|\r',''))
t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[,|"\n\r\t^>_]+','-'))
# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[^A-Z0-9_]+',''))
字符串
在regexp_replace()之后,数据集中的电子邮件内容是一个长字符串,在数据框中以“-”分隔。(我选择“-”、“”字符)
注解='-根据客户-需要详细清洁-从:XXXXXX <XXXXXXXX@commonspirit.org- -发送:星期四- 2023年5月18日下午4:29至:XXXXXXSupport <XXXXXXXXX.com-Subject:- 以下是您问题的答案:-1。卡尔卡尔操作员状态的其他注解-未正确处理。akakakakakak.-2.步骤:未知-ii.未知-3.患者/用户参与-i。否-4.呼叫方联系信息:1. xxxxxx- 2.内窥镜检查协调员-3。11111111111 4. XXXXXXXX.org-')
我已经尝试了上面的代码,但它不工作。当CSV文件打开时,保存的CSV文件中的输出将电子邮件内容拆分为多行:x1c 0d1x的数据
使用自定义函数保存为CSV格式
def write_csv_with_specific_file_name(sc, df, path, filename):
file_format = 'csv'
df.coalesce(1).write.option("header", "true").mode('overwrite').format(file_format).save(path)
try:
sc_uri = sc._gateway.jvm.java.net.URI
sc_path = sc._gateway.jvm.org.apache.hadoop.fs.Path
file_system = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
# fs = file_system.get(sc_uri("abfss://<container-name>@<account-name>.dfs.core.windows.net/<file-path>/"), configuration())
fs = sc_path(path).getFileSystem(sc._jsc.hadoopConfiguration())
src_path = None
status = fs.listStatus(sc_path(path))
for fileStatus in status:
temp = fileStatus.getPath().toString()
if "part" in temp:
src_path = sc_path(temp)
dest_path = sc_path(path + filename)
if fs.exists(src_path) and fs.isFile(src_path):
fs.rename(src_path, dest_path)
fs.delete(src_path, True)
except Exception as e:
raise Exception("Error renaming the part file to {}:".format(filename, e))
型
我不确定我做错了什么。有人能帮帮我吗?
谢谢
1条答案
按热度按时间lyr7nygr1#
我在
regex_replace
函数中使用了下面的正则表达式来获得所需的输出:字符串
如您所见,下面的输出在一行中:
的数据
您可以使用以下代码将 Dataframe 写入所需路径中的csv文件:
型