我有一个长度为M
的1d数据流,连接了N
个通道,我想将其馈送到处理函数(即其总长度为M*N
。该处理函数预期数据被分割到通道中,并且切片大小为K
,其中K
小于或等于M
,即对应于形状为[N, K]
的2D阵列。
我目前的方法是将我的输入数据从1d整形为2d数组列表(即实际上是3D阵列)的情况如下:
# Generation of test data with a shape of N*M as 1d-array
data_list:list[float] = []
for channel_no in range(N):
data_list.extend(
(np.arange(M) + 10 * M * channel_no).tolist()
)
# Reshaping of 1d-array into 2d-arrays, effectively splitting
# the data into N channels
in_data: np.ndarray = np.asarray(data_list).reshape(
N, M
)
# Generating list of 2d-arrays with shape [N, K] and a length of M // K
target_data: np.ndarray = reshape_numpy_array_with_equal_blocks(
in_array=in_data,
target_shape=(N, K),
drop_last=True,
use_padding=True,
)
字符串
与相应的函数reshape_numpy_array_with_equal_blocks()
:
def reshape_numpy_array_with_equal_blocks(
in_array: np.ndarray,
target_shape: Tuple[int, int],
use_padding: bool = False,
drop_last: bool = True,
padding_val: float = 0.0,
) -> np.ndarray:
"""_summary_
Args:
in_array (np.ndarray): _description_
target_shape (Tuple[int, int]): _description_
use_padding (bool, optional): _description_. Defaults to False.
drop_last (bool, optional): _description_. Defaults to True.
padding_val (float, optional): _description_. Defaults to 0.0.
Raises:
NotImplementedError: _description_
Returns:
np.ndarray: _description_
"""
in_array_shape = in_array.shape
if len(in_array_shape) != 2:
raise NotImplementedError()
ret_array = []
for i in range(0, in_array_shape[-1], target_shape[-1]):
channel_array = []
if i + target_shape[-1] <= in_array_shape[-1]:
for channel in range(in_array_shape[0]):
channel_array.append(
in_array[channel, i : i + target_shape[-1]].tolist()
)
ret_array.append(channel_array[:])
else:
if not drop_last and use_padding:
for channel in range(in_array_shape[0]):
cur_data = in_array[channel, i:].tolist()
cur_data_len = len(cur_data)
cur_data.extend([padding_val] * (target_shape[-1] - cur_data_len))
channel_array.append(cur_data[:])
ret_array.append(channel_array[:])
return np.array(ret_array)
型
但是,我不确定这种方法是否是最有效的版本,因为它包含了相当多的内存重新分配和复制。更好的方法是什么?
我正在考虑使用一个生成器来访问原始数据列表的切片部分,这样就不需要额外的分配/副本,但我也不确定这是否是正确的方法。这是一个可行的解决方案吗?或者有更好的方法吗?或者这个问题可以完全解决吗?
1条答案
按热度按时间r6vfmomb1#
如果我理解正确,
M
总是可以被K
整除,那么可以使用np.split
将in_data
数组拆分为具有M//K
列的部分,然后使用np.stack
合并它们。字符串
如果
M
不总是能被K
整除,那么看起来你的代码只是删除了等于M/K
余数的列数。型
而且,正如@hpaulj在评论中所暗示的那样,这也可以使用
reshape
s和transpose
s来完成。型
在速度方面,我使用N=20,M=10和K=2进行测试。
split
/stack
:28.9 µs ± 184 ns/环路(平均值±标准值运行7次,每次循环10,000次)reshape
/transpose
:2.37 µs ± 73.5 ns/环路(平均值±标准值运行7次,每次循环100,000次)