在sparkscala中使用struct创建模式

u5rb5r59  于 2021-05-26  发布在  Spark
关注(0)|答案(2)|浏览(821)

我是scala的新手,尝试从元素数组中创建自定义模式,以读取基于新自定义模式的文件。
我从json文件中读取数组,并使用explode方法为列数组中的每个元素创建了一个Dataframe。

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

获得的输出为:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")

column_name.show(4)

 +------------------+--------+------+
 |              name|datatype|length|
 +------------------+--------+------+
 |     object_number| varchar|   100|
 |     function_type| varchar|   100|
 |             hof_1| decimal|  17,3|
 |             hof_2| decimal|  17,2|
 |            region| varchar|   100|
 |           country| varchar|  null|
 +------------------+--------+------+

现在,对于上面列出的所有值,我尝试使用下面的代码动态地创建val模式

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
  )

def getFieldType(typeName: String): DataType = typeName match {
    case "varchar" => StringType
    // TODO include other types here
    case _ => StringType
  }

上面的问题是,我能够在struct中获取数据类型,但是我也希望仅获取(scale和preicion)数据类型decimal,限制条件为max allowable,条件是如果decimal的长度为null或不存在,我们需要将默认值取为(10,0)如果当前值大于38,我们需要将默认值取为(38,0)

ymdaylpp

ymdaylpp1#

可以按此处指定的方式创建精度为decimal的数据类型:

DataTypes.createDecimalType()

在函数“getfieldtype”中,可以添加十进制类型的大小写smth。比如:

case "decimal" => DataTypes.createDecimalType(10,0)
dphi5xsq

dphi5xsq2#

这种方法很有效。
我向您展示了一个完整的示例,它完成了您的代码和预期的结果。
你可以引入更多的变体 val data .

/**
    * to obtain a tuple with precision and scale
    * @param precision Option[String]
    * @return (Int, Int)
    */
  def getDecimalScale(precision: Option[String]): (Int, Int) = {
    precision match {
      case Some(pr) => {
        pr.split(",").toList match {
          case List(h, _) if h.toInt >= 38 => (38,0)
          case List(h, t) => (h.toInt,t.head.toString.toInt)
          case _ => (10, 0)
        }
      }
      case None => (10, 0)
    }
  }
val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
      ("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
      ("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
      ("region", "varchar", "100"), ("country", "varchar", null))

    import spark.implicits._

    val column_name = sc.parallelize(data).toDF("name","datatype","length")

    column_name.show()
/*
+-------------+--------+------+
|         name|datatype|length|
+-------------+--------+------+
|object_number| varchar|   100|
|function_type| varchar|   100|
|        hof_1| decimal|  17,3|
|        hof_2| decimal|  17,2|
|        hof_3| decimal|  null|
|        hof_4| decimal|  39,2|
|       region| varchar|   100|
|      country| varchar|  null|
+-------------+--------+------+

* /

    val schemaColumns = column_name.collect()
    schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]

* /

    val schema = schemaColumns.foldLeft(new StructType())(
      (schema, columnRow) => {
        columnRow.getAs[String]("datatype") match {
          case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
          case "decimal" => {
            val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
            schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
          }
          case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
        }
      }
    )

    schema.printTreeString()
/*
root
 |-- object_number: string (nullable = true)
 |-- function_type: string (nullable = true)
 |-- hof_1: decimal(17,3) (nullable = true)
 |-- hof_2: decimal(17,2) (nullable = true)
 |-- hof_3: decimal(10,0) (nullable = true)
 |-- hof_4: decimal(38,0) (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)

* /

相关问题