如何编写PySpark脚本将Emil内容转换为CSV文件的长字符串?

pes8fvy9  于 2023-07-31  发布在  Spark
关注(0)|答案(1)|浏览(114)

产品描述:
我有一个数据集,其中包含一列“笔记”与电子邮件内容。我在PySpark中尝试了一个Python脚本,使用regexp_replace()函数将电子邮件字符串中的特殊字符替换为一行,以避免CSV分隔符,新行等。

# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','/[^0-9]+,/"|\n|\t|\r','')) 
t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[,|"\n\r\t^>_]+','-')) 
# t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes','[^A-Z0-9_]+',''))

字符串
在regexp_replace()之后,数据集中的电子邮件内容是一个长字符串,在数据框中以“-”分隔。(我选择“-”、“”字符)
注解='-根据客户-需要详细清洁-从:XXXXXX <XXXXXXXX@commonspirit.org- -发送:星期四- 2023年5月18日下午4:29至:XXXXXXSupport <XXXXXXXXX.com-Subject:- 以下是您问题的答案:-1。卡尔卡尔操作员状态的其他注解-未正确处理。akakakakakak.-2.步骤:未知-ii.未知-3.患者/用户参与-i。否-4.呼叫方联系信息:1. xxxxxx- 2.内窥镜检查协调员-3。11111111111 4. XXXXXXXX.org-')
我已经尝试了上面的代码,但它不工作。当CSV文件打开时,保存的CSV文件中的输出将电子邮件内容拆分为多行:x1c 0d1x的数据
使用自定义函数保存为CSV格式

def write_csv_with_specific_file_name(sc, df, path, filename):
    file_format = 'csv'
    df.coalesce(1).write.option("header", "true").mode('overwrite').format(file_format).save(path)
    try:
        sc_uri = sc._gateway.jvm.java.net.URI
        sc_path = sc._gateway.jvm.org.apache.hadoop.fs.Path
        file_system = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
        configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
        # fs = file_system.get(sc_uri("abfss://<container-name>@<account-name>.dfs.core.windows.net/<file-path>/"), configuration())
        fs = sc_path(path).getFileSystem(sc._jsc.hadoopConfiguration())
        src_path = None
        status = fs.listStatus(sc_path(path))
        for fileStatus in status:
            temp = fileStatus.getPath().toString()
            if "part" in temp:
                src_path = sc_path(temp)
        dest_path = sc_path(path + filename)
        if fs.exists(src_path) and fs.isFile(src_path):
            fs.rename(src_path, dest_path)
            fs.delete(src_path, True)
    except Exception as e:
        raise Exception("Error renaming the part file to {}:".format(filename, e))


我不确定我做错了什么。有人能帮帮我吗?

谢谢

lyr7nygr

lyr7nygr1#

我在regex_replace函数中使用了下面的正则表达式来获得所需的输出:

t = maintnence_request_df.withColumn('Notes', regexp_replace('Notes', r'[,\|"\n\r\t^>_-]+', '-'))

字符串
如您所见,下面的输出在一行中:


的数据
您可以使用以下代码将 Dataframe 写入所需路径中的csv文件:

email_df.coalesce(1).write.csv("<path>/<filename>.csv", header=True, mode="overwrite")

相关问题