numpy Lru_cache(来自functools)是如何工作的?

l0oc07j2  于 2023-03-30  发布在  其他
关注(0)|答案(3)|浏览(178)

特别是当使用递归代码时,lru_cache有很大的改进。我知道缓存是一个存储数据的空间,这些数据必须被快速处理,并避免计算机重新计算。

functools中的Pythonlru_cache在内部是如何工作的?
我正在寻找一个具体的答案,它是否像Python的其他部分一样使用字典?它是否只存储return值?
我知道Python是建立在字典之上的,但是,我找不到这个问题的具体答案。

cnwbcb6i

cnwbcb6i1#

functools的源代码可以在这里找到:https://github.com/python/cpython/blob/master/Lib/functools.py
lru_cache使用_lru_cache_wrapper装饰器(带参数模式的python装饰器),它有一个cache字典 * 在上下文 * 中,它保存了所调用函数的返回值(每个装饰的函数都有自己的缓存字典)。字典键是由_make_key函数从参数中生成的。在下面添加了一些粗体注解:

# ACCORDING TO PASSED maxsize ARGUMENT _lru_cache_wrapper
# DEFINES AND RETURNS ONE OF wrapper DECORATORS

def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
    # Constants shared by all lru cache instances:
    sentinel = object()      # unique object used to signal cache misses

    cache = {}                                # RESULTS SAVES HERE
    cache_get = cache.get    # bound method to lookup a key or return None

    # ... maxsize is None:

    def wrapper(*args, **kwds):
        # Simple caching without ordering or size limit
        nonlocal hits, misses
        key = make_key(args, kwds, typed)     # BUILD A KEY FROM ARGUMENTS
        result = cache_get(key, sentinel)     # TRYING TO GET PREVIOUS CALLS RESULT
        if result is not sentinel:            # ALREADY CALLED WITH PASSED ARGS
            hits += 1
            return result                     # RETURN SAVED RESULT
                                              # WITHOUT ACTUALLY CALLING FUNCTION
        misses += 1
        result = user_function(*args, **kwds) # FUNCTION CALL - if cache[key] empty
        cache[key] = result                   # SAVE RESULT

        return result
    # ...

    return wrapper
slsn1g29

slsn1g292#

LRU缓存的Python 3.9源代码:https://github.com/python/cpython/blob/3.9/Lib/functools.py#L429
示例Fib代码

@lru_cache(maxsize=2)
def fib(n):
    if n == 0:
        return 0
    if n == 1:
        return 1
    return fib(n - 1) + fib(n - 2)

LRU缓存装饰器检查一些基本情况,然后用wrapper _lru_cache_wrapper Package 用户函数。在 Package 器内部,发生向该高速缓存添加项的逻辑,LRU逻辑,即向循环队列添加新项,从循环队列中删除项。

def lru_cache(maxsize=128, typed=False):
...
    if isinstance(maxsize, int):
        # Negative maxsize is treated as 0
        if maxsize < 0:
            maxsize = 0
    elif callable(maxsize) and isinstance(typed, bool):
        # The user_function was passed in directly via the maxsize argument
        user_function, maxsize = maxsize, 128
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)
    elif maxsize is not None:
        raise TypeError(
         'Expected first argument to be an integer, a callable, or None')

    def decorating_function(user_function):
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)

    return decorating_function

lru_cache规范化maxsize(when negative),添加CacheInfo细节,最后添加 Package 器并更新装饰器文档和其他细节。

lru_cache_wrapper

  • LRU Cache Package 器具有很少的簿记变量。
sentinel = object()          # unique object used to signal cache misses
 make_key = _make_key         # build a key from the function arguments
 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields

 cache = {}
 hits = misses = 0
 full = False
 cache_get = cache.get    # bound method to lookup a key or return None
 cache_len = cache.__len__  # get cache size without calling len()
 lock = RLock()           # because linkedlist updates aren't threadsafe
 root = []                # root of the circular doubly linked list
 root[:] = [root, root, None, None]     # initialize by pointing to self
  • Package 器在执行任何操作之前获取锁。
  • 几个重要的变量-根列表包含所有符合maxsize值的项。要记住的重要概念是根在上一个(0)和下一个位置(1)中自引用自身(root[:] = [root, root, None, None])

三项高级检查

  • 第一种情况是,当maxsize为0时,意味着没有缓存功能, Package 器 Package 没有缓存功能的用户函数。 Package 器增加缓存未命中计数并返回结果。
def wrapper(*args, **kwds):
     # No caching -- just a statistics update
     nonlocal misses
     misses += 1
     result = user_function(*args, **kwds)
     return result
  • 第二种情况。当maxsize为None时。在节中,该高速缓存中存储的元素数量没有限制。因此 Package 器检查缓存中的键(字典)。当键存在时, Package 器返回值并更新该高速缓存命中信息。并且当键缺失时, Package 器调用具有用户传递的参数的用户函数,更新缓存,更新该高速缓存未命中信息,并返回结果。
def wrapper(*args, **kwds):
     # Simple caching without ordering or size limit
     nonlocal hits, misses
     key = make_key(args, kwds, typed)
     result = cache_get(key, sentinel)
     if result is not sentinel:
         hits += 1
         return result
     misses += 1
     result = user_function(*args, **kwds)
     cache[key] = result
     return result
  • 第三种情况,当maxsize是默认值(128)或用户传递的整数值时。这里是实际的LRU缓存实现。整个代码在 Package 器中以线程安全的方式。在执行任何操作之前,从该高速缓存中读取/写入/删除, Package 器获得RLock。

LRU缓存

  • 该高速缓存中的值存储为四项列表(记住根)。第一项是对前一项的引用,第二项是对下一项的引用,第三项是特定函数调用的键,第四项是结果。这里是斐波那契函数参数1 [[[...], [...], 1, 1], [[...], [...], 1, 1], None, None]的实际值。[...]表示对self(列表)的引用。
  • 第一个检查是该高速缓存命中。如果是,缓存中的值是四个值的列表。
nonlocal root, hits, misses, full
 key = make_key(args, kwds, typed)
 with lock:
     link = cache_get(key)
      if link is not None:
          # Move the link to the front of the circular queue
          print(f'Cache hit for {key}, {root}')
          link_prev, link_next, _key, result = link
          link_prev[NEXT] = link_next
          link_next[PREV] = link_prev
          last = root[PREV]
          last[NEXT] = root[PREV] = link
          link[PREV] = last
          link[NEXT] = root
          hits += 1
          return result

当项目已经在该高速缓存中时,不需要检查循环队列是否已满或者从缓存中弹出项目,而是改变循环队列中项目的位置,由于最近使用的项目总是在最上面代码移动到队列顶部的最近值,并且前一个顶部项目成为当前项目last[NEXT] = root[PREV] = linklink[PREV] = lastlink[NEXT] = root的下一个. NEXT和PREV在顶部初始化,指向列表PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields中的适当位置。最后,递增该高速缓存命中信息并返回结果。

  • 缓存未命中时,更新未命中信息,代码检查三种情况,三种操作都发生在获取RLock之后,源代码中的三种情况顺序如下--获取锁密钥后在该高速缓存中找到,缓存已满,缓存可以取新项,为了演示,我们按照顺序,缓存未满,缓存已满,并且在获取锁之后,密钥在该高速缓存中可用。

该高速缓存未满时

...
    else:
        # Put result in a new link at the front of the queue.
        last = root[PREV]
        link = [last, root, key, result]
        last[NEXT] = root[PREV] = cache[key] = link
        # Use the cache_len bound method instead of the len() function
        # which could potentially be wrapped in an lru_cache itself.
        full = (cache_len() >= maxsize)
  • 当该高速缓存未满时,准备最近的result(link = [last, root, key, result])以包含根的先前引用、根、键和计算结果。
  • 然后将最近的结果(链接)指向循环队列的顶部(root[PREV] = link),root的上一个项的下一个指向最近的结果(last[NEXT]=link),并将最近的结果添加该高速缓存(cache[key] = link)。
  • 最后,检查该高速缓存是否已满(cache_len() >= maxsize and cache_len = cache.__len__ is declared in the top),并将状态设置为已满。
  • 对于fib示例,当函数接收到第一个值1时,根为空,根值为[[...], [...], None, None],将结果添加到循环队列后,根值为[[[...], [...], 1, 1], [[...], [...], 1, 1], None, None]。上一个和下一个都指向键1的结果。对于下一个值0,插入后根值为

[[[[...], [...], 1, 1], [...], 0, 0], [[...], [[...], [...], 0, 0], 1, 1], None, None],上一个为[[[[...], [...], None, None], [...], 1, 1], [[...], [[...], [...], 1, 1], None, None], 0, 0],下一个为[[[[...], [...], 0, 0], [...], None, None], [[...], [[...], [...], None, None], 0, 0], 1, 1]

该高速缓存满时

...
    elif full:
        # Use the old root to store the new key and result.
        oldroot = root
        oldroot[KEY] = key
        oldroot[RESULT] = result
        # Empty the oldest link and make it the new root.
        # Keep a reference to the old key and old result to
        # prevent their ref counts from going to zero during the
        # update. That will prevent potentially arbitrary object
        # clean-up code (i.e. __del__) from running while we're
        # still adjusting the links.
        root = oldroot[NEXT]
        oldkey = root[KEY]
        oldresult = root[RESULT]
        root[KEY] = root[RESULT] = None
        # Now update the cache dictionary.
        del cache[oldkey]
        # Save the potentially reentrant cache[key] assignment
        # for last, after the root and links have been put in
        # a consistent state.
        cache[key] = oldroot
  • 当该高速缓存已满时,使用根作为oldroot(oldroot=root)并更新键和结果。
  • 然后将oldroot的下一项作为新的根(root=oldroot[NEXT]),复制新的根键和结果(oldkey = root[KEY] and oldresult = root[RESULT])。
  • 将新的根键和结果设置为None(root[KEY] = root[RESULT] = None)。
  • 从该高速缓存(del cache[oldkey])中删除旧键的项,并将计算结果添加到缓存(cache[key] = oldroot)中。
  • 对于斐波那契的例子,当该高速缓存已满,键为2时,根值为[[[[...], [...], 1, 1], [...], 0, 0], [[...], [[...], [...], 0, 0], 1, 1], None, None],块末尾的新根为[[[[...], [...], 0, 0], [...], 2, 1], [[...], [[...], [...], 2, 1], 0, 0], None, None]。正如您所看到的,键1被删除并替换为键2

获取锁后,key出现在该高速缓存中。

if key in cache:
        # Getting here means that this same key was added to the
        # cache while the lock was released.  Since the link
        # update is already done, we need only return the
        # computed result and update the count of misses.
        pass

当键出现在该高速缓存中时,在获得锁之后,另一个线程可能已经将该值入队。所以没有什么要做的, Package 器返回结果。
最后,代码返回结果。在执行该高速缓存未命中部分之前,代码更新缓存未命中信息并调用make_key函数。
注:我不能让嵌套列表缩进工作,因此答案可能看起来有点少的格式。

ngynwnxp

ngynwnxp3#

你可以在这里查看源代码。
本质上,它使用两种数据结构,一个字典将函数参数Map到其结果,以及一个链表来跟踪函数调用历史。
该高速缓存基本上是使用以下代码实现的,这是非常不言自明的。

cache = {}
cache_get = cache.get
....
make_key = _make_key         # build a key from the function arguments
key = make_key(args, kwds, typed)
result = cache_get(key, sentinel)

更新链表的要点是:

elif full:

    oldroot = root
    oldroot[KEY] = key
    oldroot[RESULT] = result

    # update the linked list to pop out the least recent function call information        
    root = oldroot[NEXT]
    oldkey = root[KEY]
    oldresult = root[RESULT]
    root[KEY] = root[RESULT] = None
    ......

相关问题