Clean way to identify runs on a PySpark DF ArrayType column

ymzxtsji asked on 2021-05-27 in Spark

Given a PySpark DataFrame of the form:

+----+--------+
|time|messages|
+----+--------+
| t01|    [m1]|
| t03|[m1, m2]|
| t04|    [m2]|
| t06|    [m3]|
| t07|[m3, m1]|
| t08|    [m1]|
| t11|    [m2]|
| t13|[m2, m4]|
| t15|    [m2]|
| t20|    [m4]|
| t21|      []|
| t22|[m1, m4]|
+----+--------+

I want to restructure it to collapse runs of the same message (the order of the output doesn't matter much, but it's sorted here for clarity):

+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
|       t01|     t03|     m1|
|       t07|     t08|     m1|
|       t22|     t22|     m1|
|       t03|     t04|     m2|
|       t11|     t15|     m2|
|       t06|     t07|     m3|
|       t13|     t13|     m4|
|       t20|     t20|     m4|
|       t22|     t22|     m4|
+----------+--------+-------+

(That is, treat the messages column as a sequence and identify the start and end of each "run" of each message.)
Is there a clean way to get Spark to do this transformation? At the moment I'm handling it as a ~6 GB TSV.
I'm open to toPandas-ing it and accumulating on the driver, if pandas has a clean way to do this aggregation.
(See my answer below for a naïve baseline implementation.)
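
For reference, a rough driver-side pandas version of the target aggregation (a sketch only, assuming df is the PySpark DataFrame above and comfortably fits in memory after toPandas(); the variable names are illustrative):

pdf = df.toPandas()

# One row per (time, message); rows with empty message arrays drop out.
long = (
    pdf.explode('messages')
       .dropna(subset=['messages'])
       .rename(columns={'messages': 'message'})
       .sort_values('time')
)

# Index each observed timestamp so adjacent rows can be detected.
order = {t: i for i, t in enumerate(sorted(pdf['time']))}
long['pos'] = long['time'].map(order)

# A new run starts whenever a message skips one of the observed timestamps.
long['run_id'] = (
    long.groupby('message')['pos']
        .transform(lambda s: s.diff().ne(1).cumsum())
)

runs = (
    long.groupby(['message', 'run_id'])['time']
        .agg(start_time='min', end_time='max')
        .reset_index()
        .drop(columns='run_id')
)
print(runs.sort_values(['message', 'start_time']))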

mv1qrgav #1

Found a reasonable approach that scales well, as long as you can partition when applying the window operations (which you can on any realistic dataset, and on the one this question was derived from).
Broken into chunks for ease of explanation (imports appear only in the first snippet).
Setup:


# Need these for the setup

import pandas as pd
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# We'll need these later

from pyspark.sql.functions import array_except, coalesce, col, explode, from_json, lag, lit, rank
from pyspark.sql.window import Window

rows = [
    ['t01',['m1']],
    ['t03',['m1','m2']],
    ['t04',['m2']],
    ['t06',['m3']],
    ['t07',['m3','m1']],
    ['t08',['m1']],
    ['t11',['m2']],
    ['t13',['m2','m4']],
    ['t15',['m2']],
    ['t20',['m4']],
    ['t21',[]],
    ['t22',['m1','m4']],
]

pdf = pd.DataFrame(rows,columns=['time', 'messages'])
schema = StructType([
    StructField("time", StringType(), True),
    StructField("messages", ArrayType(StringType()), True)
])
df = spark.createDataFrame(pdf,schema=schema)

Order by time, lag, and diff the message arrays to identify the starts and ends of runs:

w = Window().partitionBy().orderBy('time')
df2 = df.withColumn('messages_lag_1', lag('messages', 1).over(w))\
        .withColumn('end_time', lag('time', 1).over(w))\
        .withColumnRenamed('time', 'start_time')\
        .withColumn('messages_lag_1',          # Replace nulls with []
            coalesce(                          # cargoculted from
                col('messages_lag_1'),         # https://stackoverflow.com/a/57198009
                from_json(lit('[]'), ArrayType(StringType()))
            )
        )\
        .withColumn('message_run_starts', array_except('messages', 'messages_lag_1'))\
        .withColumn('message_run_ends', array_except('messages_lag_1', 'messages'))\
        .drop(*['messages', 'messages_lag_1']) # ^ array_except only on Spark >= 2.4

df2.show()

+----------+--------+------------------+----------------+
|start_time|end_time|message_run_starts|message_run_ends|
+----------+--------+------------------+----------------+
|       t01|    null|              [m1]|              []|
|       t03|     t01|              [m2]|              []|
|       t04|     t03|                []|            [m1]|
|       t06|     t04|              [m3]|            [m2]|
|       t07|     t06|              [m1]|              []|
|       t08|     t07|                []|            [m3]|
|       t11|     t08|              [m2]|            [m1]|
|       t13|     t11|              [m4]|              []|
|       t15|     t13|                []|            [m4]|
|       t20|     t15|              [m4]|            [m2]|
|       t21|     t20|                []|            [m4]|
|       t22|     t21|          [m1, m4]|              []|
+----------+--------+------------------+----------------+

Explode the start and end arrays and rank each, partitioned by message and ordered by time. Join the two on message and rank, and where end_time is null, copy start_time into end_time:

w_start = Window().partitionBy('message_run_starts').orderBy(col('start_time'))
df3 = df2.withColumn('message_run_starts', explode('message_run_starts')).drop('message_run_ends', 'end_time')
df3 = df3.withColumn('start_row_id',rank().over(w_start))

w_end = Window().partitionBy('message_run_ends').orderBy(col('end_time'))
df4 = df2.withColumn('message_run_ends', explode('message_run_ends')).drop('message_run_starts', 'start_time')
df4 = df4.withColumn('end_row_id',rank().over(w_end))

df_combined = df3\
    .join(df4, (df3.message_run_starts == df4.message_run_ends) & (df3.start_row_id == df4.end_row_id), how='full')\
        .drop(*['message_run_ends','start_row_id','end_row_id'])\
        .withColumn('end_time',coalesce(col('end_time'),col('start_time'))) 

df_combined.show()

+----------+------------------+--------+
|start_time|message_run_starts|end_time|
+----------+------------------+--------+
|       t01|                m1|     t03|
|       t07|                m1|     t08|
|       t22|                m1|     t22|
|       t03|                m2|     t04|
|       t11|                m2|     t15|
|       t06|                m3|     t07|
|       t13|                m4|     t13|
|       t20|                m4|     t20|
|       t22|                m4|     t22|
+----------+------------------+--------+

yizd12fk #2

You can try the following approach using forward fill (Spark 2.4+ is not required):

Step 1: do the following:

for each row ordered by time, find the previous and next messages arrays (prev_messages and next_messages);
explode messages into individual message values;
for each message, if prev_messages is null or message is not in prev_messages, set start=time; see the SQL syntax below:

IF(prev_messages is NULL or !array_contains(prev_messages, message),time,NULL)

which can be simplified to the following, since array_contains returns NULL when prev_messages is NULL and IF treats a NULL condition as false:

IF(array_contains(prev_messages, message),NULL,time)

and similarly, if next_messages is null or message is not in next_messages, set end=time.
The code:

from pyspark.sql import Window, functions as F

# rows is defined in your own post

df = spark.createDataFrame(rows, ['time', 'messages'])

w1 = Window.partitionBy().orderBy('time')

df1 = df.withColumn('prev_messages', F.lag('messages').over(w1)) \
    .withColumn('next_messages', F.lead('messages').over(w1)) \
    .withColumn('message', F.explode('messages')) \
    .withColumn('start', F.expr("IF(array_contains(prev_messages, message),NULL,time)")) \
    .withColumn('end', F.expr("IF(array_contains(next_messages, message),NULL,time)"))

df1.show()

# +----+--------+-------------+-------------+-------+-----+----+
# |time|messages|prev_messages|next_messages|message|start| end|
# +----+--------+-------------+-------------+-------+-----+----+
# | t01|    [m1]|         null|     [m1, m2]|     m1|  t01|null|
# | t03|[m1, m2]|         [m1]|         [m2]|     m1| null| t03|
# | t03|[m1, m2]|         [m1]|         [m2]|     m2|  t03|null|
# | t04|    [m2]|     [m1, m2]|         [m3]|     m2| null| t04|
# | t06|    [m3]|         [m2]|     [m3, m1]|     m3|  t06|null|
# | t07|[m3, m1]|         [m3]|         [m1]|     m3| null| t07|
# | t07|[m3, m1]|         [m3]|         [m1]|     m1|  t07|null|
# | t08|    [m1]|     [m3, m1]|         [m2]|     m1| null| t08|
# | t11|    [m2]|         [m1]|     [m2, m4]|     m2|  t11|null|
# | t13|[m2, m4]|         [m2]|         [m2]|     m2| null|null|
# | t13|[m2, m4]|         [m2]|         [m2]|     m4|  t13| t13|
# | t15|    [m2]|     [m2, m4]|         [m4]|     m2| null| t15|
# | t20|    [m4]|         [m2]|           []|     m4|  t20| t20|
# | t22|[m1, m4]|           []|         null|     m1|  t22| t22|
# | t22|[m1, m4]|           []|         null|     m4|  t22| t22|
# +----+--------+-------------+-------------+-------+-----+----+

Step 2: create a WindowSpec partitioned by message and forward-fill the start column.

w2 = Window.partitionBy('message').orderBy('time')

# for illustration purposes, a different column name is used here so that we can
# compare the `start` column before and after the ffill
df2 = df1.withColumn('start_new', F.last('start', True).over(w2))
df2.show()

# +----+--------+-------------+-------------+-------+-----+----+---------+
# |time|messages|prev_messages|next_messages|message|start| end|start_new|
# +----+--------+-------------+-------------+-------+-----+----+---------+
# | t01|    [m1]|         null|     [m1, m2]|     m1|  t01|null|      t01|
# | t03|[m1, m2]|         [m1]|         [m2]|     m1| null| t03|      t01|
# | t07|[m3, m1]|         [m3]|         [m1]|     m1|  t07|null|      t07|
# | t08|    [m1]|     [m3, m1]|         [m2]|     m1| null| t08|      t07|
# | t22|[m1, m4]|           []|         null|     m1|  t22| t22|      t22|
# | t03|[m1, m2]|         [m1]|         [m2]|     m2|  t03|null|      t03|
# | t04|    [m2]|     [m1, m2]|         [m3]|     m2| null| t04|      t03|
# | t11|    [m2]|         [m1]|     [m2, m4]|     m2|  t11|null|      t11|
# | t13|[m2, m4]|         [m2]|         [m2]|     m2| null|null|      t11|
# | t15|    [m2]|     [m2, m4]|         [m4]|     m2| null| t15|      t11|
# | t06|    [m3]|         [m2]|     [m3, m1]|     m3|  t06|null|      t06|
# | t07|[m3, m1]|         [m3]|         [m1]|     m3| null| t07|      t06|
# | t13|[m2, m4]|         [m2]|         [m2]|     m4|  t13| t13|      t13|
# | t20|    [m4]|         [m2]|           []|     m4|  t20| t20|      t20|
# | t22|[m1, m4]|           []|         null|     m4|  t22| t22|      t22|
# +----+--------+-------------+-------------+-------+-----+----+---------+

Step 3: remove the rows where end is null, then select only the required columns:

df2.selectExpr("message", "start_new as start", "end") \
    .filter("end is not NULL") \
    .orderBy("message","start").show()

# +-------+-----+---+
# |message|start|end|
# +-------+-----+---+
# |     m1|  t01|t03|
# |     m1|  t07|t08|
# |     m1|  t22|t22|
# |     m2|  t03|t04|
# |     m2|  t11|t15|
# |     m3|  t06|t07|
# |     m4|  t13|t13|
# |     m4|  t20|t20|
# |     m4|  t22|t22|
# +-------+-----+---+

Putting the above steps together:

from pyspark.sql import Window, functions as F

# define two Window Specs

w1 = Window.partitionBy().orderBy('time')
w2 = Window.partitionBy('message').orderBy('time')

df_new = df \
    .withColumn('prev_messages', F.lag('messages').over(w1)) \
    .withColumn('next_messages', F.lead('messages').over(w1)) \
    .withColumn('message', F.explode('messages')) \
    .withColumn('start', F.expr("IF(array_contains(prev_messages, message),NULL,time)")) \
    .withColumn('end', F.expr("IF(array_contains(next_messages, message),NULL,time)")) \
    .withColumn('start', F.last('start', True).over(w2)) \
    .select("message", "start", "end") \
    .filter("end is not NULL")

df_new.orderBy("start").show()

9o685dep #3

You can use the array functions available since Spark 2.4. explode_outer is an explode which, for an empty array, generates a row with a null value.
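
As a quick illustration of that behaviour (a sketch, assuming a live SparkSession named spark):

import spark.implicits._
import org.apache.spark.sql.functions.{explode, explode_outer}

val tiny = Seq(("t21", Seq.empty[String])).toDF("time", "messages")
tiny.select($"time", explode($"messages")).show()        // 0 rows: the empty array is dropped
tiny.select($"time", explode_outer($"messages")).show()  // 1 row: (t21, null)
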
The first step is to get, for each moment, the array of messages that start there and the array of messages that end there (start_of and end_of).
Then we keep only the moments where some message starts or ends, and build a three-column DataFrame with one row per message start and one per message end: a moment where m1 and m2 start produces two start rows, and a moment where m1 both starts and ends produces two rows, one for the m1 start and one for the m1 end.
Finally, a window function partitioned by message and ordered by time (making sure that when a message starts and ends at the same moment, the start sorts first) guarantees that every start row is followed by its matching end row. Pairing them up gives the start and end of each message run.
A nice exercise to think about.
I've written the example in Scala, but it should be easy to translate. Each line marked showAndContinue prints the DataFrame at that point, to show what each step does.
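
Note that showAndContinue is not a built-in Dataset method; a minimal sketch of the helper (assuming it simply shows the current Dataset and returns it unchanged so the chain can continue) could be:

import org.apache.spark.sql.Dataset

implicit class ShowAndContinue[T](ds: Dataset[T]) {
  // Print the Dataset at this point in the chain, then hand it back unchanged.
  def showAndContinue: Dataset[T] = {
    ds.show()
    ds
  }
}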

val w = Window.partitionBy().orderBy("time")
val w2 = Window.partitionBy("message").orderBy($"time", desc("start_of"))
df.select($"time", $"messages", lag($"messages", 1).over(w).as("pre"), lag("messages", -1).over(w).as("post"))
  .withColumn("start_of", when($"pre".isNotNull, array_except(col("messages"), col("pre"))).otherwise($"messages"))
  .withColumn("end_of",  when($"post".isNotNull, array_except(col("messages"), col("post"))).otherwise($"messages"))
  .filter(size($"start_of") + size($"end_of") > 0)
  .showAndContinue
  .select(explode(array(
    struct($"time", $"start_of", array().as("end_of")),
    struct($"time", array().as("start_of"), $"end_of")
  )).as("elem"))
  .select("elem.*")
  .select($"time", explode_outer($"start_of").as("start_of"), $"end_of")
  .select( $"time", $"start_of", explode_outer($"end_of").as("end_of"))
  .filter($"start_of".isNotNull || $"end_of".isNotNull)
  .showAndContinue
  .withColumn("message", when($"start_of".isNotNull, $"start_of").otherwise($"end_of"))
  .showAndContinue
  .select($"message", when($"start_of".isNotNull, $"time").as("starts_at"), lag($"time", -1).over(w2).as("ends_at"))
  .filter($"starts_at".isNotNull)
  .showAndContinue

And the tables:

+----+--------+--------+--------+--------+--------+
|time|messages|     pre|    post|start_of|  end_of|
+----+--------+--------+--------+--------+--------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|      []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|    [m1]|
| t04|    [m2]|[m1, m2]|    [m3]|      []|    [m2]|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|      []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|    [m3]|
| t08|    [m1]|[m3, m1]|    [m2]|      []|    [m1]|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|      []|
| t13|[m2, m4]|    [m2]|    [m2]|    [m4]|    [m4]|
| t15|    [m2]|[m2, m4]|    [m4]|      []|    [m2]|
| t20|    [m4]|    [m2]|      []|    [m4]|    [m4]|
| t22|[m1, m4]|      []|    null|[m1, m4]|[m1, m4]|
+----+--------+--------+--------+--------+--------+

+----+--------+------+
|time|start_of|end_of|
+----+--------+------+
| t01|      m1|  null|
| t03|      m2|  null|
| t03|    null|    m1|
| t04|    null|    m2|
| t06|      m3|  null|
| t07|      m1|  null|
| t07|    null|    m3|
| t08|    null|    m1|
| t11|      m2|  null|
| t13|      m4|  null|
| t13|    null|    m4|
| t15|    null|    m2|
| t20|      m4|  null|
| t20|    null|    m4|
| t22|      m1|  null|
| t22|      m4|  null|
| t22|    null|    m1|
| t22|    null|    m4|
+----+--------+------+

+----+--------+------+-------+
|time|start_of|end_of|message|
+----+--------+------+-------+
| t01|      m1|  null|     m1|
| t03|      m2|  null|     m2|
| t03|    null|    m1|     m1|
| t04|    null|    m2|     m2|
| t06|      m3|  null|     m3|
| t07|      m1|  null|     m1|
| t07|    null|    m3|     m3|
| t08|    null|    m1|     m1|
| t11|      m2|  null|     m2|
| t13|      m4|  null|     m4|
| t13|    null|    m4|     m4|
| t15|    null|    m2|     m2|
| t20|      m4|  null|     m4|
| t20|    null|    m4|     m4|
| t22|      m1|  null|     m1|
| t22|      m4|  null|     m4|
| t22|    null|    m1|     m1|
| t22|    null|    m4|     m4|
+----+--------+------+-------+

+-------+---------+-------+
|message|starts_at|ends_at|
+-------+---------+-------+
|     m1|      t01|    t03|
|     m1|      t07|    t08|
|     m1|      t22|    t22|
|     m2|      t03|    t04|
|     m2|      t11|    t15|
|     m3|      t06|    t07|
|     m4|      t13|    t13|
|     m4|      t20|    t20|
|     m4|      t22|    t22|
+-------+---------+-------+

Working from the first table created, this could be optimized by extracting all the elements that start and end at the same moment, so they don't have to be "matched" up as a start and an end again; whether that's worth it depends on whether it's a common case or only a handful of rows. The optimization would look like this (same windows):

val dfStartEndAndFiniteLife = df.select($"time", $"messages", lag($"messages", 1).over(w).as("pre"), lag("messages", -1).over(w).as("post"))
  .withColumn("start_of", when($"pre".isNotNull, array_except(col("messages"), col("pre"))).otherwise($"messages"))
  .withColumn("end_of",  when($"post".isNotNull, array_except(col("messages"), col("post"))).otherwise($"messages"))
  .filter(size($"start_of") + size($"end_of") > 0)
  .withColumn("start_end_here", array_intersect($"start_of", $"end_of"))
  .withColumn("start_of", array_except($"start_of", $"start_end_here"))
  .withColumn("end_of", array_except($"end_of", $"start_end_here"))
  .showAndContinue

val onlyStartEndSameMoment = dfStartEndAndFiniteLife.filter(size($"start_end_here") > 0)
  .select(explode($"start_end_here"), $"time".as("starts_at"), $"time".as("ends_at"))
  .showAndContinue

val startEndDifferentMoment = dfStartEndAndFiniteLife
  .filter(size($"start_of") + size($"end_of") > 0)
  .showAndContinue
  .select(explode(array(
    struct($"time", $"start_of", array().as("end_of")),
    struct($"time", array().as("start_of"), $"end_of")
  )).as("elem"))
  .select("elem.*")
  .select($"time", explode_outer($"start_of").as("start_of"), $"end_of")
  .select( $"time", $"start_of", explode_outer($"end_of").as("end_of"))
  .filter($"start_of".isNotNull || $"end_of".isNotNull)
  .showAndContinue
  .withColumn("message", when($"start_of".isNotNull, $"start_of").otherwise($"end_of"))
  .showAndContinue
  .select($"message", when($"start_of".isNotNull, $"time").as("starts_at"), lag($"time", -1).over(w2).as("ends_at"))
  .filter($"starts_at".isNotNull)
  .showAndContinue

val result = onlyStartEndSameMoment.union(startEndDifferentMoment)

result.orderBy("col", "starts_at").show()

And the tables:

+----+--------+--------+--------+--------+------+--------------+
|time|messages|     pre|    post|start_of|end_of|start_end_here|
+----+--------+--------+--------+--------+------+--------------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|    []|            []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|  [m1]|            []|
| t04|    [m2]|[m1, m2]|    [m3]|      []|  [m2]|            []|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|    []|            []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|  [m3]|            []|
| t08|    [m1]|[m3, m1]|    [m2]|      []|  [m1]|            []|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|    []|            []|
| t13|[m2, m4]|    [m2]|    [m2]|      []|    []|          [m4]|
| t15|    [m2]|[m2, m4]|    [m4]|      []|  [m2]|            []|
| t20|    [m4]|    [m2]|      []|      []|    []|          [m4]|
| t22|[m1, m4]|      []|    null|      []|    []|      [m1, m4]|
+----+--------+--------+--------+--------+------+--------------+

+---+---------+-------+
|col|starts_at|ends_at|
+---+---------+-------+
| m4|      t13|    t13|
| m4|      t20|    t20|
| m1|      t22|    t22|
| m4|      t22|    t22|
+---+---------+-------+

+----+--------+--------+--------+--------+------+--------------+
|time|messages|     pre|    post|start_of|end_of|start_end_here|
+----+--------+--------+--------+--------+------+--------------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|    []|            []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|  [m1]|            []|
| t04|    [m2]|[m1, m2]|    [m3]|      []|  [m2]|            []|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|    []|            []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|  [m3]|            []|
| t08|    [m1]|[m3, m1]|    [m2]|      []|  [m1]|            []|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|    []|            []|
| t15|    [m2]|[m2, m4]|    [m4]|      []|  [m2]|            []|
+----+--------+--------+--------+--------+------+--------------+

+----+--------+------+
|time|start_of|end_of|
+----+--------+------+
| t01|      m1|  null|
| t03|      m2|  null|
| t03|    null|    m1|
| t04|    null|    m2|
| t06|      m3|  null|
| t07|      m1|  null|
| t07|    null|    m3|
| t08|    null|    m1|
| t11|      m2|  null|
| t15|    null|    m2|
+----+--------+------+

+----+--------+------+-------+
|time|start_of|end_of|message|
+----+--------+------+-------+
| t01|      m1|  null|     m1|
| t03|      m2|  null|     m2|
| t03|    null|    m1|     m1|
| t04|    null|    m2|     m2|
| t06|      m3|  null|     m3|
| t07|      m1|  null|     m1|
| t07|    null|    m3|     m3|
| t08|    null|    m1|     m1|
| t11|      m2|  null|     m2|
| t15|    null|    m2|     m2|
+----+--------+------+-------+

+-------+---------+-------+
|message|starts_at|ends_at|
+-------+---------+-------+
|     m1|      t01|    t03|
|     m1|      t07|    t08|
|     m2|      t03|    t04|
|     m2|      t11|    t15|
|     m3|      t06|    t07|
+-------+---------+-------+

+---+---------+-------+
|col|starts_at|ends_at|
+---+---------+-------+
| m1|      t01|    t03|
| m1|      t07|    t08|
| m1|      t22|    t22|
| m2|      t03|    t04|
| m2|      t11|    t15|
| m3|      t06|    t07|
| m4|      t13|    t13|
| m4|      t20|    t20|
| m4|      t22|    t22|
+---+---------+-------+
