编译在GPU上运行的pytorch函数

7lrncoxx  于 2023-11-19  发布在  其他
关注(0)|答案(1)|浏览(109)

**目标:**我有一个循环调用的函数,输入一个1DTensor,一个2DTensor。我在这个函数中使用torch.linalg.solve()。我想并行化循环以优化运行时。
**设置:**我有三个主要Tensor:

  1. input_tensor:尺寸50x100x100
  2. host_tensor:尺寸100x100x100
  3. A:尺寸50x100(设计矩阵)
    input_tensor有100x100 input_vector,所有长度都是50。它们也都有不同数量的NaN被屏蔽,因此input_vector被屏蔽的长度小于或等于50**。请注意,设计矩阵A也将被屏蔽,大小为(屏蔽x 100)。
    因为每个input_vectorA具有不同的掩码长度,所以需要逐点运行该函数。

**问题:**有没有什么方法可以让下面的代码更快?如何处理设计矩阵Ainput_vector在每次迭代时的大小不同?
**重要提示:**NaN不能被0替换,因为这会使线性求解过程失败。作为背景,我问了一个关于类似过程here的问题。
产品代码:

import torch
from tqdm import tqdm
import numpy as np
from datetime import datetime

# Create "device" so we can migrate the tensors to GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Set the seed for reproducibility
torch.manual_seed(42) 

# Set shapes to generate tensors
B, C = 500, 500
M, N = 100, 50

# Generate tensors
input_tensor = torch.randn(N, B, C)
host_tensor = torch.randn(M, B, C)
A = torch.randn(N, M)

# --- Here we input random NaNs in the input_tensor to simulate missing data --- #
# Define the probability of inserting NaN at each element
probability = 0.2  # You can adjust this as needed

# Generate random indices based on the probability
shape = input_tensor.shape
random_indices = torch.rand(shape) < probability

# Replace the selected indices with NaN values
input_tensor[random_indices] = float('nan')

# --- Migrate matrices to GPU --- #
A = A.to(device)
input_tensor = input_tensor.to(device)
host_tensor = host_tensor.to(device)
A = A.to(device)

t_start = datetime.now()
# --- Function that creates a vector size M from input_vector (size N) and A
def solver(input_vector, A):

    # We create a mask to reduce the row size of A: rows where input_vector is NaN are not considered in the solver
    mask = ~torch.isnan(input_vector)

    # Mask the vector
    input_vector_masked = input_vector[mask]

    # Mask the array
    A_masked = A[mask]
    A_trans = A_masked.T

    # Solve the linear system of equation: A.TA = A.Tvec_Obs
    return torch.linalg.solve(A_trans@A_masked, A_trans@input_vector_masked)

# --- Iterate through each vector of the input_tensor --- #

# Define the total number of iterations
total_iterations = B*C
# Create a tqdm progress bar
progress_bar = tqdm(total=total_iterations, dynamic_ncols=False, mininterval=1.0)

# Iterate through every cell of input_array
for i in range(host_tensor.shape[1]):
    for j in range(host_tensor.shape[2]):
        host_tensor[:,i,j] = solver(input_tensor[:,i,j], A)
        progress_bar.update(1)  # Update the progress bar
t_stop = datetime.now()

print(f"Inversion took {(t_stop - t_start).total_seconds():.2f}s")

字符串

to94eoyn

to94eoyn1#

我得到了一个有点不满意的答案。但是让我们一步一步来。

清零nan s ==删除nan s

首先,你可以把nan s替换为零。举个例子:假设你有一个向量v和一个矩阵A,给定为

v = [v1 v2 v3]  # N elements
A = [[a11 a12 a13]  # NxM elements
     [a21 a22 a23]
     [a31 a32 a33]]

字符串
现在,假设v2 = nan,因此需要抑制。
您当前在solver()中所做的是将v的非nan元素作为m,将A的相应行作为M,然后计算A_for_solving = M.T @ MB_for_solving = M.T @ v,即

m = [v1 v3]  # Masked v (n < N elements)
M = [[a11 a12 a13]  # Masked A (nxM elements)
     [a31 a32 a33]]
A_for_solving = M.T @ M  # MxM elements
B_for_solving = M.T @ m  # M elements
result = linalg.solve(A_for_solving, B_for_solving)


你应该注意两件事:

  1. A_for_solvingB_for_solving的形状始终保持不变,无论v中有多少元素(以及A中的行)被删除:A_for_solving始终是M×M矩阵,B_for_solving始终是M元素向量。这暗示了我们实际上仍然可以并行计算的可能性。
    1.更重要的是,如果你将v中的nan s和A中相应的行替换为零,你将在A_for_solvingB_for_solving中产生完全相同的值!
    换句话说,你可以做以下事情:
z = [v1 0 v3]  # Zeroed v
Z = [[a11 a12 a13]  # Zeroed A
     [  0   0   0]
     [a31 a32 a33]]
A_for_solving = Z.T @ Z
B_for_solving = Z.T @ z
result = linalg.solve(A_for_solving, B_for_solving)


.你会得到与以前完全相同的linalg.solve()输入!
你可以很容易地用你的当前代码来检查它,方法是扩展它以进行测试,如下所示:

def solver(input_vector, A):

    mask = ~torch.isnan(input_vector)
    input_vector_masked = input_vector[mask]

    A_masked = A[mask]
    A_trans = A_masked.T
    
    # Start sanity check: nan-zeroing is the same as nan-dropping
    A_zeroed = A.clone(); A_zeroed[~mask] = 0
    input_vector_zeroed = input_vector.clone(); input_vector_zeroed[~mask] = 0
    assert torch.allclose(A_masked.T @ A_masked,
                          A_zeroed.T @ A_zeroed, atol=1e-5)
    assert torch.allclose(A_masked.T @ input_vector_masked,
                          A_zeroed.T @ input_vector_zeroed, atol=1e-5)
    # End sanity check
    
    return torch.linalg.solve(A_trans@A_masked, A_trans@input_vector_masked)

批量计算

如果我们使用归零方法,我们可以再次并行化代码,因为我们现在再次为每个掩码提供相同大小的输入。相应的函数可以如下所示:

def solver_batch(inpt, a):
    inpt = inpt.permute(1, 2, 0).unsqueeze(-1)  # BxCxNx1
    mask = torch.isnan(inpt)  # CAUTION: True for NaNs, unlike `mask` in the question!
    a_zeroed = a.repeat(*inpt.shape[:2], 1, 1)  # BxCxNxM
    a_zeroed[mask.expand(-1, -1, -1, a.shape[-1])] = 0
    at_a = a_zeroed.transpose(-2, -1) @ a_zeroed  # BxCxMxM
    inpt_zeroed = inpt.clone()
    inpt_zeroed[mask] = 0
    at_input = a_zeroed.transpose(-2, -1) @ inpt_zeroed  # BxCxMx1
    result = torch.linalg.solve(at_a, at_input)
    return result.squeeze(-1).permute(2, 0, 1)  # MxBxC

注意事项

批处理解决方案与answer that I posted to your previous question非常相似。但有两个警告:

注意事项1:内存使用

由于我们需要一个不同的矩阵A,因此现在每个输入向量都需要A.T @ A,在给定的示例中,我们最终得到一个大小为500×500×100 × 100的Tensorat_a。这是 * 巨大 *(在本例中是25亿个元素的Tensor)。在我的情况下,它不适合GPU,所以我必须做的是分块处理输入Tensor:

chunk_size = 50  # TODO: adjust chunk size for your hardware
for lo in range(0, input_tensor.shape[-1], chunk_size):
    chunk_result = solver_batch(input_tensor[..., lo:lo+chunk_size], A)
    host_tensor[..., lo:lo+chunk_size] = chunk_result


这仍然比逐个处理输入元素要快得多。

警告2:数值不稳定

我试着用下面的for循环来检查结果,类似于我之前的答案中的健全性检查:

for i in range(host_tensor.shape[1]):
    for j in range(host_tensor.shape[2]):
        input_vec = input_tensor[..., i, j]
        res_vec = host_tensor[..., i, j]
        mask = ~torch.isnan(input_vec)
        M = A[mask]
        assert torch.allclose((M.T @ M) @ res_vec, M.T @ input_vec[mask], atol=1e-3)


我们在这里检查的是A @ X = B应该对X = solve(A, B)成立。然而,这似乎不是给定数据的情况,* 无论是对我还是对你的方法 *。我不知道这是一个数值不稳定的问题(我的数学技能不足)还是我犯了一些愚蠢的错误。

相关问题