How do I compute the first upcoming Sunday from today using pyspark.sql.functions.current_date?

ffx8fchx · posted 2023-05-06 in Spark

Given today's date, I want to compute the first N upcoming Sundays and put them in a DataFrame. Say today's date is 2023-05-02 and N = 3; the output should look like this:

Date
2023-05-07
2023-05-14
2023-05-21
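(As a side note, the expected dates above are easy to sanity-check in plain Python; this `datetime` sketch is only for illustration, since the question specifically needs a Spark expression:)

```python
import datetime

def upcoming_sundays(today: datetime.date, n: int) -> list:
    # date.weekday(): Monday = 0 ... Sunday = 6, so 6 - weekday() is the
    # number of days until the coming Sunday (0 if today is a Sunday).
    first = today + datetime.timedelta(days=6 - today.weekday())
    return [first + datetime.timedelta(weeks=i) for i in range(n)]

print(upcoming_sundays(datetime.date(2023, 5, 2), 3))
# [datetime.date(2023, 5, 7), datetime.date(2023, 5, 14), datetime.date(2023, 5, 21)]
```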

Here is what I tried:

import datetime

from pyspark.sql import functions as f
from pyspark.sql.types import DateType

current_date = f.current_date()
day_offset = datetime.timedelta(6 - current_date.weekday())
first_sunday = current_date + day_offset

dates = []
for weeks in range(3):
    days = weeks * 7
    dates.append(first_sunday + datetime.timedelta(days=days))

out = spark.createDataFrame(dates, DateType()).toDF("Date")

This gives me the following error:

TypeError: 'Column' object is not callable

I know I can get today's date with datetime.date.today(), but I specifically want to use f.current_date(), because later I have to use @patch to make my tests run on a fixed date, and I can't do that with datetime.date.today().
Thanks in advance!


7gs2gvoe1#

Here is my attempt; you may want to simplify it:

from pyspark.sql.functions import current_date, date_add, expr

# SparkSession creation (spark) not included.

# DAYOFWEEK: Sunday = 1 ... Saturday = 7.
days_to_first_sunday = expr(
    "7 - IF(DAYOFWEEK(current_date()) = 1, 0, DAYOFWEEK(current_date()) - 1)"
)
days_to_first_sunday_df = spark.range(1).select(
    days_to_first_sunday.alias("days_to_first_sunday")
)

# Note: passing the day count as a column (name) to date_add
# requires a recent Spark version (3.3+).
first_sunday_df = days_to_first_sunday_df.select(
    date_add(current_date(), "days_to_first_sunday").alias("Date")
)

N = 3
upcoming_sundays = [date_add(first_sunday_df.columns[0], 7 * i) for i in range(N)]

sundays_df = first_sunday_df.select(
    *upcoming_sundays
).toDF(
    *[f"Date_{i}" for i in range(N)]
).selectExpr(
    f"stack({N}, 'Date_0', Date_0, 'Date_1', Date_1, 'Date_2', Date_2) as (FieldName, Date)"
).drop("FieldName")

sundays_df.show()

Output:

+----------+
|      Date|
+----------+
|2023-05-07|
|2023-05-14|
|2023-05-21|
+----------+
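The IF in the expr above just converts Spark's DAYOFWEEK numbering (Sunday = 1 ... Saturday = 7) into a day offset; as a sanity check, here is the same arithmetic in plain Python (an illustrative helper, not part of the Spark code):

```python
def days_to_first_sunday(dayofweek: int) -> int:
    # dayofweek follows Spark's DAYOFWEEK: Sunday = 1, ..., Saturday = 7.
    # A Sunday maps to a full 7 days, i.e. the *next* Sunday, not today.
    return 7 - (0 if dayofweek == 1 else dayofweek - 1)

# Tuesday 2023-05-02 has DAYOFWEEK = 3, so 5 days to Sunday 2023-05-07.
print(days_to_first_sunday(3))  # 5
print(days_to_first_sunday(1))  # 7
```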

mhd8tkvw2#

Since f.current_date creates a Column object, I think it makes sense to start from an existing PySpark DataFrame. We can put [1, 2, 3] into a column called upcoming_sunday_number and then use f.current_date as follows:

from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType

# note: using f.dayofweek --> sunday is 1

N = 3

spark.createDataFrame(list(range(1,N+1)), "integer").toDF("upcoming_sunday_number").withColumn(
    "current_date", f.current_date()
).withColumn(
    "number_of_days", (1-f.dayofweek(f.current_date()) + 7*f.col('upcoming_sunday_number')).cast(IntegerType())
).withColumn(
    "upcoming_sunday_date", f.date_add(f.current_date(), f.col("number_of_days"))
).show()

+----------------------+------------+--------------+--------------------+
|upcoming_sunday_number|current_date|number_of_days|upcoming_sunday_date|
+----------------------+------------+--------------+--------------------+
|                     1|  2023-05-04|             3|          2023-05-07|
|                     2|  2023-05-04|            10|          2023-05-14|
|                     3|  2023-05-04|            17|          2023-05-21|
+----------------------+------------+--------------+--------------------+

Or, more concisely:

spark.createDataFrame(list(range(1,N+1)), "integer").toDF("upcoming_sunday_number").withColumn(
    "upcoming_sunday_date", f.date_add(f.current_date(), (1-f.dayofweek(f.current_date()) + 7*f.col('upcoming_sunday_number')).cast(IntegerType()))
).show()

+----------------------+--------------------+
|upcoming_sunday_number|upcoming_sunday_date|
+----------------------+--------------------+
|                     1|          2023-05-07|
|                     2|          2023-05-14|
|                     3|          2023-05-21|
+----------------------+--------------------+
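The offset expression 1 - dayofweek(today) + 7 * n can be checked off-Spark as well (an illustrative helper; remember Spark's dayofweek has Sunday = 1):

```python
def number_of_days(dayofweek_today: int, upcoming_sunday_number: int) -> int:
    # 1 - dayofweek(today) steps back to the most recent Sunday (a
    # non-positive offset); adding 7 * n then jumps forward n Sundays.
    return 1 - dayofweek_today + 7 * upcoming_sunday_number

# 2023-05-04 is a Thursday, dayofweek = 5:
print([number_of_days(5, n) for n in (1, 2, 3)])  # [3, 10, 17]
```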

Note that in my original answer I was a bit careless and made upcoming_sunday_number a string instead of an integer, but that still resolves to the correct final result:

root
 |-- upcoming_sunday_number: string (nullable = true)
 |-- current_date: date (nullable = false)
 |-- number_of_days: integer (nullable = true)
 |-- upcoming_sunday_date: date (nullable = true)

hk8txs483#

from pyspark.sql.functions import col, current_date, date_add, dayofweek

today = current_date()
# dayofweek: Sunday = 1 ... Saturday = 7, so 8 - dayofweek(today) is the
# number of days until the next Sunday (7 when today is already Sunday).
days_until_sunday = 8 - dayofweek(today)
next_sunday = date_add(today, days_until_sunday)

N = 3
# Column expressions can't be fed to createDataFrame directly; select them
# from a DataFrame instead. Spark 3.3+ accepts a Column as the day count.
df = spark.range(N).select(
    date_add(next_sunday, (col("id") * 7).cast("int")).alias("date")
)
df.show()
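Note that under Spark's dayofweek convention (Sunday = 1 ... Saturday = 7), the offset from today to the next Sunday is 8 - dayofweek(today); a quick plain-Python check of that offset:

```python
import datetime

def days_until_next_sunday(dow: int) -> int:
    # dow uses Spark's dayofweek numbering: Sunday = 1 ... Saturday = 7.
    # 8 - dow lands on the *next* Sunday (7 days ahead when today is Sunday).
    return 8 - dow

# 2023-05-02 is a Tuesday (dow = 3): five days to Sunday 2023-05-07.
today = datetime.date(2023, 5, 2)
print(today + datetime.timedelta(days=days_until_next_sunday(3)))  # 2023-05-07
```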
