如何加载一个大的csv文件,验证每一行并处理数据

dnph8jn4  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(363)

我正在寻找验证超过6亿行和最多30列的csv文件的每一行(解决方案必须处理该范围的几个大型csv文件)。
列可以是文本、日期或金额。csv必须用40条规则进行验证,有些规则将检查数量的正确性,有些规则将检查日期(格式)等…
必须保存每个验证规则的结果,并在以后显示。
一旦数据被验证,第二阶段的验证规则将被应用,基于这一次的总和,平均数…每个规则的结果也必须被保存。
我正在用spark加载文件。具有

session.read().format("com.databricks.spark.csv").option("delimiter",
         "|").option("header", "false").csv(csvPath)

session.read().option("header", "true").text(csvPath);

要在每行上迭代,我看到有两个选项:
使用 dataset.map( row -> { something }); “something”应该验证每一行并将结果保存在某个地方
但是,由于“something”块将在executors中执行,因此我不知道如何将其返回到驱动程序,或者将其存储在可以从驱动程序进程检索它的位置。
第二种选择是使用 dataset.collect :但它将导致内存不足,因为所有数据都将加载到驱动程序中。我们可以使用“take”方法,然后从数据集中删除子集(使用过滤器)并重复该操作,但我不习惯使用这种方法
我想知道是否有人能给我推荐一种稳健的方法来处理这类问题。基本上在验证规则的第二阶段保留spark,并使用spark或其他framwrok接收文件,执行并生成第一组验证规则
提前谢谢你的帮助

nx7onnlm

nx7onnlm1#

你可以用 SparkSession 读取csv文件,然后按列对数据进行分区,并对数据进行批处理。例如,将数据写入不需要太多处理的外部数据库。

dataFrame
    .write
    .mode(saveMode)
    .option("batchsize", 100)
    .jdbc(url, "tablename", new java.util.Properties())

如果您的业务逻辑要求您处理数据集/Dataframe的每一行,那么您可以使用 df.map() . 如果你的逻辑可以同时在多个rdd上工作,你可以使用 df.mapPartition() 。每记录开销高的任务在 mapPartition 而不是用 map 转变。
考虑初始化数据库的情况。如果我们使用 map() 或者 foreach() ,我们需要初始化的次数将等于rdd中的元素数。如果我们使用 mapPartitions() ,我们需要初始化的次数将等于分区数

uplii1fm

uplii1fm2#

您只需将带有检查结果的列附加到原始Dataframe中,然后使用一组规则UDF来执行实际验证,如下所示:

object Rules {
      val rule1UDF = udf(
        (col1: String, col2: String) => {
         // your validation code goes here
         true // the result of validation
      }
    }
    // ...
    val nonAggregatedChecksDf = df
       .withColumn("rule1_result", Rules.rule1UDF("col1", "col2"))
       .withColumn("rule2_result", Rules.rule2UDF("col1", "col3"))
       .select("id", "rule1_result", "rule2_result", <all the columns relevant for the aggregation checks>)

    val aggregatedChecksDf = nonAggregatedChecksDf
       .agg(<...>)
       .withColumn("rule3_result", Rules.rule3UDF("sum1", "avg2"))
       .withColumn("rule4_result", Rules.rule4UDF("count1", "count3"))
       .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")

第二个选项是使用dataset.collect
我建议不要这样做,而是从原始Dataframe和所有检查结果列中选择一个键字段,并将它们以列格式保存为parquet。

aggregatedChecksDf
    .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")
    .write
    .mode(saveMode)
    .parquet(path)

这会更快,因为写入是由所有执行器并行完成的,并且驱动程序不会成为瓶颈。它还很可能有助于避免oom问题,因为内存使用量分布在所有执行器上。

相关问题