Spark dataframe将多行转换为列

9fkzdhlc  于 2023-03-23  发布在  Apache
关注(0)|答案(4)|浏览(157)

我是spark新手,想转换下面的source dataframe(从JSON文件加载):

+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+

进入下方结果数据框

+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 2| 1| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+

转换规则如下:

1.结果 Dataframe 由A + (n major columns)组成,其中major列名由以下公式指定:

sorted(src_df.map(lambda x: x[2]).distinct().collect())

1.结果 Dataframe 包含m行,其中A列的值由以下公式提供:

sorted(src_df.map(lambda x: x[0]).distinct().collect())

1.结果 Dataframe 中的每个主列的值是来自对应A和主列上的源 Dataframe 的值(例如,源 Dataframe 中的行1中的计数被Map到box,其中Aa和列m1)。
1.源 Dataframe 中Amajor的组合没有重复(请将其视为SQL中两列上的主键)

umuewwlo

umuewwlo1#

使用zero323的 Dataframe

df = sqlContext.createDataFrame([
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
("e", 1, "m4"), ("e", 1, "m5")], 
("a", "cnt", "major"))

你也可以用

reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0)
hl0ma9xz

hl0ma9xz2#

让我们从示例数据开始:

df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")], 
    ("a", "cnt", "major"))

请注意,我已经将count更改为cnt。Count在大多数SQL方言中是一个保留关键字,它不是一个很好的列名选择。
至少有两种方法可以重塑此数据:

  • 通过DataFrame聚合
from pyspark.sql.functions import col, when, max

majors = sorted(df.select("major")
    .distinct()
    .map(lambda row: row[0])
    .collect())

cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) 
    for m in  majors]
maxs = [max(col(m)).alias(m) for m in majors]

reshaped1 = (df
    .select(col("a"), *cols)
    .groupBy("a")
    .agg(*maxs)
    .na.fill(0))

reshaped1.show()

## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  b|  4|  1|  2|  0|  0|
## |  c|  3|  0|  4|  5|  0|
## |  d|  6|  1|  2|  3|  4|
## |  e|  4|  5|  1|  1|  1|
## +---+---+---+---+---+---+
  • RDD上的groupBy
from pyspark.sql import Row

grouped = (df
    .map(lambda row: (row.a, (row.major, row.cnt)))
    .groupByKey())

def make_row(kv):
    k, vs = kv
    tmp = dict(list(vs) + [("a", k)])
    return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))

reshaped2.show()

## +---+---+---+---+---+---+
## |  a| m1| m2| m3| m4| m5|
## +---+---+---+---+---+---+
## |  a|  1|  1|  2|  3|  0|
## |  e|  4|  5|  1|  1|  1|
## |  c|  3|  0|  4|  5|  0|
## |  b|  4|  1|  2|  0|  0|
## |  d|  6|  1|  2|  3|  4|
## +---+---+---+---+---+---+
q7solyqu

q7solyqu3#

这是你的原始数据框:

df.show()
+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+

使用pivot来重塑“major”上的数据,按“A”分组,并将“count”的总和聚合为值:
一个二个一个一个

vd8tlhqk

vd8tlhqk4#

@TrentWoodbury的答案是一个很好的答案,我投了赞成票。但是,如果聚合值不是一个数字,它就不起作用,因为.max('cnt')无法执行所需的聚合。此外,max可能会更慢,或者只是不需要,如果你知道你不会有重复的值。

以下适用于所有数据类型:

from pyspark.sql.functions import first

reshaped_df = df.groupby('a') \
  .pivot('major') \ 
  .agg(first('cnt'))

相关问题