我有一本字典:
dict = {10: 1, 50: 2, 200: 3, 500: 4}
和一个Dask DataFrame:
+---+---+
| a| b|
+---+---+
| 1| 24|
| 1| 49|
| 2|125|
| 3|400|
+---+---+
我想groupBy a并获得最小的b值。之后,我想检查哪个dict键最接近b,并使用dict值创建一个新列。
例如,当B=24时,最接近的键是10。所以我想赋值1。这是我期望的结果:
+---+---+-------+
| a| b|closest|
+---+---+-------+
| 1| 24| 1|
| 1| 49| 2|
| 2|125| 3|
| 3|400| 4|
+---+---+-------+
我发现了一些与PySpark相似的东西。我一直没能让它运行,但它显然是为其他人运行的。无论如何分享它以供参考。
df = spark.createDataFrame(
[
(1, 24),
(1, 49),
(2, 125),
(3, 400)
],
["a", "b"]
)
dict = {10:1, 50:2, 200: 3, 500: 4}
def func(value, dict):
closest_key = (
value if value in dict else builtins.min(
dict.keys(), key=lambda k: builtins.abs(k - value)
)
)
score = dict.get(closest_key)
return score
df = (
df.groupby('a')
.agg(
min('b')
)
).withColumn('closest', func('b', dict))
据我所知,我认为在Spark版本的计算是每行完成的,我还没有能够复制。
3条答案
按热度按时间kxxlusnw1#
所以这是另一种方法,这将返回一个numpy数组,但它比spark快,你可以很容易地重新索引它。
a2mppw5e2#
如果我的解释不正确,你仍然可以使用我写的这个示例,只需要做一些调整。
我将展示一个Fugue的解决方案,它允许您在Pandas中定义逻辑,然后将其带到Dask。
首先进行一些设置,请注意
df
是一个Pandas DataFrame。这意味着您可以在其上进行测试的较小样本:然后我们定义逻辑。这是为了处理一个分区,所以列
a
中的所有内容都已经是相同的值。我们可以在Pandas上测试一下:
我们会得到:
然后我们可以将它带到Dask with Fugue。我们只需要调用
transform
函数:这可以接受Pandas或Dask DataFrame,并将输出Dask DataFrame,因为我们指定了
"dask"
引擎。如果您想要Spark DataFrame,还有一个"spark"
引擎。模式是分布式计算的必要条件,因此我们在这里指定输出模式。我们还按列a进行分区。
zf9nrax13#
我发现了一个使用纯dask的解决方案,以防你不想依赖其他包。
待测样品:
功能:
然后我们应用它: