sparkscala:在使用spark按不同的日期排序之后,需要在顶部获得日期为空的记录

s4n0splo  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(396)

我有以下数据:

+-----------+-----------+-----------+-----+-----------+
| Env1_date | Env2_date | Env3_date | Pid | orderDate |
+-----------+-----------+-----------+-----+-----------+
| Null      | Null      | 1/9/2020  | abc | 10/6/2020 |
| Null      | 1/9/2020  | 1/8/2020  | pqr | 10/4/2020 |
| 1/9/2020  | Null      | Null      | xyz | 10/2/2020 |
| 1/8/2020  | 1/7/2020  | Null      | uvw | 10/1/2020 |
+-----------+-----------+-----------+-----+-----------+

我试图创建3个新的列,基本上告诉如果 Pid 对env1、env2和env3有效。为此,我首先在 orderDate 按降序排列的列(已在上表中排序)。
如果为 Env1_date , Env2_date , Env3_date ,最高纪录是 Null ,它们被认为是有效的。之后 Null 记录,如果日期小于特定日期(在本例中 1/9/2020 ),它被认为是有效的。任何其他记录都被标记为无效。
如果顶级记录不是 NULL ,需要检查日期是否等于 1/9/2020 . 如果是,它们也被标记为有效
我的输出应该如下所示:

+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+
| Env1_date | Env2_date | Env3_date | Pid | orderDate | Env1_Flag | Env2_Flag | Env3_Flag |
+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+
| Null      | Null      | 1/9/2020  | abc | 10/6/2020 | Valid     | Valid     | Valid     |
| Null      | 1/9/2020  | 1/8/2020  | pqr | 10/4/2020 | Valid     | Valid     | Invalid   |
| 1/9/2020  | Null      | Null      | xyz | 10/2/2020 | Valid     | Invalid   | Invalid   |
| 1/8/2020  | 1/7/2020  | Null      | uvw | 10/1/2020 | Invalid   | Invalid   | Invalid   |
+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+

我正试图通过使用 Spark 1.5 以及 scala .
我试过用 lag 功能。但不能包含所有的场景。不知道如何解决这个问题。
有人能帮帮我吗。
注意:windows函数、todf()、createdataframe()函数在 spark 我正在使用。这是一个定制的spark环境,几乎没有限制

nwsw7zdq

nwsw7zdq1#

import spark.implicits._

case class Source(
                 Env1_date: Option[String],
                 Env2_date: Option[String],
                 Env3_date: Option[String],
                 Pid: String,
                 orderDate: String
               )
case class Source1(
                   Env1_date: Option[String],
                   Env2_date: Option[String],
                   Env3_date: Option[String],
                   Pid: String,
                   orderDate: String,
                   Env1_Flag: String,
                    Env2_Flag: String,
                    Env3_Flag: String
                 )

val source = Seq(
  Source(None, None, Some("1/9/2020"), "abc", "10/6/2020"),
  Source(None, Some("1/9/2020"), Some("1/8/2020"), "pqr", "10/4/2020"),
  Source(Some("1/9/2020"), None, None, "xyz", "10/2/2020"),
  Source(Some("1/8/2020"), Some("1/7/2020"), None, "abc", "10/6/2020")
).toDF().as[Source].collect()

var env1NextRowInvalid = false
var env2NextRowInvalid = false
var env3NextRowInvalid = false
val source1 = source.map(i => {
  val env1Flag = if (env1NextRowInvalid == false && (i.Env1_date.getOrElse("") == """1/9/2020""" || i.Env1_date.getOrElse("") == "")) "valid" else "invalid"
  env1NextRowInvalid = if(env1NextRowInvalid == false) (i.Env1_date == "1/9/2020") else true
  val env2Flag = if (env2NextRowInvalid == false && (i.Env2_date.getOrElse("") == """1/9/2020""" || i.Env2_date.getOrElse("") == "")) "valid" else "invalid"
  env2NextRowInvalid = if(env2NextRowInvalid == false) (i.Env2_date.getOrElse("") == "1/9/2020") else true
  val env3Flag = if (env3NextRowInvalid == false && (i.Env3_date.getOrElse("") == """1/9/2020""" || i.Env3_date.getOrElse("") == "")) "valid" else "invalid"
  env3NextRowInvalid = if(env3NextRowInvalid == false) (i.Env3_date.getOrElse("") == "1/9/2020") else true
  Source1(i.Env1_date, i.Env2_date, i.Env3_date, i.Pid, i.orderDate, env1Flag, env2Flag, env3Flag)
})

val resDF = source1.toSeq.toDF()
resDF.show(false)
//  +---------+---------+---------+---+---------+---------+---------+---------+
//  |Env1_date|Env2_date|Env3_date|Pid|orderDate|Env1_Flag|Env2_Flag|Env3_Flag|
//  +---------+---------+---------+---+---------+---------+---------+---------+
//  |null     |null     |1/9/2020 |abc|10/6/2020|valid    |valid    |valid    |
//  |null     |1/9/2020 |1/8/2020 |pqr|10/4/2020|valid    |valid    |invalid  |
//  |1/9/2020 |null     |null     |xyz|10/2/2020|valid    |invalid  |invalid  |
//  |1/8/2020 |1/7/2020 |null     |abc|10/6/2020|invalid  |invalid  |invalid  |
//  +---------+---------+---------+---+---------+---------+---------+---------+
mxg2im7a

mxg2im7a2#

一种方法是将所有数据收集到驱动程序中,并将其作为常规数组进行处理,然后再次将其转换为df。不过要小心,数据应该适合驱动程序。
我写的代码可以处理你提供的数据。如果您稍微调整一下(尤其是数据比较部分),您应该会得到您所期望的结果。

// This is how your data is going to look like when you collect it with df.collect
  val arrayData = Array(
    Array("null", "null", "1/9/2020", "abc", "10/6/2020"),
    Array("null", "1/9/2020", "1/8/2020", "pqr", "10/4/2020"),
    Array("1/9/2020", "null", "null", "xyz", "10/2/2020"),
    Array("1/8/2020", "1/7/2020", "null", "uvw", "10/1/2020"),
  )

  // just printing
  arrayData.foreach(arr => println(arr.mkString(" \t| ")))
  println("-".repeat(30))

  // rotates the array, so column become rows and vice verse
  def shiftArray(arr: Array[Array[String]])
    = for(i <- arr(0).indices.toArray) yield arr.map(arr => arr(i))

  // the function that does the validation part
  val someDate = "1/9/2020"
  def processColumn(arr: Array[String]) = {
    val (startingNulls, rest) = arr.span(_ == "null")
    val startingNullsValidated: Array[String] = startingNulls.map(_ => "Valid")
    val restValidated: Array[String] = rest.map(date => if (date == someDate) "Valid" else "Invalid") // implement custom date comparison
    startingNullsValidated ++ restValidated
  }

  val shiftedArray: Array[Array[String]] = shiftArray(arrayData)

  // you need to validate only first 3 columns, so i used take/slice
  val validatedArray = {
    val columnsToProcess = shiftedArray.take(3)
    val otherColumns = shiftedArray.slice(3, shiftedArray.length)
    val processedColumns = for (arr <- columnsToProcess) yield processColumn(arr)
    processedColumns ++ otherColumns
  }

  // rotate array back
  val shiftBackValidatedArray = shiftArray(validatedArray)

  // just printing the final result
  shiftBackValidatedArray.foreach(arr => println(arr.mkString(" \t| ")))

这是上面打印行的输出

null    | null  | 1/9/2020  | abc   | 10/6/2020
null    | 1/9/2020  | 1/8/2020  | pqr   | 10/4/2020
1/9/2020    | null  | null  | xyz   | 10/2/2020
1/8/2020    | 1/7/2020  | null  | uvw   | 10/1/2020
------------------------------
Valid   | Valid     | Valid     | abc   | 10/6/2020
Valid   | Valid     | Invalid   | pqr   | 10/4/2020
Valid   | Invalid   | Invalid   | xyz   | 10/2/2020
Invalid     | Invalid   | Invalid   | uvw   | 10/1/2020

相关问题