目标:在python或python库中,找到一种高效/最快的方法,按列遍历表,并在每列上运行一个函数。
背景:我一直在探索提高函数速度的方法。这是因为我有两个模型/算法,我想运行一个小的,一个大的(使用torch)。我一直在使用小模型进行测试。小模型是每个列的季节分解。
在pandas/polars中设置演示数据:
rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.
字符串
Pandas
pandas和dict:
from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd
class pdDictTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
trend_data_dict = {}
for column in dataframe.columns:
trend_data_dict[column] = cls().process_col(dataframe[column])
trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
return trend_dataframes
import timeit
start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
型
程序执行于14.349091062998923
使用列表解析而不是for循环:
trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
型
程序执行于14.543703391002055
使用pandas和torch类:
from statsmodels.tsa.seasonal import seasonal_decompose import torch import pandas pd
class pdTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return torch.tensor(trend, dtype=torch.float32).view(-1, 1)
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> torch.Tensor:
trend_dataframes = torch.Tensor()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
return trend_dataframes
start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
型
程序执行于23.14214362200073
北极星
polars & lamdba:
start = timeit.default_timer()
df_p = df_p.select([
pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
型
程序执行于82.5596211330012
我怀疑这篇文章写得很差&这就是它这么慢的原因。我还没有找到更好的方法。
到目前为止,我已经尝试过,apply_many,apply,map,map_batches或map_elements.. with_columns vs select和其他一些组合。
仅极坐标,用于循环:
class plTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
# Handle missing values by replacing NaN with 0
result.trend[np.isnan(result.trend)] = 0
return pl.DataFrame({column_data.name: result.trend})
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
trend_dataframes = pl.DataFrame()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = trend_dataframes.hstack(trend_data)
return trend_dataframes
型
程序执行于13.34212675299932
使用列表解析:
我试过使用polars和列表理解,但在polars语法上有困难。
使用dict & for循环:
程序执行于13.743039597999996
使用dict和list理解:
程序执行于13.008102383002552
关于LazyFrame?
我可以将lazy & collect添加到上面的df_p.select()
方法中,但这样做并不能提高时间。其中一个关键问题似乎是传递给lazy操作的函数也需要是lazy的。我希望它可以并行运行每个列。
目前的结论
- Pandas和dict,似乎是合理的。如果你关心索引,那么这可能是一个很好的选择。
- 有字典和列表理解能力的Polar是最快的,但也快不了多少。
- 这两种选择还具有不需要附加包的优点。
- polars似乎还有改进的空间。在更好的代码方面,但不确定这是否会大大改善时间。作为主要的,计算时间是seasonal_decompose。如果单独运行,每列需要约0.012秒。
- 对任何改进的反馈持开放态度
1条答案
按热度按时间kuarbcqp1#
关于Polars,在这种情况下使用
.select()
和.map_batches()
没有多大意义。你把所有的数据通过Polars表达式引擎,传递回Python运行你的外部函数,再传递回Polars。
您可以简单地循环每个列并将其传递给
seasonal_decompose()
,类似于pandas理解。字符串
我注意到的一件事是,如果你从每一列创建一个LazyFrame并使用
pl.collect_all()
,它会将.map_batches()
方法的速度提高50%左右(也许这可以研究一下)。(虽然还是比领悟慢了一点。)
型
从本质上讲,问题变成了“如何并行化Python for循环?"
正如@roganjosh指出的那样,这是用multiprocessing.完成的
型
出于兴趣,这个例子对我来说,多处理比常规理解快50%。
但是它是非常特定于任务/数据/平台的,所以你必须在本地进行基准测试。