numpy 在Python中将1d-array整形为2d-array列表

rn0zuynd  于 2023-08-05  发布在  Python
关注(0)|答案(1)|浏览(72)

我有一个长度为M的1d数据流,连接了N个通道,我想将其馈送到处理函数(即其总长度为M*N。该处理函数预期数据被分割到通道中,并且切片大小为K,其中K小于或等于M,即对应于形状为[N, K]的2D阵列。
我目前的方法是将我的输入数据从1d整形为2d数组列表(即实际上是3D阵列)的情况如下:

# Generation of test data with a shape of N*M as 1d-array
data_list:list[float] = []
for channel_no in range(N):
    data_list.extend(
        (np.arange(M) + 10 * M * channel_no).tolist()
    )

# Reshaping of 1d-array into 2d-arrays, effectively splitting 
# the data into N channels
in_data: np.ndarray = np.asarray(data_list).reshape(
    N, M
)

# Generating list of 2d-arrays with shape [N, K] and a length of M // K
target_data: np.ndarray = reshape_numpy_array_with_equal_blocks(
    in_array=in_data,
    target_shape=(N, K),
    drop_last=True,
    use_padding=True,
)

字符串
与相应的函数reshape_numpy_array_with_equal_blocks()

def reshape_numpy_array_with_equal_blocks(
    in_array: np.ndarray,
    target_shape: Tuple[int, int],
    use_padding: bool = False,
    drop_last: bool = True,
    padding_val: float = 0.0,
) -> np.ndarray:
    """_summary_

    Args:
        in_array (np.ndarray): _description_
        target_shape (Tuple[int, int]): _description_
        use_padding (bool, optional): _description_. Defaults to False.
        drop_last (bool, optional): _description_. Defaults to True.
        padding_val (float, optional): _description_. Defaults to 0.0.

    Raises:
        NotImplementedError: _description_

    Returns:
        np.ndarray: _description_
    """
    in_array_shape = in_array.shape
    if len(in_array_shape) != 2:
        raise NotImplementedError()
    ret_array = []
    for i in range(0, in_array_shape[-1], target_shape[-1]):
        channel_array = []
        if i + target_shape[-1] <= in_array_shape[-1]:
            for channel in range(in_array_shape[0]):
                channel_array.append(
                    in_array[channel, i : i + target_shape[-1]].tolist()
                )
            ret_array.append(channel_array[:])
        else:
            if not drop_last and use_padding:
                for channel in range(in_array_shape[0]):
                    cur_data = in_array[channel, i:].tolist()
                    cur_data_len = len(cur_data)
                    cur_data.extend([padding_val] * (target_shape[-1] - cur_data_len))
                    channel_array.append(cur_data[:])
                ret_array.append(channel_array[:])
    return np.array(ret_array)


但是,我不确定这种方法是否是最有效的版本,因为它包含了相当多的内存重新分配和复制。更好的方法是什么?
我正在考虑使用一个生成器来访问原始数据列表的切片部分,这样就不需要额外的分配/副本,但我也不确定这是否是正确的方法。这是一个可行的解决方案吗?或者有更好的方法吗?或者这个问题可以完全解决吗?

r6vfmomb

r6vfmomb1#

如果我理解正确,M总是可以被K整除,那么可以使用np.splitin_data数组拆分为具有M//K列的部分,然后使用np.stack合并它们。

target_data = np.stack(np.split(in_data, M//K, 1))

字符串
如果M不总是能被K整除,那么看起来你的代码只是删除了等于M/K余数的列数。

target_data = np.stack(np.split(in_data[:,:None if not M%K else -(M%K)], M//K, 1))


而且,正如@hpaulj在评论中所暗示的那样,这也可以使用reshape s和transpose s来完成。

target_data = np.transpose(in_data[:,:None if not M%K else -(M%K)].T.reshape(M//K,K,N), (0,2,1))


在速度方面,我使用N=20,M=10和K=2进行测试。

  • OP的方法:62.7 µs ± 3.92 µs/循环(平均值±标准值)运行7次,每次循环10,000次)
  • split / stack:28.9 µs ± 184 ns/环路(平均值±标准值运行7次,每次循环10,000次)
  • reshape / transpose:2.37 µs ± 73.5 ns/环路(平均值±标准值运行7次,每次循环100,000次)

相关问题