如何在pyspark< 2.4中生成整数序列?

3zwjbxry  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(391)

这个问题在这里已经有了答案

如何将具有范围值的列添加到Dataframe(1个答案)
上个月关门了。
pyspark2.4引入了生成整数序列的序列函数。不幸的是,我被PySpark2.3.x困住了。如何生成整数序列?
编辑:
下面是一个玩具的例子,我正在努力实现。
输入(行=年度范围内的科目)

+---------+----------+----------+
| account | min_year | max_year |
+---------+----------+----------+
|       A |     2002 |     2004 |
|       B |     2008 |     2011 |
|       C |     2009 |     2015 |
+---------+----------+----------+

期望输出(范围内每个账户每年一行)

+---------+----------+
| account |     year |
+---------+----------+
|       A |     2002 |
|       A |     2003 |
|       A |     2004 |
|       B |     2008 |
|       B |     2009 |
|       B |     2010 |
|       B |     2011 |
|       C |     2009 |
|       C |     2010 |
|       C |     2011 |
|       C |     2012 |
|       C |     2013 |
|       C |     2014 |
|       C |     2015 |
+---------+----------+
68bkxrlz

68bkxrlz1#

我已经为我的具体案例找到了解决办法。虽然不太漂亮,但很管用。
完整代码:

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Create a toy dataset

toy_data = {
    'account':  {
         0: "A",
         1: "B",
         2: "C",
    },
    'min_year': {
         0: 2002,
         1: 2008,
         2: 2009
    },
    'max_year': {
         0: 2004,
         1: 2011,
         2: 2015
    },
}
pdf = pd.DataFrame(toy_data, columns=toy_data.keys())
df = spark.createDataFrame(pdf)

# Specify year range

year_first = 2002
year_last = 2015
year_range = range(year_first, year_last + 1)
n_years = len(year_range)

# Create a column for each year in year_range

# Then, use stack() to convert the columns into rows

stack_expr = ""

for year in year_range:
    x = "year_{}".format(year)
    df = df.withColumn(x, (year >= F.col("min_year")) & (year <= F.col("max_year")))
    stack_expr = stack_expr + ", '{}', {}".format(x, x)

# Finalize the stack expression

stack_expr = "stack({}{}) as (year, in_range)".format(n_years, stack_expr)

# Apply the stack expression

df = df.select("account", F.expr(stack_expr))

# Filter by years in range

df = df.filter(F.col("in_range")).select("account", "year")

# Extract year as integer

df = df.withColumn("year", F.col("year").substr(6,4).cast('int'))

结果:


# df.show()

    +-------+----+
    |account|year|
    +-------+----+
    |      A|2002|
    |      A|2003|
    |      A|2004|
    |      B|2008|
    |      B|2009|
    |      B|2010|
    |      B|2011|
    |      C|2009|
    |      C|2010|
    |      C|2011|
    |      C|2012|
    |      C|2013|
    |      C|2014|
    |      C|2015|
    +-------+----+

相关问题