pandas 在Dask中将列与字典进行比较

v8wbuo2f  于 2023-03-28  发布在  其他
关注(0)|答案(3)|浏览(120)

我有一本字典:

dict = {10: 1, 50: 2, 200: 3, 500: 4}

和一个Dask DataFrame:

+---+---+
|  a|  b|
+---+---+
|  1| 24|
|  1| 49|
|  2|125|
|  3|400|
+---+---+

我想groupBy a并获得最小的b值。之后,我想检查哪个dict键最接近b,并使用dict值创建一个新列。
例如,当B=24时,最接近的键是10。所以我想赋值1。这是我期望的结果:

+---+---+-------+
|  a|  b|closest|
+---+---+-------+
|  1| 24|      1|
|  1| 49|      2|
|  2|125|      3|
|  3|400|      4|
+---+---+-------+

我发现了一些与PySpark相似的东西。我一直没能让它运行,但它显然是为其他人运行的。无论如何分享它以供参考。

df = spark.createDataFrame(
    [
        (1, 24),
        (1, 49),
        (2, 125),
        (3, 400)
    ],
    ["a", "b"]
)

dict = {10:1, 50:2, 200: 3, 500: 4}

def func(value, dict):
    closest_key = (
        value if value in dict else builtins.min(
            dict.keys(), key=lambda k: builtins.abs(k - value)
        )
    )
    score = dict.get(closest_key)
    return score

df = (
    df.groupby('a')
        .agg(
            min('b')
        )
    ).withColumn('closest', func('b', dict))

据我所知,我认为在Spark版本的计算是每行完成的,我还没有能够复制。

kxxlusnw

kxxlusnw1#

所以这是另一种方法,这将返回一个numpy数组,但它比spark快,你可以很容易地重新索引它。

import numpy as np
a = pydf.toNumpy()
a = a[:,1] # Grabs your b column
np.where([a <=10,a <=50,a<=200,a<=500],[1,2,3,4],a) # Check the closest values and fill them with what you want
a2mppw5e

a2mppw5e2#

如果我的解释不正确,你仍然可以使用我写的这个示例,只需要做一些调整。
我将展示一个Fugue的解决方案,它允许您在Pandas中定义逻辑,然后将其带到Dask。
首先进行一些设置,请注意df是一个Pandas DataFrame。这意味着您可以在其上进行测试的较小样本:

import pandas as pd
import dask.dataframe as dd
import numpy as np

_dict = {10: 1, 50: 2, 200: 3, 500: 4}
df = pd.DataFrame({"a": [1,1,2,3], "b":[24,49,125,400]})
ddf = dd.from_pandas(df, npartitions=2)

然后我们定义逻辑。这是为了处理一个分区,所以列a中的所有内容都已经是相同的值。

def logic(df: pd.DataFrame) -> pd.DataFrame:
    # handles the logic for 1 group. all values in a are the same
    min_b = df['b'].min()
    keys = np.array(list(_dict.keys()))
    # closest taken from https://stackoverflow.com/a/10465997/11163214
    closest = keys[np.abs(keys - min_b).argmin()]
    closest_val = _dict[closest]
    df = df.assign(closest=closest_val)
    return df

我们可以在Pandas上测试一下:

logic(df.loc[df['a'] == 1])

我们会得到:

a   b   closest
0   1   24  1
1   1   49  1

然后我们可以将它带到Dask with Fugue。我们只需要调用transform函数:

from fugue import transform

ddf = transform(ddf,
          logic,
          schema="*,closest:int",
          partition={"by":"a"},
          engine="dask")
ddf.compute()

这可以接受Pandas或Dask DataFrame,并将输出Dask DataFrame,因为我们指定了"dask"引擎。如果您想要Spark DataFrame,还有一个"spark"引擎。
模式是分布式计算的必要条件,因此我们在这里指定输出模式。我们还按列a进行分区。

zf9nrax1

zf9nrax13#

我发现了一个使用纯dask的解决方案,以防你不想依赖其他包。
待测样品:

import pandas as pd
import dask.dataframe as dd
import numpy as np

_dict = {10: 1, 50: 2, 200: 3, 500: 4}
df = pd.DataFrame({"a": [1,1,2,3], "b":[24,49,125,400]})
ddf = dd.from_pandas(df, npartitions=2)

功能:

def get_closest(value: int, boundaries: dict) -> int:
    keys = np.array(list(boundaries.keys()))
    closest = keys[np.abs(keys - value).argmin()]
    score = boundaries[closest]

    return score

然后我们应用它:

ddf['closest'] = ddf['b'].apply(
    get_closest,
    args=(_dict, True),
    meta=('int32')
)

相关问题