我有一个有两列的数据框- filepath(wasbs blob的文件路径),string,我想用这个文件名把每个字符串写入一个单独的blob。我该怎么做呢?
2skhul331#
你一次只能写一个wasb容器-不确定这是否是你问题的一部分,但我想澄清一下。另外,spark写文件到目录,而不是单个文件。如果你想完全完成你所要求的,你必须重新分区到1个分区,并按文件路径分区。完成该步骤后,您需要使用azure sdk来重命名文件,并将它们上移到父目录。
nfzehxib2#
更新后的回答:我发现了一种更简单的方法,可以使用dbutils.fs.put来实现这一点。您需要遍历DataFrame的每一行,为每一行调用dbutils.fs.put()。假设您的输入文件(假设为CSV)包含两列,如下所示:
dbutils.fs.put
filepath, stringValue wasbs://container@myaccount.blob.core.windows.net/demo1.txt,"demo string 1" wasbs://container@myaccount.blob.core.windows.net/demo2.txt,"demo string 2" wasbs://container@myaccount.blob.core.windows.net/demo3.txt,"demo string 3" wasbs://container@myaccount.blob.core.windows.net/demo4.txt,"demo string 4" wasbs://container@myaccount.blob.core.windows.net/demo5.txt,"demo string 5"
您可以使用下列程式码,在输入DataFrame中的每一列进行循环:
df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv") rowList = df.rdd.collect() for row in rowList: dbutils.fs.put(str(row[0]), str(row[1]), True)
put方法将一个给定的String写入一个以UTF-8编码的文件,因此使用该方法可以循环遍历DataFrame中的每条记录,将第一列作为文件路径传入,将第二列作为要写入文件的字符串内容传入。这样做的好处是可以将字符串写入单个文件,因此您不需要经历重命名和移动文件的过程。旧答案:由于Spark的分布式特性,将DataFrame写入文件会导致创建一个包含多个文件的目录。您可以使用coalesce强制处理单个工作线程和文件,其名称将以part-0000开头。免责声明:建议仅对小文件使用此选项,因为较大的数据文件可能导致内存不足异常。要完成您正在尝试的任务,您需要遍历DataFrame的每一行,为每一行创建一个新的DataFrame,其中只包含要写入文件的字符串值。假设您的输入文件(假设为CSV)包含两列,如下所示:
coalesce
part-0000
filepath, stringValue wasbs://container@myaccount.blob.core.windows.net/demo1,"demo string 1" wasbs://container@myaccount.blob.core.windows.net/demo2,"demo string 2" wasbs://container@myaccount.blob.core.windows.net/demo3,"demo string 3" wasbs://container@myaccount.blob.core.windows.net/demo4,"demo string 4" wasbs://container@myaccount.blob.core.windows.net/demo5,"demo string 5"
from pyspark.sql import * from pyspark.sql.types import StringType df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv") rowList = df.rdd.collect() for row in rowList: dfRow = spark.createDataFrame([str(row[1])], StringType()) dfRow.coalesce(1).write.mode("overwrite").text(row[0])
这将导致在Blob存储帐户容器中创建名为demo 1、demo 2、demo 3、demo 4和demo 5的目录。每个目录都将包含多个文件。每个目录中名称以part-0000开头的文件将包含字符串值。如果需要这些文件具有不同的名称,并且位于不同的位置,则可以使用dbutils.fs方法来处理文件的移动和重命名。如果需要,还可以使用此方法对创建的目录进行任何清理。
dbutils.fs
2条答案
按热度按时间2skhul331#
你一次只能写一个wasb容器-不确定这是否是你问题的一部分,但我想澄清一下。另外,spark写文件到目录,而不是单个文件。如果你想完全完成你所要求的,你必须重新分区到1个分区,并按文件路径分区。
完成该步骤后,您需要使用azure sdk来重命名文件,并将它们上移到父目录。
nfzehxib2#
更新后的回答:
我发现了一种更简单的方法,可以使用
dbutils.fs.put
来实现这一点。您需要遍历DataFrame的每一行,为每一行调用dbutils.fs.put()。假设您的输入文件(假设为CSV)包含两列,如下所示:
您可以使用下列程式码,在输入DataFrame中的每一列进行循环:
put方法将一个给定的String写入一个以UTF-8编码的文件,因此使用该方法可以循环遍历DataFrame中的每条记录,将第一列作为文件路径传入,将第二列作为要写入文件的字符串内容传入。
这样做的好处是可以将字符串写入单个文件,因此您不需要经历重命名和移动文件的过程。
旧答案:
由于Spark的分布式特性,将DataFrame写入文件会导致创建一个包含多个文件的目录。您可以使用
coalesce
强制处理单个工作线程和文件,其名称将以part-0000
开头。免责声明:建议仅对小文件使用此选项,因为较大的数据文件可能导致内存不足异常。
要完成您正在尝试的任务,您需要遍历DataFrame的每一行,为每一行创建一个新的DataFrame,其中只包含要写入文件的字符串值。
假设您的输入文件(假设为CSV)包含两列,如下所示:
您可以使用下列程式码,在输入DataFrame中的每一列进行循环:
这将导致在Blob存储帐户容器中创建名为demo 1、demo 2、demo 3、demo 4和demo 5的目录。每个目录都将包含多个文件。每个目录中名称以
part-0000
开头的文件将包含字符串值。如果需要这些文件具有不同的名称,并且位于不同的位置,则可以使用
dbutils.fs
方法来处理文件的移动和重命名。如果需要,还可以使用此方法对创建的目录进行任何清理。