我试图创建多个csv文件和一个大的csv文件的基础上阅读另一个csv文件,其中有关键字/部分值打破csv文件成许多
- key.csv
| 型号名称|
| - ------|
| 车|
| 脚踏车|
| 巴士|
| 自动| - Input.csv
| 识别号|车型|汽车名称|汽车当前|自行车类型|自行车_名称|总线类型|总线名称|自动类型|
| - ------|- ------|- ------|- ------|- ------|- ------|- ------|- ------|- ------|
| 1个|本田|城市|是|雅马哈|自由区|学校|福特|手册|
| 第二章|塔塔|打孔器|是|英雄|极限|公众|阿肖克|气体|
因此,我想读取密钥.csv文件,并根据密钥.csv文件值创建/中断输入.csv文件,如...
car.csv
| 识别号|车型|汽车名称|
| - ------|- ------|- ------|
| 1个|本田|城市|
| 第二章|塔塔|打孔器|
bike.csv
| 识别号|自行车类型|自行车_名称|
| - ------|- ------|- ------|
| 1个|雅马哈|自由区|
| 第二章|英雄|极限|
bus.csv和auto.csv也是如此
为了得到这个结果,我尝试使用以下:
import spark.implicits._;
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val input_file = "/input.csv"
val mod_in_path = "/key.csv"
val df_input_mod=spark.read.format("csv").option("header","true").option("delimiter","|").load(mod_in_path)
val model_names = df_input_mod.select("model_name")
val df_input=spark.read.format("csv").option("header","true").option("delimiter","|").load(input_file)
val all_cols = df_input.columns
val party_col = all_cols.filter(_.contains("id"))
for( mname <- model_names){
println(mname)
var mname_col = all_cols.filter(_.contains(mname.mkString(""))).filter(! _.contains("PRESENT")).mkString(",")
println(mname_col)
var final_col = party_col.mkString("").concat(",").concat(mname_col)
println(final_col)
var colName = Seq(final_col)
var columnsAll=colName.map(m=>col(m))
#var final_val = df_input.select(final_col.split(",").map(_.toString).map(col): _*)
var final_val = df_input.select(columnsAll:_*)
final_val.repartition(1).write.mode("overwrite").option("delimiter", "|").option("header",true).csv("/output/"+mname)
println("output file created for "+mname )
}
在循环内使用Map时出现以下错误。
ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.SparkException: Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the dataset1.map transformation. For more information, see SPARK-28702.
at org.apache.spark.sql.errors.QueryExecutionErrors$.transformationsAndActionsNotInvokedByDriverError(QueryExecutionErrors.scala:1967)
at org.apache.spark.sql.Dataset.sparkSession$lzycompute(Dataset.scala:198)
at org.apache.spark.sql.Dataset.sparkSession(Dataset.scala:196)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res0$1(<console>:40)
at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res0$1$adapted(<console>:32)
at scala.collection.Iterator.foreach(Iterator.scala:943)
将大csv拆分为多个
1条答案
按热度按时间a6b3iqyw1#
如果keyDf不是一个大的 Dataframe ,你可以只做一个收集和迭代键: