从spark dataframe或rdd生成键值对,键值中存在列名

ee7vknir  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(657)

我有一个sparkDataframe,需要如下所示的键值对。我特别需要键中的列名。我想做这个使用单一的Map传递。
原始数据集:

需要键值对:(attribute\u name,attribute\u value,class),1
Map器单次通过后的预期结果:
预期数据集

ukqbszuj

ukqbszuj1#

这将有助于:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{explode, udf, typedLit}
import org.apache.spark.sql.SparkSession

object test extends App {

  val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")

  val sc: SparkContext = new SparkContext(conf)

    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

  val df = spark.read.format("csv").option("header", true).load("file:///Users/test/Desktop/file2.csv")

  val header: Seq[String] = df.columns.toSeq.map(x => x.trim)

  val df1 = df.withColumn("header", typedLit(header))

  val transform = udf((col0: String, col1: String, col2: String, col3: String, header: Seq[String]) => {
    Array(
      ((header(0), col0.trim, col3.trim),1),
      ((header(1), col1.trim, col3.trim),1),
      ((header(2), col2.trim, col3.trim),1)
    )
  })

  val df2 = df1.withColumn("transformed",transform($"A1", $" A2", $" A3", $" Class", $"header"))
    .withColumn("exploded", explode($"transformed"))
    .select($"exploded")

  df2.take(1).foreach(println)
}

输出:https://imgur.com/a/je1m3dx

相关问题