如何知道文件是否已转储到badrecordspath?[azure databricks-Spark]

k2arahey  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(268)

我想用 badrecordspath 在spark in in azure databricks中获取与作业相关的损坏记录的计数,但没有简单的方法可以知道:
如果文件已写入
文件已写入哪个分区
我想也许我可以用这样的代码检查最后一个分区是不是在最后60秒内创建的:

from datetime import datetime, timedelta
import time
import datetime

df = spark.read.format('csv').option("badRecordsPath", corrupt_record_path)

partition_dict = {} #here the dictionnary of partition and corresponding timestamp
for i in dbutils.fs.ls(corrupt_record_path):
  partition_dict[i.name[:-1]]=time.mktime(datetime.datetime.strptime(i.name[:-1], "%Y%m%dT%H%M%S").timetuple())

# here i get the timestamp of one minute ago

submit_timestamp_utc_minus_minute = datetime.datetime.now().replace(tzinfo = timezone.utc) - timedelta(seconds=60)
submit_timestamp_utc_minus_minute = time.mktime(datetime.datetime.strptime(submit_timestamp_utc_minus_minute.strftime("%Y%m%dT%H%M%S"), "%Y%m%dT%H%M%S").timetuple())

# Here i compare the latest partition to check if is more recent than 60 seconds ago

if partition_dict[max(partition_dict, key=lambda k: partition_dict[k])]>submit_timestamp_utc_minus_minute :
  corrupt_dataframe = spark.read.format('json').load(corrupt_record_path+partition+'/bad_records')
  corrupt_records_count = corrupt_dataframe.count()
else:
  corrupt_records_count = 0

但我看到两个问题:
这是一个很大的开销(好的代码也可以写得更好,但仍然)
我甚至不确定在读取作业中何时定义分区名称。是在工作的开始还是结束?如果是在一开始,那么60秒根本就不相关。
顺便说一句,我不能对corrupt\u records\u列使用permissive read,因为我不想缓存Dataframe(您可以在这里看到我的另一个问题)
任何建议或意见将不胜感激!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题