我想使用临时电子病历集群自动处理每日上传的文件。我在S3上的文件使用如下日期键进行分区:
2022-07-27-stats.csv
2022-07-28-stats.csv
...
每天,我都会将S3中的一个文件上传到‘hdfs:/raw/’中,并在文件路径‘hdfs:/raw/.csv’中使用作为Spark脚本中的读取路径,这样我就不必每天手动更新脚本中的读取路径来应对不断变化的日期键。
这可以很好地工作,但我希望能够在‘hdfs:/raw’中获取文件的日期关键字部分,这样我就可以使用字符串内插法将其添加到脚本的写入文件路径部分;这样,写入S3输出存储桶的文件可以进行类似的分区。是否有返回文件名的方法?
2条答案
按热度按时间bxpogfeg1#
您可以使用:
这将创建一个名为
path
的新列和从中读取文件的完整路径(当您想要区分通配符文件时非常有用)。例如,我的
data-first.csv
有一行(first
),第二个数据集data-second.csv
有一行(second
)。如果我读取文件并添加
input_file_name()
我得到以下输出:
您可以进行一些转换来提取日期,但这应该可以完成工作!
tjvv9vkg2#
S3上的文件使用如下日期键进行分区
星火/Hive不喜欢你这样划分。
如果您能够修改您的S3编写器,则可以改为写入这些路径,例如
然后,如果您将Spark/Have配置为只读
s3://bucket/prefix
,它将自动拥有一个分区的 Dataframe ,其中包含年、月和日的列。这样,您就不需要实际的文件名,因此不必解析日期值我不明白您所说的原始文件夹是什么意思,但同样的逻辑也适用于HDFS。理想情况下,如果HDFS是“原始CSV”,那么您应该从HDFS中读取Spark中的数据,应用模式,并可能聚合/清除 Dataframe ,然后以Parket或ORC的形式写入其他位置,如S3。然后,您将拥有查询该数据的有效方法。