How to create date-range bins in Spark Scala?

cvxl0en2 asked on 2021-05-27 in Spark

Hi there. I'm a Python developer trying to learn Spark Scala. My task is to create date-range bins and count how many records fall into each bin (a histogram).
My input DataFrame looks like this:

My bin edges look like this (in Python):

bins = ["01-01-1990 - 12-31-1999","01-01-2000 - 12-31-2009"]

The output DataFrame I'm looking for is a count of how many values from the original DataFrame fall into each bin:

Can anyone guide me on how to do this in Spark Scala? I'm a bit lost. Thank you.

lrl1mhuk 1#

Are you looking for a result like this:

+------------------------+------------------------+
|01-01-1990 -- 12-31-1999|01-01-2000 -- 12-31-2009|
+------------------------+------------------------+
|                       3|                    null|
|                    null|                       2|
+------------------------+------------------------+

This can be done with a little Spark SQL plus the pivot function, as shown below.
Note the left-join condition: as written, a.date_input >= b.st_dt AND a.date_input < b.end_dt excludes a bin's end date itself; change < to <= if a date equal to the end date should be counted in that bin.

// spark.implicits._ provides toDF on local Seqs (already in scope in spark-shell)
import spark.implicits._

val binRangeData = Seq(("01-01-1990","12-31-1999"),
                       ("01-01-2000","12-31-2009"))
val binRangeDf = binRangeData.toDF("start_date","end_date")
// binRangeDf.show

val inputDf = Seq((0,"10-12-1992"),
                  (1,"11-11-1994"),
                  (2,"07-15-1999"),
                  (3,"01-20-2001"),
                  (4,"02-01-2005")).toDF("id","input_date")

// inputDf.show

binRangeDf.createOrReplaceTempView("bin_range")
inputDf.createOrReplaceTempView("input_table")

val countSql = """
SELECT concat(date_format(c.st_dt,'MM-dd-yyyy'),' -- ',date_format(c.end_dt,'MM-dd-yyyy')) as header, c.bin_count
FROM (
(SELECT
b.st_dt, b.end_dt, count(1) as bin_count
FROM
(select to_date(input_date,'MM-dd-yyyy') as date_input , * from input_table) a
left join
(select to_date(start_date,'MM-dd-yyyy') as st_dt, to_date(end_date,'MM-dd-yyyy') as end_dt from bin_range ) b
on
a.date_input >= b.st_dt and a.date_input < b.end_dt
group by 1,2) ) c"""

val countDf = spark.sql(countSql)

countDf.groupBy("bin_count").pivot("header").sum("bin_count").drop("bin_count").show

However, since you have two bin ranges, this produces two rows (one per bin, with null in the other bin's column).
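
If a single summary row is preferred, a small variation (a sketch reusing countDf from the query above) is to pivot over the whole DataFrame without the intermediate groupBy("bin_count"):

// Sketch: pivot the whole DataFrame so each bin header becomes a column
// of one row; reuses countDf from the query above.
countDf.groupBy().pivot("header").sum("bin_count").show
// should give a single row: 3 under the 1990s bin, 2 under the 2000s bin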

pobjuy32 2#

We can do this by looking at the date column and determining which range each record falls into.

// First we set up the problem

// Create a format that looks like yours
val dateFormat = java.time.format.DateTimeFormatter.ofPattern("MM-dd-yyyy")

// Get the current local date
val now = java.time.LocalDate.now

// Create a range of 1-10000 and map each to minusDays 
// so we can have range of dates going 10000 days back
val dates = (1 to 10000).map(now.minusDays(_).format(dateFormat))

// Create a DataFrame we can work with
// (spark.implicits._ provides toDF; already in scope in spark-shell)
import spark.implicits._
val df = dates.toDF("date")

So far so good. We have date entries to work with, and they are in your format (MM-dd-yyyy). Next, we need a function that returns 1 if a date falls within a given range and 0 if it doesn't. We create a UserDefinedFunction (udf) so we can apply it to all rows at once on the Spark executors.

// We will process each range one at a time, so we'll take it as a string 
// and split it accordingly. Then we perform our tests. Using Dates is
// necessary to cater to your format.
import java.text.SimpleDateFormat

def isWithinRange(date: String, binRange: String): Int = {
  val format = new SimpleDateFormat("MM-dd-yyyy")
  val startDate = format.parse(binRange.split(" - ").head)
  val endDate = format.parse(binRange.split(" - ").last)
  val testDate = format.parse(date)

  if (!(testDate.before(startDate) || testDate.after(endDate))) 1
  else 0
}

// We create a udf which uses an anonymous function taking two args and 
// simply pass the values to our prepared function
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def isWithinRangeUdf: UserDefinedFunction =
  udf((date: String, binRange: String) => isWithinRange(date, binRange))

Now that we have our UDF set up, we apply it to our DataFrame and group by the given bins, summing the values (which is why we made the function return an Int).

// We define our bins List
val bins = List("01-01-1990 - 12-31-1999",
                "01-01-2000 - 12-31-2009",
                "01-01-2010 - 12-31-2020")

// We fold through the bins list, creating a column from each bin at a time,
// enriching the DataFrame with more columns as we go
import org.apache.spark.sql.functions.{col, lit}

val withBinsDf = bins.foldLeft(df){(changingDf, bin) =>
  changingDf.withColumn(bin, isWithinRangeUdf(col("date"), lit(bin))) 
}

withBinsDf.show(1)
//+----------+-----------------------+-----------------------+-----------------------+
//|      date|01-01-1990 - 12-31-1999|01-01-2000 - 12-31-2009|01-01-2010 - 12-31-2020|
//+----------+-----------------------+-----------------------+-----------------------+
//|09-01-2020|                      0|                      0|                      1|
//+----------+-----------------------+-----------------------+-----------------------+
//only showing top 1 row

Finally, we select our bin columns, group the whole DataFrame, and sum each bin column.

import org.apache.spark.sql.functions.sum  // needed for sum() below

val binsDf = withBinsDf.select(bins.head, bins.tail:_*)
val sums = bins.map(b => sum(b).as(b)) // keep col name as is
val summedBinsDf = binsDf.groupBy().agg(sums.head, sums.tail:_*)

summedBinsDf.show
//+-----------------------+-----------------------+-----------------------+
//|01-01-1990 - 12-31-1999|01-01-2000 - 12-31-2009|01-01-2010 - 12-31-2020|
//+-----------------------+-----------------------+-----------------------+
//|                   2450|                   3653|                   3897|
//+-----------------------+-----------------------+-----------------------+

2450 + 3653 + 3897 = 10000, so it looks like we got it right. Perhaps I've overcomplicated things and there is a simpler solution; please let me know if you know a better way (especially for handling MM-dd-yyyy dates).
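
For what it's worth, here is a UDF-free sketch of the same idea (reusing the df and bins defined above, and assuming the same "start - end" bin string format): parse the date column once with the built-in to_date, then build each 0/1 flag with when/between.

import org.apache.spark.sql.functions.{col, lit, sum, to_date, when}

// Parse the string column once into a proper date column
val parsed = df.withColumn("parsed_date", to_date(col("date"), "MM-dd-yyyy"))

// One 0/1 column per bin, as before, but built from built-in expressions
val withFlags = bins.foldLeft(parsed) { (acc, bin) =>
  val Array(start, end) = bin.split(" - ")
  val inBin = col("parsed_date").between(
    to_date(lit(start), "MM-dd-yyyy"),
    to_date(lit(end), "MM-dd-yyyy"))
  acc.withColumn(bin, when(inBin, 1).otherwise(0))
}

// Same aggregation as above
val totals = bins.map(b => sum(b).as(b))
withFlags.groupBy().agg(totals.head, totals.tail: _*).show()

Because everything stays in built-in expressions rather than a UDF, Catalyst can see and optimize the whole plan.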
