Scala - split a CSV file into multiple CSV files based on a key column from another CSV file

e3bfsja2 · asked 2022-12-15 · Scala

I am trying to split one big CSV file into multiple CSV files, driven by a second CSV file that holds the key/section values on which the big file should be broken apart.

  1. key.csv

     | model_name |
     | ---------- |
     | Car |
     | bike |
     | Bus |
     | Auto |

  2. Input.csv

     | ID | car_type | car_name | car_PRESENT | bike_type | bike_name | bus_type | bus_name | auto_type |
     | -- | -------- | -------- | ----------- | --------- | --------- | -------- | -------- | --------- |
     | 1 | Honda | city | YES | yamaha | fz | school | ford | Manual |
     | 2 | TATA | punch | YES | hero | xtreme | public | Ashok | Gas |
     So I want to read key.csv and, based on its values, split Input.csv into files like:

     car.csv
     | ID | car_type | car_name |
     | -- | -------- | -------- |
     | 1 | Honda | city |
     | 2 | TATA | punch |

     bike.csv
     | ID | bike_type | bike_name |
     | -- | --------- | --------- |
     | 1 | yamaha | fz |
     | 2 | hero | xtreme |

     and likewise for bus.csv and auto.csv.
     To get this result, I tried the following:
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

val input_file = "/input.csv"
val mod_in_path = "/key.csv"

val df_input_mod=spark.read.format("csv").option("header","true").option("delimiter","|").load(mod_in_path)

val model_names = df_input_mod.select("model_name")

val df_input=spark.read.format("csv").option("header","true").option("delimiter","|").load(input_file)
val all_cols = df_input.columns

val party_col = all_cols.filter(_.contains("id"))

for (mname <- model_names) {
  println(mname)

  var mname_col = all_cols.filter(_.contains(mname.mkString(""))).filter(!_.contains("PRESENT")).mkString(",")
  println(mname_col)

  var final_col = party_col.mkString("").concat(",").concat(mname_col)
  println(final_col)

  var colName = Seq(final_col)
  var columnsAll = colName.map(m => col(m))

  // var final_val = df_input.select(final_col.split(",").map(_.toString).map(col): _*)
  var final_val = df_input.select(columnsAll: _*)
  final_val.repartition(1).write.mode("overwrite").option("delimiter", "|").option("header", true).csv("/output/" + mname)

  println("output file created for " + mname)
}

When I use map inside the loop, I get the following error.

ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.SparkException:  Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the dataset1.map transformation. For more information, see SPARK-28702.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.transformationsAndActionsNotInvokedByDriverError(QueryExecutionErrors.scala:1967)
        at org.apache.spark.sql.Dataset.sparkSession$lzycompute(Dataset.scala:198)
        at org.apache.spark.sql.Dataset.sparkSession(Dataset.scala:196)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
        at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res0$1(<console>:40)
        at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res0$1$adapted(<console>:32)
        at scala.collection.Iterator.foreach(Iterator.scala:943)

How can I split the big CSV into multiple files?

a6b3iqyw (answer 1)

If keyDf is not a large DataFrame, you can simply collect it and iterate over the keys:

import spark.implicits._

val keyDf = spark.sparkContext.parallelize(Seq("Car", "bike", "Bus", "Auto")).toDF("model_name")

val data = Seq(
  (1, "Honda", "city", "YES", "yamaha", "fz", "school", "ford", "Manual"),
  (2, "TATA", "punch", "YES", "hero", "xtreme", "public", "Ashok", "Gas")
)
val InputDf = spark.sparkContext.parallelize(data).toDF("ID", "car_type", "car_name", "car_PRESENT", "bike_type", "bike_name", "bus_type", "bus_name", "auto_type")

keyDf.distinct().collect().map(row => row.getString(0).toLowerCase()).foreach(r => {
  // only split when both <key>_type and <key>_name columns exist in the input
  if (List(s"${r}_type", s"${r}_name").forall(InputDf.columns.map(_.toLowerCase()).contains)) {
    val df = InputDf.select("ID", s"${r}_type", s"${r}_name")
    df.show(false)
    df.write.csv(s"path/.../$r.csv")
  }
})
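
If you also need the same write options as in the question (pipe delimiter, a header row, and a single output file per key), the write call inside that loop could look roughly like the sketch below. The "/output" base path is only a placeholder, not something from the original post:

keyDf.distinct().collect().map(_.getString(0).toLowerCase()).foreach { r =>
  // only split when both <key>_type and <key>_name columns exist in the input
  if (List(s"${r}_type", s"${r}_name").forall(InputDf.columns.map(_.toLowerCase()).contains)) {
    InputDf.select("ID", s"${r}_type", s"${r}_name")
      .repartition(1)                     // collapse each split to a single part file
      .write
      .mode("overwrite")
      .option("delimiter", "|")
      .option("header", "true")
      .csv(s"/output/$r")                 // placeholder base path; writes one directory per key, e.g. /output/car
  }
}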
