numpy 在Python中连接和打乱字符串列表中的条目

iih3973s  于 11个月前  发布在  Python
关注(0)|答案(2)|浏览(120)

假设我有一个字符串列表(或NumPy数组),例如,X =“abc”,“def”,“ghi”,“jkl”],一个索引列表,例如,J = [0,3],以及一个随机排列p,例如,[0,3,2,5,1,4]。
我想找到一种时间效率最高的方法来连接X中对应于索引J的字符串,即,对J中的每个j连接X[j]以得到'abcjkl',并将置换p应用于此字符串,即'ajclbk'。将置换p应用于字符串Y会导致另一个字符串Z,使得Z[i] = Y[p[i]]。

现在,我有X,J和p作为NumPy数组,并且

X = numpy.array(['abc', 'def', 'ghi', 'jkl'])
J = numpy.array([0, 3])
p = numpy.array([0, 3, 2, 5, 1, 4])
Y = ''.join(X[J])
Z = numpy.array(list(Y))
res = ''.join(Z[p])

字符串
有没有更快的方法来实现这一点?* 如果存在列表的解决方案,我不必使用NumPy数组 *。在我的应用程序中,列表X可以有1000万个字符串(使用英语字母表形成),每个字符串的长度为1000,索引J可以为500万,排列p可以为50亿。

pcww981p

pcww981p1#

TL;DR

在假设原始输入中的字符串长度相等的情况下,您可以解析索引串联,结合np.ndarray.view(),可以实现非常快速的仅NumPy实现:

import numpy as np

def take2s_reidx_np(arr, idx_a, idx_b):
    k = len(arr[0])
    q, r = np.divmod(idx_b, k)
    return arr.view(("U", 1))[r + k * idx_a[q]].view(("U", len(idx_b)))[0]

字符串
这可以用Numba重新实现,以最大限度地减少临时内存使用:

import numpy as np
import numba as nb

@nb.njit(fastmath=True)
def apply_cat_idx_nb(arr, idx_a, idx_b, k):
    n = len(idx_b)
    result = np.empty(n, dtype=arr.dtype)
    for i in range(n):
        q, r = divmod(idx_b[i], k)
        result[i] = arr[r + k * idx_a[q]]
    return result

def take2s_at_reidx_nb(arr, idx_a, idx_b):
    # use `np.int32` as a proxy of `("U", 1)` because of Numba support
    return apply_cat_idx_nb(arr.view(np.int32), idx_a, idx_b, len(arr[0])).view(("U", len(idx_b)))[0]


目前还不清楚内部使用np.uint8是否会为更优化的方法带来任何速度优势。
而如果输入使用bytes而不是str,则此操作通常比TODO快(快得多?

更长的答案

实际上,在速度方面,NumPy数组应该是解决这个问题的最方便的容器,特别是对于非常大的输入。
然而,不需要来回地处理字符串(或字节):你可以只在数组本身上应用索引,或者用np.ndarray.view()来适当地查看它,这是一个非常便宜的操作。同样的操作也可以用来再次查看str的数据。
假设OP的代码可以由以下函数总结:

import numpy as np

def take2s_OP(arr, idx_a, idx_b):
    tmp_str = "".join(arr[idx_a])
    tmp_arr = np.array(list(tmp_str))
    return "".join(tmp_arr[idx_b])


基于np.ndarray.view()的解决方案如下所示:

def take2s_np(arr, idx_a, idx_b):
    return arr[idx_a].view(("U", 1))[idx_b].view(("U", len(idx_b)))[0]


上面的代码确实具有处理Unicode字符串(而不仅仅是ASCII)的优点,并且不需要事先进行输入操作。
现在,仍然有一个相对昂贵的(O(n))计算:高级索引,在上面的代码中执行了两次。
通过使用字符串具有相同长度k的事实,我们可以将单个索引idx_ab写为两个idx_a的组合,后跟idx_b

r, q = divmod(idx_b, k)
idx_ab = r + k * idx_a[q]
arr[idx_ab] == arr[idx_a][idx_b]


使用NumPy,它看起来像这样:

def take2s_reidx_np(arr, idx_a, idx_b):
    k = len(arr[0])
    q, r = np.divmod(idx_b, k)
    return arr.view(("U", 1))[r + k * idx_a[q]].view(("U", len(idx_b)))[0]


然而,上面仍然创建了一个大的临时数组来存储只使用一次的索引。在旅途中生成和访问索引会更有效,因此不需要大的临时对象。为此,可以编写一些显式循环来使用Numba加速:

import numpy as np
import numba as nb

@nb.njit(fastmath=True)
def apply_cat_idx_nb(arr, idx_a, idx_b, k):
    n = len(idx_b)
    result = np.empty(n, dtype=arr.dtype)
    for i in range(n):
        q, r = divmod(idx_b[i], k)
        result[i] = arr[r + k * idx_a[q]]
    return result

def take2s_at_reidx_nb(arr, idx_a, idx_b):
    # use `np.int32` as a proxy of `("U", 1)` because of Numba support
    return apply_cat_idx_nb(arr.view(np.int32), idx_a, idx_b, len(arr[0])).view(("U", len(idx_b)))[0]


请注意,原始数组最初被视为np.int32(与Unicode数据类型大小匹配),因为Numba仍然不支持创建Unicode数组。
由于高级索引是NumPy中的一个高度优化的功能,因此仍然使用它和临时数组来存储索引可能会更有效,但是使用Numba加速代码来创建它们:

@nb.njit(fastmath=True)
def cat_idx_nb(idx_a, idx_b, k):
    n = len(idx_b)
    result = np.empty(n, dtype=idx_b.dtype)
    for i in range(n):
        q, r = divmod(idx_b[i], k)
        result[i] = r + k * idx_a[q]
    return result

def take2s_reidx_nb(arr, idx_a, idx_b):
    idx_ab = cat_idx_nb(idx_a, idx_b, len(arr[0]))
    return arr.view(("U", 1))[idx_ab].view(("U", len(idx_b)))[0]


最终,使用bytes而不是str会更快(但仅限于ASCII字符)。
从NumPy字符数组到bytes,可以简单地使用np.ndarray.tobytes()方法。
上文提出的办法如下:

def take2b_OP(arr, idx_a, idx_b):
    tmp_bstr = b"".join(arr[idx_a])
    tmp_arr = np.array(list(tmp_bstr), dtype=np.uint8)
    return b"".join(tmp_arr[idx_b])

def take2b_np(arr, idx_a, idx_b):
    return arr[idx_a].view(np.uint8)[idx_b].tobytes()

def take2b_reidx_np(arr, idx_a, idx_b):
    k = len(arr[0])
    q, r = np.divmod(idx_b, k)
    return arr.view(np.uint8)[r + k * idx_a[q]].tobytes()

def take2b_reidx_nb(arr, idx_a, idx_b):
    idx_ab = cat_idx_nb(idx_a, idx_b, len(arr[0]))
    return arr.view(np.uint8)[idx_ab].tobytes()

def take2b_at_reidx_nb(arr, idx_a, idx_b):
    return apply_cat_idx_nb(arr.view(np.uint8), idx_a, idx_b, len(arr[0])).tobytes()


如果你的字符串只包含ASCII字符,你也可以将输入NumPy数组中大小为N的Unicode数据视为np.uint8,而不是大小为1的Unicode数据。必须注意确保np.uint8项(1字节)与Unicode项(4字节)对齐,所以需要每4个字符读取一次,但这是一个简单的切片:

def take2sbs_np(arr, idx_a, idx_b):
    return arr[idx_a].view(np.uint8)[::4][idx_b].tobytes().decode()


在假设只有ASCII字符串的情况下,可以重写这些字符串以在内部使用np.uint8,只要考虑到NumPy中Unicode数据类型与np.uint8相比的填充不匹配,(Unicode数据类型使用每个字符4个字节),并使用不同的方法将数据转换回字符串(本质上是将np.ndarray.tobytes()str.decode()方法结合起来,以返回str),例如:

def take2s_reidx_u8_np(arr, idx_a, idx_b):
    k = len(arr[0])
    q, r = np.divmod(idx_b, k)
    return arr.view(np.uint8)[::4][r + k * idx_a[q]].tobytes().decode()

def take2s_reidx_u8_nb(arr, idx_a, idx_b):
    idx_ab = cat_idx_nb(idx_a, idx_b, len(arr[0]))
    return arr.view(np.uint8)[::4][idx_ab].tobytes().decode()


目前还不清楚在内部使用np.uint8是否会为更优化的方法带来任何速度优势。
为了完整起见,我还包括两个基于@KarlKnechtel方法的解决方案(一个用于str,一个用于bytes):

def take2s_2d(arr, idx_a, idx_b):
    arr_2d = np.array([list(x.encode()) for x in arr], dtype=np.uint8)
    return bytes(arr_2d[idx_a, :].ravel()[idx_b]).decode()
def take2b_2d(arr, idx_a, idx_b):
    arr_2d = np.array([list(x) for x in arr], dtype=np.uint8)
    return bytes(arr_2d[idx_a, :].ravel()[idx_b])

其具有与take2b_np()类似的精神,但确实需要一些相对较重的输入操作。
为了检查这些方法是否都是等效的,这里报告应用于OP的输入:

s_funcs = take2s_OP, take2s_np, take2s_2d, take2s_reidx_np, take2s_reidx_nb, take2s_at_reidx_nb, take2s_reidx_u8_np, take2s_reidx_u8_nb
b_funcs = take2b_OP, take2b_np, take2b_2d, take2b_reidx_np, take2b_reidx_nb, take2b_at_reidx_nb

X = ["abc", "def", "ghi", "jkl"]
Xs = np.array(X)
Xb = np.array([x.encode() for x in X])
J = np.array([0, 3])
p = np.array([0, 3, 2, 5, 1, 4])

for func in s_funcs:
    print(f"{func.__name__:>20s}  {func(Xs, J, p)}")

for func in b_funcs:
    print(f"{func.__name__:>20s}  {func(Xb, J, p)}")

要查看哪种方法在大输入时更快,可以使用用途:

import string

M = 1000  # string length
n = 500_000
p = n // 2
q = n // 2

idx_a = np.random.randint(0, n, p)
idx_b = np.random.randint(0, M * p, q)
arr_s = np.random.choice(list(string.ascii_letters), (o * M)).view(("U", M))
base = take2s_np2(arr_s, idx_a, idx_b)
print(f"`str` funcs, n={len(arr_s)}")
for func in s_funcs:
    res = func(arr_s, idx_a, idx_b)
    print(f"{func.__name__:>20s}  {base == res}  ", end="")
    %timeit func(arr_s, idx_a, idx_b)
# `str` funcs, n=500000
#            take2s_OP  True  38.1 s ± 989 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#            take2s_np  True  591 ms ± 40.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#            take2s_2d  True  31.7 s ± 1.49 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
#      take2s_reidx_np  True  15.9 ms ± 702 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#      take2s_reidx_nb  True  12.4 ms ± 580 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#   take2s_at_reidx_nb  True  18.4 ms ± 3.4 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
#   take2s_reidx_u8_np  True  14.5 ms ± 666 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#   take2s_reidx_u8_nb  True  11.8 ms ± 1.27 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
arr_b = np.array([x.encode() for x in arr_s])
base = take2b_np2(arr_b, idx_a, idx_b)
print(f"`bytes` funcs, n={len(arr_b)}")
for func in b_funcs:
    res = func(arr_b, idx_a, idx_b)
    print(f"{func.__name__:>20s}  {base == res}  ", end="")
    %timeit func(arr_b, idx_a, idx_b)
# `bytes` funcs, n=500000
#            take2b_OP  True  14.4 s ± 1.34 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
#            take2b_np  True  150 ms ± 9.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
#            take2b_2d  True  30.7 s ± 1.22 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
#      take2b_reidx_np  True  13 ms ± 722 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#      take2b_reidx_nb  True  9.13 ms ± 686 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#   take2b_at_reidx_nb  True  12.8 ms ± 1.12 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

请注意,我使用N = 500_000,因为较大的值会耗尽系统内存。

更多基准测试

要了解不同的方法如何处理不同的输入大小,可以使用以下函数,其中narr的大小,pidx_a的大小,qidx_b的大小:

import string
import pandas as pd
import matplotlib.pyplot as plt

def benchmark(
    funcs,
    ii=range(2, 15, 1),
    m=16,
    is_equal=lambda a, b: all(x == y for x, y in zip(a, b)),
    seed=0,
    unit="ms",
    verbose=True,
    use_str=True
):
    labels = [func.__name__ for func in funcs]
    units = {"s": 0, "ms": 3, "µs": 6, "ns": 9}
    assert unit in units
    np.random.seed(seed)

    M = 1000  # string length
    timings = {}
    for i in ii:
        n = 2 ** i
        p = n // 2
        q = n // 2
        if verbose:
            print(f"i={i}, n={n}, m={m}, k={k}  (p={p}, q={q})")
        alph = string.ascii_letters
        if use_str:
            arr = np.random.choice(list(string.ascii_letters), (n * M)).view(("U", M))
        else:
            arr = np.random.choice(list(string.ascii_letters.encode()), (n * M)).astype(np.uint8).view(("S", M))
        idx_a = np.random.randint(0, n, p)
        idx_b = np.random.randint(0, M * p, q)
        base = funcs[0](arr, idx_a, idx_b)
        timings[n] = []
        for func in funcs:
            res = func(arr, idx_a, idx_b)
            is_good = is_equal(base, res)
            timed = %timeit -n 16 -r 4 -q -o [func(arr, idx_a, idx_b) for arr in arrs]
            timing = timed.best / k
            timings[n].append(timing if is_good else None)
            if verbose:
                print(
                    f"{func.__name__:>24}"
                    f"  {is_good!s:5}"
                    f"  {timing * (10 ** units[unit]):10.3f} {unit}"
                    f"  {timings[n][0] / timing:6.1f}x")
    return timings, labels

def plot(timings, labels, title=None, xlabel="Input Size / #", unit="ms"):
    n_rows = 1
    n_cols = 3
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(8 * n_cols, 6 * n_rows), squeeze=False)
    units = {"s": 0, "ms": 3, "µs": 6, "ns": 9}
    df = pd.DataFrame(data=timings, index=labels).transpose()
    
    base = df[[labels[0]]].to_numpy()
    (df * 10 ** units[unit]).plot(marker="o", xlabel=xlabel, ylabel=f"Best timing / {unit}", ax=axs[0, 0])
    (df / base * 100).plot(marker='o', xlabel=xlabel, ylabel='Relative speed / %', logx=True, ax=axs[0, 1])
    (base / df).plot(marker='o', xlabel=xlabel, ylabel='Speed Gain / x', ax=axs[0, 2])

    if title:
        fig.suptitle(title)
    fig.patch.set_facecolor('white')

被称为:

s_timings, s_labels = benchmark(s_funcs, unit="ms", use_str=True, verbose=True)
plot(s_timings, s_labels, "Benchmark with `str`", unit="ms")

b_timings, b_labels = benchmark(b_funcs, unit="ms", use_str=False, verbose=True)
plot(b_timings, b_labels, "Benchmark with `bytes`", unit="ms")

得到以下图:
100d 1xx 1c 1d 1x的字符串
指出:

  • 基于重新索引的方法导致 * 快得多 * 的执行速度(对于~2000个字符的适度输入大小,~ 4000倍或更高)
  • take2*_np()方法比OP快得多,但比重新索引慢得多
  • 与单独执行两者相比,将重新索引与高级索引一起执行似乎通常是相同的
  • 不清楚将np.uint8用于str输入是否有益。
  • 使用bytes似乎通常更快
  • take2s_2d()似乎比take2s_OP()稍微快一些
  • take2b_2d()似乎比take2b_OP()慢得多
  • take2b_2d()take2s_2d()看起来基本上是一样的。

这意味着在take2*_2d()中对输入的“准备”给整体方法增加了显著的损失,否则它应该与take2b_np()具有类似的性能。

ymzxtsji

ymzxtsji2#

是的,字符串的长度相同(大约为1000)。您可以假设它们仅限于英文字母
在这些条件下,我希望Numpy能提供相当大的优势(我还假设X可以被预处理,并将以不同的Jp值重用。
X表示为一个2D字节数组(实际上是8位数值,而不是Python的bytes类型):

X = np.array([list(b'abc'), list(b'def'), list(b'ghi'), list(b'jkl')], dtype=np.uint8)

字符串
通过适当的切片然后使用ravel进行连接:

J = (0, 3)
Y = X[J,:].ravel()


像以前一样置换;然后不用join来连接字节(bytes类型有一个等价的类型,但它是多余的),直接将数组传递给bytes构造函数:

p = (0, 3, 2, 5, 1, 4)
Z = bytes(Y[p,]) # b'ajclbk'


如果有必要,我们可以将其.decode到字符串的末尾,就像我们在创建Xencode一样。

相关问题