我正在寻找验证超过6亿行和最多30列的csv文件的每一行(解决方案必须处理该范围的几个大型csv文件)。
列可以是文本、日期或金额。csv必须用40条规则进行验证,有些规则将检查数量的正确性,有些规则将检查日期(格式)等…
必须保存每个验证规则的结果,并在以后显示。
一旦数据被验证,第二阶段的验证规则将被应用,基于这一次的总和,平均数…每个规则的结果也必须被保存。
我正在用spark加载文件。具有
session.read().format("com.databricks.spark.csv").option("delimiter",
"|").option("header", "false").csv(csvPath)
或
session.read().option("header", "true").text(csvPath);
要在每行上迭代,我看到有两个选项:
使用 dataset.map( row -> { something });
“something”应该验证每一行并将结果保存在某个地方
但是,由于“something”块将在executors中执行,因此我不知道如何将其返回到驱动程序,或者将其存储在可以从驱动程序进程检索它的位置。
第二种选择是使用 dataset.collect
:但它将导致内存不足,因为所有数据都将加载到驱动程序中。我们可以使用“take”方法,然后从数据集中删除子集(使用过滤器)并重复该操作,但我不习惯使用这种方法
我想知道是否有人能给我推荐一种稳健的方法来处理这类问题。基本上在验证规则的第二阶段保留spark,并使用spark或其他framwrok接收文件,执行并生成第一组验证规则
提前谢谢你的帮助
2条答案
按热度按时间nx7onnlm1#
你可以用
SparkSession
读取csv文件,然后按列对数据进行分区,并对数据进行批处理。例如,将数据写入不需要太多处理的外部数据库。如果您的业务逻辑要求您处理数据集/Dataframe的每一行,那么您可以使用
df.map()
. 如果你的逻辑可以同时在多个rdd上工作,你可以使用df.mapPartition()
。每记录开销高的任务在mapPartition
而不是用map
转变。考虑初始化数据库的情况。如果我们使用
map()
或者foreach()
,我们需要初始化的次数将等于rdd中的元素数。如果我们使用mapPartitions()
,我们需要初始化的次数将等于分区数uplii1fm2#
您只需将带有检查结果的列附加到原始Dataframe中,然后使用一组规则UDF来执行实际验证,如下所示:
第二个选项是使用dataset.collect
我建议不要这样做,而是从原始Dataframe和所有检查结果列中选择一个键字段,并将它们以列格式保存为parquet。
这会更快,因为写入是由所有执行器并行完成的,并且驱动程序不会成为瓶颈。它还很可能有助于避免oom问题,因为内存使用量分布在所有执行器上。