python 在pandas、polars或torch中高效迭代和应用函数?懒惰可能吗?

kb5ga3dv  于 2023-11-15  发布在  Python
关注(0)|答案(1)|浏览(117)

目标:在python或python库中,找到一种高效/最快的方法,按列遍历表,并在每列上运行一个函数。
背景:我一直在探索提高函数速度的方法。这是因为我有两个模型/算法,我想运行一个小的,一个大的(使用torch)。我一直在使用小模型进行测试。小模型是每个列的季节分解。

在pandas/polars中设置演示数据:

rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.

字符串

Pandas

pandas和dict:

from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd

class pdDictTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {}
        for column in dataframe.columns:
                trend_data_dict[column] = cls().process_col(dataframe[column])
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes
import timeit

start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))


程序执行于14.349091062998923
使用列表解析而不是for循环:

trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}


程序执行于14.543703391002055
使用pandas和torch类:
from statsmodels.tsa.seasonal import seasonal_decompose import torch import pandas pd

class pdTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes

start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))


程序执行于23.14214362200073

北极星

polars & lamdba:

start = timeit.default_timer()

df_p = df_p.select([
    pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))


程序执行于82.5596211330012
我怀疑这篇文章写得很差&这就是它这么慢的原因。我还没有找到更好的方法。
到目前为止,我已经尝试过,apply_many,apply,map,map_batches或map_elements.. with_columns vs select和其他一些组合。
仅极坐标,用于循环:

class plTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        
        # Handle missing values by replacing NaN with 0
        result.trend[np.isnan(result.trend)] = 0
        return pl.DataFrame({column_data.name: result.trend})

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_dataframes = pl.DataFrame()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = trend_dataframes.hstack(trend_data)
        return trend_dataframes


程序执行于13.34212675299932
使用列表解析:
我试过使用polars和列表理解,但在polars语法上有困难。
使用dict & for循环:
程序执行于13.743039597999996
使用dict和list理解:
程序执行于13.008102383002552
关于LazyFrame?
我可以将lazy & collect添加到上面的df_p.select()方法中,但这样做并不能提高时间。其中一个关键问题似乎是传递给lazy操作的函数也需要是lazy的。我希望它可以并行运行每个列。

目前的结论

  • Pandas和dict,似乎是合理的。如果你关心索引,那么这可能是一个很好的选择。
  • 有字典和列表理解能力的Polar是最快的,但也快不了多少。
  • 这两种选择还具有不需要附加包的优点。
  • polars似乎还有改进的空间。在更好的代码方面,但不确定这是否会大大改善时间。作为主要的,计算时间是seasonal_decompose。如果单独运行,每列需要约0.012秒。
  • 对任何改进的反馈持开放态度
kuarbcqp

kuarbcqp1#

关于Polars,在这种情况下使用.select().map_batches()没有多大意义。
你把所有的数据通过Polars表达式引擎,传递回Python运行你的外部函数,再传递回Polars。
您可以简单地循环每个列并将其传递给seasonal_decompose(),类似于pandas理解。

pl.DataFrame({
    col.name: seasonal_decompose(col, model="Additive", period=365).trend
    for col in df_p
})

字符串
我注意到的一件事是,如果你从每一列创建一个LazyFrame并使用pl.collect_all(),它会将.map_batches()方法的速度提高50%左右(也许这可以研究一下)。
(虽然还是比领悟慢了一点。)

lf = df_p.lazy()

lazy_columns = [ 
    lf.select(pl.col(col).map_batches(
        lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend))
    ) 
    for col in lf.columns 
]

out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")


从本质上讲,问题变成了“如何并行化Python for循环?"
正如@roganjosh指出的那样,这是用multiprocessing.完成的

from multiprocessing import get_context

...

if __name__ == "__main__":

    df_p = ...

    with get_context("spawn").Pool() as pool:
        columns = pool.map(process_column, (col for col in df_p))


出于兴趣,这个例子对我来说,多处理比常规理解快50%。
但是它是非常特定于任务/数据/平台的,所以你必须在本地进行基准测试。

相关问题