PySpark collect_list with groupby and row_number: the order of rows changes every time show() is called

qyyhg6bp, posted 2022-11-01 in Spark

Is the behavior shown below expected, or is it a bug?

Create the DataFrame

data_list = [
    ['Blue', 2, 3, 1],
    ['Green', 1, 5, 4],
    ['Green', 4, 1, 3],
    ['Blue', 2, 4, 1],
    ['Green', 1, 5, 2]
]
all_cols = ['COLOR','COL1','COL2','COL3']
df = sqlContext.createDataFrame(data_list, all_cols)
df.show()
+-----+----+----+----+
|COLOR|COL1|COL2|COL3|
+-----+----+----+----+
| Blue|   2|   3|   1|
|Green|   1|   5|   4|
|Green|   4|   1|   3|
| Blue|   2|   4|   1|
|Green|   1|   5|   2|
+-----+----+----+----+

Add a row ID

df.createOrReplaceTempView('df')
df = spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df')
df.printSchema()
root
 |-- ROW_ID: integer (nullable = true)
 |-- COLOR: string (nullable = true)
 |-- COL1: long (nullable = true)
 |-- COL2: long (nullable = true)
 |-- COL3: long (nullable = true)
df.show()
+------+-----+----+----+----+
|ROW_ID|COLOR|COL1|COL2|COL3|
+------+-----+----+----+----+
|     1|Green|   4|   1|   3|
|     2| Blue|   2|   4|   1|
|     3|Green|   1|   5|   2|
|     4| Blue|   2|   3|   1|
|     5|Green|   1|   5|   4|
+------+-----+----+----+----+

Create another DataFrame by applying groupby to the first one:

from pyspark.sql.functions import collect_list

grp_df = df.groupby('COLOR').agg(collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[1, 3, 5]|
| Blue|   [2, 4]|
+-----+---------+
grp_df.printSchema()
root
 |-- COLOR: string (nullable = true)
 |-- IDX_VAL: array (nullable = true)
 |    |-- element: integer (containsNull = true)

If I run grp_df.show() again, look at the result below.

The list elements in the IDX_VAL column have changed!

grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[2, 3, 5]|
| Blue|   [1, 4]|
+-----+---------+
Answer 1 (yi0zb3m4)

As I mentioned in a comment, the problem in the grouped DataFrame is really a side effect of the seemingly random ordering of the original DataFrame. Notice that in your example, ROW_ID did not sort the DataFrame by COLOR, even though you told it to order by COLOR?
The problem is the quotes around "COLOR" in order by "COLOR": they make it the string literal "COLOR" rather than the column COLOR. Ordering by a constant means every row ties, so row_number() is free to assign the numbers in a different order on each action.
Consider the difference in the execution plans:

spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df').explain()
# == Physical Plan ==
# *Project [ROW_ID#350, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
# +- Window [row_number() windowspecdefinition(COLOR ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#350], [COLOR ASC NULLS FIRST]
#    +- *Sort [COLOR ASC NULLS FIRST], false, 0
#       +- Exchange SinglePartition
#          +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]

spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df').explain()
# == Physical Plan ==
# *Project [ROW_ID#342, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
# +- Window [row_number() windowspecdefinition(COLOR#326 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#342], [COLOR#326 ASC NULLS FIRST]
#    +- *Sort [COLOR#326 ASC NULLS FIRST], false, 0
#       +- Exchange SinglePartition
#          +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]

You can see that the second (correct) one has COLOR#326 in the window spec definition, which shows it is sorting by the actual column.
So if you remove the quotes, you should see more consistent results:

df = spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df')
df.show()
# +------+-----+----+----+----+
# |ROW_ID|COLOR|COL1|COL2|COL3|
# +------+-----+----+----+----+
# |     1| Blue|   2|   3|   1|
# |     2| Blue|   2|   4|   1|
# |     3|Green|   1|   5|   4|
# |     4|Green|   4|   1|   3|
# |     5|Green|   1|   5|   2|
# +------+-----+----+----+----+

Note that the DataFrame is now actually sorted by COLOR.

from pyspark.sql import functions as f

grp_df = df.groupby('COLOR').agg(f.collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
# +-----+---------+
# |COLOR|  IDX_VAL|
# +-----+---------+
# | Blue|   [1, 2]|
# |Green|[3, 4, 5]|
# +-----+---------+

However, within the same COLOR you may still get an inconsistent ordering, because you have not specified how to break ties after sorting by COLOR. That does not affect the grouped values in this particular case, but it is a good idea to make sure the ordering is deterministic in every case.
With your data, you can get a deterministic result like this:

df = spark.sql(
    'select row_number() over (order by COLOR, COL1, COL2, COL3) as ROW_ID, * from df'
)
Answer 2 (h43kikqp)

When row_number() hits ties while assigning an order, it behaves nondeterministically: it may assign different row numbers each time an action is invoked in PySpark. If you want the numbers to stay fixed, and the order beyond the COLOR column does not matter to you, use cache or persist.

from pyspark.sql import functions as F

df.createOrReplaceTempView('df')
df = spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df')
df.persist()

grp_df = df.groupby('COLOR').agg(F.collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[1, 2, 3]|
| Blue|   [4, 5]|
+-----+---------+

grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[1, 2, 3]|
| Blue|   [4, 5]|
+-----+---------+

Also, as pointed out in the other answer, it is better to break these ties by specifying additional ordering criteria.
