我正在使用PySpark进行大数据分析。我可以使用以下命令导入存储在特定桶的特定文件夹中的所有CSV文件:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/path/datafolder/data2014/*.csv')
(其中 * 相当于通配符)
我的问题如下:
1.如果我想对2014年和2015年的数据进行分析,例如文件1是.load('file:///home/path/SFweather/data2014/*.csv')
,文件2是.load('file:///home/path/SFweather/data2015/*.csv')
,文件3是.load('file:///home/path/NYCweather/data2014/*.csv')
,文件4是.load('file:///home/path/NYCweather/data2015/*.csv')
。我如何同时导入多个路径以得到一个 Dataframe ?我是否需要将它们分别存储为 Dataframe ,然后在PySpark中将它们连接在一起?(您可以假设所有CSV都具有相同的模式)
1.假设现在是2014年11月。如果我想再次运行分析,但在“最新数据”上运行,例如,当它是2014年12月时,在12月14日运行,该怎么办?例如,我想加载文件2:.load('file:///home/path/datafolder/data2014/dec14/*.csv')
并使用以下文件:.load('file:///home/path/datafolder/data2014/nov14/*.csv')
。是否有办法安排Jupyter笔记本(或类似的)来更新加载路径并导入最新的运行(在这种情况下,“nov 14”将被替换为“dec 14”,然后是“jan 15”等)。
我浏览了前面的问题,但无法找到答案,因为这是AWS / PySpark集成特定的。
提前感谢您的帮助!
- [背景:我已经被允许访问来自不同团队的包含各种大数据集的许多S3桶。将其复制到我的S3桶中,那么构建一个Jupyter笔记本似乎比直接从数据桶中提取数据并在其上构建一个模型/表/等并将处理后的输出保存到数据库中要多得多。如果我的想法是完全错误的,请阻止我!:)]*
2条答案
按热度按时间0s7z1bwu1#
只要文件格式相同,就可以使用通配符读入多个路径。
在您的示例中:
您可以将上面的4个load语句替换为以下路径,以将所有csv一次性读入一个 Dataframe :
如果您想更具体地说明以避免阅读某些文件/文件夹,可以执行以下操作:
wr98u20j2#
您可以使用模式字符串列表一次加载多个路径。
pyspark.sql.DataFrameReader.load
方法接受一个路径字符串列表,如果您无法使用单个Hadoop glob模式表示要加载的所有路径,这将非常有用:例如,如果要加载以下路径:
您可以将它们简化为两种路径模式,
s3a://bucket/prefix/key=1/year=201[0-1]/*.csv
和s3a://bucket/prefix/key=2/year=202[0-1]/*.csv
,并调用
load()
两次。您可以更进一步,使用{ab,cd}
交替将它们简化为单个模式字符串,但我认为使用glob模式并通过对load()
的一次调用来表示路径的最可读方式是传递一个路径模式列表:对于您在第1期中列出的路径,您可以使用单个模式字符串来表示所有四个路径:
对于问题2,可以编写逻辑来构造要加载的路径。