为序列文件格式的文件获取pyspark中的hdfs文件路径

k2arahey  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(609)

我在hdfs上的数据是序列文件格式。我正在使用pyspark(spark 1.6)并尝试实现两件事:
数据路径包含一个yyyy/mm/dd/hh格式的时间戳,我想将其引入数据本身。我尝试了sparkcontext.wholetextfiles,但我认为它可能不支持序列文件格式。
如果我想处理一天的数据,并且想把日期输入到数据中,我该如何处理上面的问题?在本例中,我将加载yyyy/mm/dd/*格式的数据。
谢谢你的指点。

inkz8wg9

inkz8wg91#

如果存储类型与sql类型兼容,并且使用spark2.0,那么就非常简单了。导入 input_file_name :

from pyspark.sql.functions import input_file_name

读取文件并转换为 DataFrame :

df = sc.sequenceFile("/tmp/foo/").toDF()

添加文件名:

df.withColumn("input", input_file_name())

如果此解决方案不适用于您的情况,那么通用的解决方案是直接列出文件(对于您可以使用的HDF) hdfs3 图书馆):

files = ...

逐个读取添加文件名:

def read(f):
    """Just to avoid problems with late binding"""
    return sc.sequenceFile(f).map(lambda x: (f, x))

rdds = [read(f) for f in files]

和工会:

sc.union(rdds)

相关问题