我试图获取通过AWS Glue中的S3数据目录加载的每个文件的输入文件名(或路径)。
我在一些地方使用了read,input_file_name()
应该提供这些信息(尽管需要注意的是,这只在调用from_catalog
而不是from_options
时有效,我相信我是这样做的!)).
因此,下面的代码看起来应该可以工作,但总是为每个input_file_name
返回空值。
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
database='database_name',
table_name='table_name',
transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
结果输出:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
是否有其他方法可以创建框架来确保input_file_name()
被填充?现在,我尝试通过create_dynamic_frame.from_catalog
、create_dynamic_frame.from_options
和getSource().getFrame()
构建源帧,但得到的结果是相同的,每个都是空的input_file_name
列。
8条答案
按热度按时间k97glaaz1#
我还添加了我的经验,在我的情况下,我收到了一个空的结果,因为调用
cache()
方法。例如:
我收到
但是如果删除
df.cache()
行,input_file_name
列就会正确地显示输入文件名。解决方法可能是在缓存之前调用
F.input_file_name()
。o2rvlv0m2#
我相信这在使用
groupFiles
选项时是不可能的,因为Glue在幕后连接文件以创建最佳的输入数量。因此,input_file_name
的概念在此上下文中没有意义,因为原始文件路径不再是直接输入。然而,文档在某种意义上有点误导,即使输入少于50,000个文件,如果不显式禁用该选项,Glue也会根据文件大小将输入连接起来。在我们的例子中,我们有成千上万的小输入文件(〈1 MB)导致这种行为。
您可以通过显式禁用分组来轻松验证这一点(注意,对于类似于我们的场景,这将对性能产生严重影响:
当然,最好不要依赖于输入状态或上下文,因此我们最终编写了一个在S3
PUT
上触发的AWS Lambda,它将元数据(包括文件名和路径)写入文件本身。c0vxltue3#
我发现了另一个奇怪的行为。当我在
input_file_path
之前添加limit()
时,id也不起作用。此代码的文件名为空
这个代码有效
我花了一些时间才弄清楚这一点,因为我试图通过只阅读几行来加快调试速度。
ztyzrc3y4#
我遇到了和Vzzarr一样的问题。当我在dataframe上调用
cache
后创建input_file_path
列时,文件路径为空。但是当我在调用cache
之前创建input_file_path
时,它工作了。omjgkv6w5#
如果上面的方法都不行,再加上这个。.
以清除群集中的所有缓存。
t5fffqht6#
有时,有一些边缘情况,其中这个方法,'input_file_name()',在DynamicFrame中不能像预期的那样工作。在这种情况下,您可以尝试对DynamicFrame使用“attachFilename”选项来检索记录的文件路径。
下面是一个例子:
==验证码:
uoifb46i7#
这个问题很重要,因为有些人可能会在生产环境中使用此功能。我们在MediaQX的数据团队中做了一些调查,以下是我们的发现。感谢@Leyla Helin Çetin和@Eren Sakarya。
在研究结果之前,请确保您没有在Glue上使用此
input_file_name
功能,因为当文件大小很小时,它会中断。请确保将必要的列添加到文件中,并且不依赖于路径。如果你使用的是纯Spark,你可能不需要担心这一点。我们检查了所有相关帖子的回复,我将链接它们,为即将到来的问题持有人提供循环阅读:
在这种情况下,这是最合理的解决方案,但在glue上下文中,尚不清楚它是否完美,您可能会错过一些glue功能,如书签:如何在AWS Glue作业中追加一个包含源名称的新列?
有几种方法显示了Map函数方法,但它们都使用
input_file_name
函数,因此对于生产解决方案来说并不准确。在AWS Glue中使用S3文件夹结构作为 meta数据有些人通过使用作业名称解决了他们的问题,但对于不同的配置,这是不合理的:AWS Glue: How to add a column with the source filename in the output?
在这个线程中,讨论了
groupFiles
选项,在AWS文档中,它声明它不适用于parquet文件,但实际上它可以工作,并且它不是这个问题的直接解决方案。另一种解决方案是删除文件名中的点,但在我们的情况下,这不是一个解决方案:Glue dynamic frame is not populating from s3 bucket
您可以通过设置以下参数使函数工作:
spark.conf.set("spark.sql.files.maxPartitionBytes", "1MB")
但是,我们认为它增加了阅读时间,并且使
input_file_name
功能正常工作。我们认为,当阅读过程非常快时,该功能停止。请注意,读取文件的总大小应该大于您设置的参数,因此它也不是一个完美的解决方案。除此之外,它还会降低作业的速度,因为该参数的默认值是128 MB。我希望这个答案能有所帮助,请大声说出您的发现,以便AWS工作人员在修复此错误时可能会使用此答案。
干杯。
vof42yt18#
这发生在我身上是因为我的数据块集群有一个不推荐的运行时(7.6)我试过7。3、工作中的LTS:)