Azure数据块spark -写入blob存储

rsl1atfo  于 2022-11-16  发布在  Apache
关注(0)|答案(2)|浏览(180)

我有一个有两列的数据框- filepath(wasbs blob的文件路径),string,我想用这个文件名把每个字符串写入一个单独的blob。我该怎么做呢?

2skhul33

2skhul331#

你一次只能写一个wasb容器-不确定这是否是你问题的一部分,但我想澄清一下。另外,spark写文件到目录,而不是单个文件。如果你想完全完成你所要求的,你必须重新分区到1个分区,并按文件路径分区。
完成该步骤后,您需要使用azure sdk来重命名文件,并将它们上移到父目录。

nfzehxib

nfzehxib2#

更新后的回答:
我发现了一种更简单的方法,可以使用dbutils.fs.put来实现这一点。您需要遍历DataFrame的每一行,为每一行调用dbutils.fs.put()。
假设您的输入文件(假设为CSV)包含两列,如下所示:

filepath, stringValue
wasbs://container@myaccount.blob.core.windows.net/demo1.txt,"demo string 1"
wasbs://container@myaccount.blob.core.windows.net/demo2.txt,"demo string 2"
wasbs://container@myaccount.blob.core.windows.net/demo3.txt,"demo string 3"
wasbs://container@myaccount.blob.core.windows.net/demo4.txt,"demo string 4"
wasbs://container@myaccount.blob.core.windows.net/demo5.txt,"demo string 5"

您可以使用下列程式码,在输入DataFrame中的每一列进行循环:

df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv")

rowList = df.rdd.collect()
for row in rowList:
  dbutils.fs.put(str(row[0]), str(row[1]), True)

put方法将一个给定的String写入一个以UTF-8编码的文件,因此使用该方法可以循环遍历DataFrame中的每条记录,将第一列作为文件路径传入,将第二列作为要写入文件的字符串内容传入。
这样做的好处是可以将字符串写入单个文件,因此您不需要经历重命名和移动文件的过程。
旧答案:
由于Spark的分布式特性,将DataFrame写入文件会导致创建一个包含多个文件的目录。您可以使用coalesce强制处理单个工作线程和文件,其名称将以part-0000开头。
免责声明:建议仅对小文件使用此选项,因为较大的数据文件可能导致内存不足异常。
要完成您正在尝试的任务,您需要遍历DataFrame的每一行,为每一行创建一个新的DataFrame,其中只包含要写入文件的字符串值。
假设您的输入文件(假设为CSV)包含两列,如下所示:

filepath, stringValue
wasbs://container@myaccount.blob.core.windows.net/demo1,"demo string 1"
wasbs://container@myaccount.blob.core.windows.net/demo2,"demo string 2"
wasbs://container@myaccount.blob.core.windows.net/demo3,"demo string 3"
wasbs://container@myaccount.blob.core.windows.net/demo4,"demo string 4"
wasbs://container@myaccount.blob.core.windows.net/demo5,"demo string 5"

您可以使用下列程式码,在输入DataFrame中的每一列进行循环:

from pyspark.sql import *
from pyspark.sql.types import StringType

df = spark.read.option("header", True).csv("wasbs://container@myaccount.blob.core.windows.net/demo-data.csv")

rowList = df.rdd.collect()
for row in rowList:
  dfRow = spark.createDataFrame([str(row[1])], StringType())
  dfRow.coalesce(1).write.mode("overwrite").text(row[0])

这将导致在Blob存储帐户容器中创建名为demo 1、demo 2、demo 3、demo 4和demo 5的目录。每个目录都将包含多个文件。每个目录中名称以part-0000开头的文件将包含字符串值。
如果需要这些文件具有不同的名称,并且位于不同的位置,则可以使用dbutils.fs方法来处理文件的移动和重命名。如果需要,还可以使用此方法对创建的目录进行任何清理。

相关问题