使用sparkscala获取每月数据

3z6pesqy  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(382)

我试着从一个文件中提取一个月的数据,然后处理它。基本上,我需要每个月提取数据并进行一些转换。因为我的作业每天都在运行,所以我想利用它填充当月的数据,直到运行日期。
我有两种方法:
方法1:
仅填写上月的数据。例如,如果我的当前日期或运行日期是 May ,我将填充 April . 这可以通过将月份从 current_date() 和减法 1 从它那里。类似于以下内容: df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1)) 这只是一个想法。这段代码不会达到我想做的,因为我是减去月部分单独和不考虑 Year 部分。
但在这种情况下,我的工作将每天运行,以填充整个月的相同数据。这样做没有意义。
方法2:
如果我现在的约会是 2020-05-27 ,我想从 2020-05-01 to 2020-05-26 . 如果我现在的日期是 2020-06-01 ,它应该填充来自 2020-05-01 to 2020-05-31 .
我想实现方法2。我唯一能想到的办法就是写几封信 Case 语句来检查日期并相应地填充它。
有人能分享一下吗。有什么简单的方法吗。
我正在使用 Spark 1.5

kjthegm6

kjthegm61#

检查这是否有用-

1. 加载测试数据

val data =
      """
        |2018-04-07 07:07:17
        |2018-04-07 07:32:27
        |2018-04-07 08:36:44
        |2018-04-07 08:38:00
        |2018-04-07 08:39:29
        |2018-04-08 01:43:08
        |2018-04-08 01:43:55
        |2018-04-09 07:52:31
        |2018-04-09 07:52:42
        |2019-01-24 11:52:31
        |2019-01-24 12:52:42
        |2019-01-25 12:52:42
      """.stripMargin
    val df = spark.read
      .schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

输出-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+

root
 |-- startDate: timestamp (nullable = true)

2. 基于当前日期创建筛选列

val filterCOl = (currentDate: String) =>  when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
      ,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
     date_format(col("startDate"), "yyyy-MM") ===
       date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
    ).otherwise(to_date(col("startDate"))
     .between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))

3. 检查当前数据是否在月份之间

var currentDateStr = "2018-04-08"
    df.filter(filterCOl(currentDateStr)).show(false)

输出-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+

4. 检查当前数据何时为每月第一天

currentDateStr = "2018-05-01"
    df.filter(filterCOl(currentDateStr)).show(false)

输出-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+

相关问题