我在Spark上使用pandas API。我使用groupby.agg
操作。我找到了a similar issue,但解决方案不适合我。我也查了官方文件。但这些文档并没有提供足够的例子。
以下是我的样本数据:
| | O_ORDER优先级| O_ORDERPRIORITY |
| --|--|--|
| 0 |邮件|2 -高|
| 1 |船|1 -紧急|
我只想通过以下udf对L_SHIPMODE
进行分组并对O_ORDERPRIORITY
进行计数:
@pandas_udf(IntegerType())
def g1(x):
return ((x == "1-URGENT") | (x == "2-HIGH")).sum()
@pandas_udf(IntegerType())
def g2(x):
return ((x != "1-URGENT") & (x != "2-HIGH")).sum()
# tryied register, but it seems this is not the problem
# spark.udf.register('g1_udf', g1)
# spark.udf.register('g2_udf', g2)
total = jn.groupby("L_SHIPMODE", as_index=False)["O_ORDERPRIORITY"].agg({"O_ORDERPRIORITY": [g1, g2]})
我阴阳怪气:ValueError: aggs must be a dict mapping from column name to aggregate functions (string or list of strings).
是否有关于如何在groupby.agg
上使用UDF的详细示例?
1条答案
按热度按时间bfrts1fy1#
我认为如果你在groupby之前创建一些新列(使用与你类似的示例框架),你可以完全避免使用udfs:
测试结果: