创建一个sparkDataframe,包括两个日期之间的日期键

czq61nw1  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(487)

我正在创建一个python脚本,它应该定期自动执行。由于它使用日历Dataframe,因此此日历必须自动更新自身。
因此,我想创建一个包含 YearMonth -输入一个特定的开始日期和今天(不包括今天的月份)。选择开始日期2015-01-01,当前日期为2020-09-08,应如下所示:

--------------
YearMonth
--------------
201501
201502
201503
201504
201505
(...)
202006
202007
202008
---------------

如何实现这一点,以便每次执行脚本时,它都以最后一个完成的月份作为结束日期输出Dataframe?
编辑:当我设法从yyyy-mm-dd日期字段中提取yyyymm键时,还可以得到一个Dataframe,其中所有日期都在开始日期和今天之间。在这种情况下,可以在以后执行提取和删除重复项。

jc3wubiy

jc3wubiy1#

scala代码
你可以做以下事情

import java.time.LocalDate
import java.time.{LocalDate, YearMonth}
import java.time.temporal.ChronoUnit
val start =  LocalDate.of(2015,1,1) // you can pass this values from input args as well
val now = LocalDate.now() // 2020-09-08
val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start),YearMonth.from(now)).toInt
val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1))
val dates = dateStream.take(numMonths + 1).toSeq.map(t => (t.getYear(), t.getMonth().getValue())).toVector.toSeq

输出:

scala>import java.time.LocalDate
import java.time.LocalDate

scala>import java.time.{LocalDate, YearMonth}
import java.time.{LocalDate, YearMonth}

scala>import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit

scala>val start =  LocalDate.of(2015,1,1) // you can pass this values from input args as well
start: java.time.LocalDate = 2015-01-01

scala>val now = LocalDate.now() // 2020-09-08
now: java.time.LocalDate = 2020-09-08

scala>val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start),YearMonth.from(now)).toInt
numMonths: Int = 68

scala>val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1))
dateStream: Stream[java.time.LocalDate] = Stream(2015-01-01, ?)

scala>val dates = dateStream.take(numMonths + 1).toSeq.map(t => (t.getYear(), t.getMonth().getValue())).toVector.toSeq
dates: scala.collection.immutable.Seq[(Int, Int)] = Vector((2015,1), (2015,2), (2015,3), (2015,4), (2015,5), (2015,6), (2015,7), (2015,8), (2015,9), (2015,10), (2015,11), (2015,12), (2016,1), (2016,2), (2016,3), (2016,4), (2016,5), (2016,6), (2016,7), (2016,8), (2016,9), (2016,10), (2016,11), (2016,12), (2017,1), (2017,2), (2017,3), (2017,4), (2017,5), (2017,6), (2017,7), (2017,8), (2017,9), (2017,10), (2017,11), (2017,12), (2018,1), (2018,2), (2018,3), (2018,4), (2018,5), (2018,6), (2018,7), (2018,8), (2018,9), (2018,10), (2018,11), (2018,12), (2019,1), (2019,2), (2019,3), (2019,4), (2019,5), (2019,6), (2019,7), (2019,8), (2019,9), (2019,10), (2019,11), (2019,12), (2020,1), (2020,2), (2020,3), (2020,4), (2020,5), (2020,6), (2020,7), (2020,8), (2020,9))

您可以并行化日期并创建df,如下所示

import org.apache.spark.sql.functions.{concat, lit , col}
 spark.parallelize(dates).toDF.withColumn("YearMonth",concat(col("_c0"), lit(""), col("_c1")).select("YearMonth")
    (OR)
spark.createDataFrame(spark.parallelize(dates)).withColumn("YearMonth",concat(col("_c0"), lit(""), col("_c1")).select("YearMonth")

参考文献:
https://docs.scala-lang.org/tutorials/faq/finding-symbols.html
scala:在apachesparkDataframe中获取过去24个月的每一个组合concatenate列

相关问题