将多个S3文件夹/路径读入PySpark

r3i60tvu  于 2022-11-28  发布在  Spark
关注(0)|答案(2)|浏览(186)

我正在使用PySpark进行大数据分析。我可以使用以下命令导入存储在特定桶的特定文件夹中的所有CSV文件:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/path/datafolder/data2014/*.csv')

(其中 * 相当于通配符)
我的问题如下:
1.如果我想对2014年和2015年的数据进行分析,例如文件1是.load('file:///home/path/SFweather/data2014/*.csv'),文件2是.load('file:///home/path/SFweather/data2015/*.csv'),文件3是.load('file:///home/path/NYCweather/data2014/*.csv'),文件4是.load('file:///home/path/NYCweather/data2015/*.csv')。我如何同时导入多个路径以得到一个 Dataframe ?我是否需要将它们分别存储为 Dataframe ,然后在PySpark中将它们连接在一起?(您可以假设所有CSV都具有相同的模式)
1.假设现在是2014年11月。如果我想再次运行分析,但在“最新数据”上运行,例如,当它是2014年12月时,在12月14日运行,该怎么办?例如,我想加载文件2:.load('file:///home/path/datafolder/data2014/dec14/*.csv')并使用以下文件:.load('file:///home/path/datafolder/data2014/nov14/*.csv')。是否有办法安排Jupyter笔记本(或类似的)来更新加载路径并导入最新的运行(在这种情况下,“nov 14”将被替换为“dec 14”,然后是“jan 15”等)。
我浏览了前面的问题,但无法找到答案,因为这是AWS / PySpark集成特定的。
提前感谢您的帮助!

  • [背景:我已经被允许访问来自不同团队的包含各种大数据集的许多S3桶。将其复制到我的S3桶中,那么构建一个Jupyter笔记本似乎比直接从数据桶中提取数据并在其上构建一个模型/表/等并将处理后的输出保存到数据库中要多得多。如果我的想法是完全错误的,请阻止我!:)]*
0s7z1bwu

0s7z1bwu1#

只要文件格式相同,就可以使用通配符读入多个路径。
在您的示例中:

.load('file:///home/path/SFweather/data2014/*.csv')
.load('file:///home/path/SFweather/data2015/*.csv')
.load('file:///home/path/NYCweather/data2014/*.csv')
.load('file:///home/path/NYCweather/data2015/*.csv')

您可以将上面的4个load语句替换为以下路径,以将所有csv一次性读入一个 Dataframe :

.load('file:///home/path/*/*/*.csv')

如果您想更具体地说明以避免阅读某些文件/文件夹,可以执行以下操作:

.load('file:///home/path/[SF|NYC]weather/data201[4|5]/*.csv')
wr98u20j

wr98u20j2#

您可以使用模式字符串列表一次加载多个路径。pyspark.sql.DataFrameReader.load方法接受一个路径字符串列表,如果您无法使用单个Hadoop glob模式表示要加载的所有路径,这将非常有用:

?
    Matches any single character.

*
    Matches zero or more characters.

[abc]
    Matches a single character from character set {a,b,c}.

[a-b]
    Matches a single character from the character range {a...b}.
    Note that character a must be lexicographically less than or
    equal to character b.

[^a]
    Matches a single character that is not from character set or
    range {a}.  Note that the ^ character must occur immediately
    to the right of the opening bracket.

\c
    Removes (escapes) any special meaning of character c.

{ab,cd}
    Matches a string from the string set {ab, cd}

{ab,c{de,fh}}
    Matches a string from the string set {ab, cde, cfh}

例如,如果要加载以下路径:

[
    's3a://bucket/prefix/key=1/year=2010/*.csv',
    's3a://bucket/prefix/key=1/year=2011/*.csv',
    's3a://bucket/prefix/key=2/year=2020/*.csv',
    's3a://bucket/prefix/key=2/year=2021/*.csv',
]

您可以将它们简化为两种路径模式,

  • s3a://bucket/prefix/key=1/year=201[0-1]/*.csv
  • s3a://bucket/prefix/key=2/year=202[0-1]/*.csv

并调用load()两次。您可以更进一步,使用{ab,cd}交替将它们简化为单个模式字符串,但我认为使用glob模式并通过对load()的一次调用来表示路径的最可读方式是传递一个路径模式列表:

spark.read.format('csv').load(
    [
        's3a://bucket/prefix/key=1/year=201[0-1]/*.csv',
        's3a://bucket/prefix/key=2/year=202[0-1]/*.csv',
    ]
)

对于您在第1期中列出的路径,您可以使用单个模式字符串来表示所有四个路径:

'file:///home/path/{NY,SF}weather/data201[45]/*.csv'

对于问题2,可以编写逻辑来构造要加载的路径。

相关问题