numpy: large arrays in Python and high time cost

6tqwzwtp · posted 2023-08-05 in Python

I want to create an np.ndarray as input for a machine learning model:

array_X = np.array([list(w.values) for w in df[['close', 'volume']].rolling(window=20)][19:-1])

This is the standard approach for time series, where we use a window of past values as input to predict future values. The array has shape 2 × 20 × 20,000,000 (roughly 6.4 GB at float64). Building such an array takes a very long time, and it sometimes fails with an error because the array consumes too much memory.
Is there any way to improve on this (both the time cost and the memory error)?

kognpnkq #1

Your original code gave me an error because the first few entries in the array have mismatched dimensions: they are too short, since the window is not yet full. So I modified it to discard those first values:

def rolling_approach(df, window_size=3):
    return np.array(
        [w.values for w in df[["close", "volume"]].rolling(window=window_size)][
            window_size - 1 :
        ]
    )
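To make the output concrete, here is a quick check on a toy DataFrame (values are made up for illustration; the function is repeated so the snippet runs on its own):

```python
import numpy as np
import pandas as pd

def rolling_approach(df, window_size=3):
    # Keep only full windows: the first window_size - 1 windows are short.
    return np.array(
        [w.values for w in df[["close", "volume"]].rolling(window=window_size)][
            window_size - 1 :
        ]
    )

df = pd.DataFrame({"close": [1, 2, 3, 4, 5], "volume": [10, 20, 30, 40, 50]})
result = rolling_approach(df, window_size=3)
print(result.shape)  # (3, 3, 2): one (window_size, 2) block per full window
```

Each `result[i]` is the block of `close`/`volume` rows ending at row `i + window_size - 1`.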

pd.DataFrame.rolling can be very slow for this kind of operation. shift in pandas is very efficient; here is an example for window_size=3:

(
    pd.concat(
        [
            df[["close", "volume"]].shift().shift(),
            df[["close", "volume"]].shift(),
            df[["close", "volume"]],
        ],
        axis=1,
    )
    .values[2:, :]
    .reshape(-1, 3, 2)
)
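On a tiny frame (toy values) you can see what the stacking produces before the reshape:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"close": [1, 2, 3, 4], "volume": [10, 20, 30, 40]})

# Each row of the concatenated frame holds
# [close_t-2, volume_t-2, close_t-1, volume_t-1, close_t, volume_t].
stacked = pd.concat(
    [
        df[["close", "volume"]].shift(2),
        df[["close", "volume"]].shift(),
        df[["close", "volume"]],
    ],
    axis=1,
)
# Drop the first window_size - 1 rows (they contain NaNs from shifting),
# then reshape each remaining row into a (window_size, 2) block.
windows = stacked.values[2:, :].reshape(-1, 3, 2)
print(windows[0])  # rows 0..2 of the original frame
```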


We stack the shifts and then reshape the values.
Generalizing this to a variable window_size, we get:

def shift_approach(df, window_size=3):
    shifted_df = pd.concat(
        [df[["close", "volume"]].shift(i) for i in range(window_size - 1, -1, -1)],
        axis=1,
    )
    reshaped_array = shifted_df.values[window_size - 1 :, :].reshape(-1, window_size, 2)
    return reshaped_array
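For completeness, a zero-copy alternative not in the original answer: np.lib.stride_tricks.sliding_window_view (available in NumPy ≥ 1.20) builds the windows as a view over the underlying buffer, which also sidesteps the memory blow-up of materializing every window. A minimal sketch:

```python
import numpy as np
import pandas as pd

def sliding_view_approach(df, window_size=3):
    values = df[["close", "volume"]].to_numpy()
    # Windows over the row axis come back as (n_windows, n_cols, window_size);
    # transpose to (n_windows, window_size, n_cols) to match shift_approach.
    windows = np.lib.stride_tricks.sliding_window_view(values, window_size, axis=0)
    return windows.transpose(0, 2, 1)

df = pd.DataFrame({"close": [1, 2, 3, 4, 5], "volume": [10, 20, 30, 40, 50]})
out = sliding_view_approach(df, window_size=3)
print(out.shape)  # (3, 3, 2)
```

Note the result is a read-only view; call .copy() on it before writing into it or feeding it to code that mutates its input.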


shift outperforms rolling by nearly two orders of magnitude:


[performance comparison plot: execution time vs. N, log-log]
On my MacBook it scales well into the millions of rows:

def setup(N):
    np.random.seed(42)
    close_values = np.random.randint(1, 100, size=N)
    volume_values = np.random.randint(100, 1000, size=N)
    df = pd.DataFrame({"close": close_values, "volume": volume_values})
    return [df, 10]

approaches = [rolling_approach, shift_approach]
# Show correctness
for approach in approaches[1:]:
    data = setup(10)
    assert np.isclose(approach(*data), approaches[0](*data)).all()

run_performance_comparison(
    approaches,
    [
        1000,
        3000,
        5000,
        10000,
        30000,
        100_000,
        300_000,
        500_000,
        1_000_000,
        3_000_000,
        5_000_000,
        10_000_000,
    ],
    setup=setup,
    title="Performance Comparison",
    number_of_repetitions=2,
)


Benchmarking code:

import timeit
from functools import partial

import matplotlib.pyplot as plt
from typing import List, Dict, Callable

from contextlib import contextmanager

@contextmanager
def data_provider(data_size, setup=lambda N: N, teardown=lambda: None):
    data = setup(data_size)
    yield data
    teardown()

def run_performance_comparison(approaches: List[Callable],
                               data_size: List[int],
                               setup=lambda N: N,
                               teardown=lambda: None,
                               number_of_repetitions=5, title='Performance Comparison', data_name='N'):
    approach_times: Dict[Callable, List[float]] = {approach: [] for approach in approaches}
    for N in data_size:
        with data_provider(N, setup, teardown) as data:
            for approach in approaches:
                function = partial(approach, *data)
                approach_time = min(timeit.Timer(function).repeat(repeat=number_of_repetitions, number=2))
                approach_times[approach].append(approach_time)

    for approach in approaches:
        plt.plot(data_size, approach_times[approach], label=approach.__name__)
    plt.yscale('log')
    plt.xscale('log')

    plt.xlabel(data_name)
    plt.ylabel('Execution Time (seconds)')
    plt.title(title)
    plt.legend()
    plt.show()
