我有一个在databricks损坏记录的问题。我们要对损坏的记录进行计数,并将损坏的记录保存在特定位置作为增量表。为了做到这一点,我们正在阅读使用 PERMISSIVE
并在此基础上进行查询 _corrupt_record
列。
我们在azuredatabricks中将pyspark与apachespark3.0.1结合使用。
下面是我们得到的错误消息:
自spark 2.3以来,当引用的列仅包含内部损坏记录列(默认情况下命名为\u corrupt \u record)时,不允许从原始json/csv文件进行查询。例如:spark.read.schema(schema).json(file).filter($“\u corrupt\u record”.isnotnull).count()和spark.read.schema(schema).json(file).select(“\u corrupt\u record”).show()。
根据此文档,如果要查询列损坏的记录,必须缓存或保存数据。
但我们不想在etl中缓存数据。etl用于在同一集群上运行的许多作业,我们可以输入150gb的大文件。缓存数据可能会导致集群崩溃。
有没有办法在不缓存数据的情况下查询这些损坏的记录?
1将数据保存在blob存储上可能是另一种选择,但这听起来开销很大。
2我们也试过使用这个选项 BadRecordsPath
:将坏记录保存到badrecordspath并读回以对其进行计数,但是没有简单的方法可以知道是否写入了坏记录文件(以及该文件写入了哪个分区)。分区看起来像 /20210425T102409/bad_records
看到我的另一个问题了吗
3另一种方法是从允许读取中减去格式错误的读取。如:
dataframe_with_corrupt = spark.read.format('csv').option("mode", "PERMISSIVE").load(path)
dataframe_without_corrupt = spark.read.format('csv').option("mode", "DROPMALFORMED").load(path)
corrupt_df = dataframe_with_corrupt.exceptAll(dataframe_without_corrupt)
但我不确定它会比缓存占用更少的内存!
任何建议或意见将不胜感激!提前谢谢
暂无答案!
目前还没有任何答案,快来回答吧!