Why does this Python threading code have a race condition?

snz8szmq, posted 2023-06-28 in Python

The following code produces a race condition:

import threading

ITERS = 100000
x = [0]

def worker():
    for _ in range(ITERS):
        x[0] += 1  # this line creates a race condition
        # because it reads a value, increments it and then writes it back,
        # two increments can overlap, and one of them is lost

def main():
    x[0] = 0  # you may use `global x` instead of this list trick too
    t1 = threading.Thread(target=worker)
    t2 = threading.Thread(target=worker)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

for i in range(5):
    main()
    print(f'iteration {i}. expected x = {ITERS*2}, got {x[0]}')

Output:

$ python3 test.py
iteration 0. expected x = 200000, got 200000
iteration 1. expected x = 200000, got 148115
iteration 2. expected x = 200000, got 155071
iteration 3. expected x = 200000, got 200000
iteration 4. expected x = 200000, got 200000

Python 3 version:

Python 3.9.7 (default, Sep 10 2021, 14:59:43) 
[GCC 11.2.0] on linux

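I thought the GIL would prevent this by not letting two threads run at the same time until they do something I/O-related or call into a C library. At least, that is what you might conclude from the documentation.
It turns out I was wrong. So what does the GIL actually do, and when do threads run in parallel?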

Answer #1 (4c8rllxm)

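After reading the documentation more carefully, I think the answer is there:
"The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
However, some extension modules, either standard or third-party, are designed so as to release the GIL when doing computationally intensive tasks such as compression or hashing. Also, the GIL is always released when doing I/O."
I guess this means that each line of source code is made up of several bytecode instructions. The individual bytecode instructions are atomic, i.e. each one executes on its own without interruption, but a source line as a whole is not.
Here is the bytecode that x[0] += 1 expands to (run dis.dis('x[0] += 1') to see it):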

          0 LOAD_NAME                0 (x)
          2 LOAD_CONST               0 (0)
          4 DUP_TOP_TWO
          6 BINARY_SUBSCR
          8 LOAD_CONST               1 (1)
         10 INPLACE_ADD
         12 ROT_THREE
         14 STORE_SUBSCR
         16 LOAD_CONST               2 (None)
         18 RETURN_VALUE

When two threads execute these instructions concurrently and their steps interleave, a race condition occurs.
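To make the interleaving concrete, here is a minimal sketch that replays one bad ordering by hand. It uses no real threads; the names `lost_update_demo`, `a_loaded` and `b_loaded` are made up for illustration, and the steps only roughly mirror the load / add / store bytecodes listed above.

```python
# Deterministic replay of one bad interleaving of `x[0] += 1` in two threads.
# No real threads are used; each step stands in for one group of bytecodes.

def lost_update_demo():
    x = [0]

    # Thread A: BINARY_SUBSCR loads the current value
    a_loaded = x[0]          # A sees 0
    # Thread B is scheduled before A stores, so it loads the same value
    b_loaded = x[0]          # B also sees 0

    # Each thread performs INPLACE_ADD on its own private copy
    a_result = a_loaded + 1  # 1
    b_result = b_loaded + 1  # 1

    # STORE_SUBSCR: both write back, and B overwrites A's result
    x[0] = a_result
    x[0] = b_result
    return x[0]


print(lost_update_demo())  # two increments ran, but the counter is only 1
```

Every time the scheduler switches threads between the load and the store, one increment is lost in this way, which would explain totals below 200000 in the question's output.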
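So the GIL does not save you. It only prevents the race conditions that could corrupt the internals of complex structures such as list or dict.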
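Since the GIL does not protect the read-modify-write sequence, the usual remedy is an explicit lock around it. The following is a sketch rather than part of the original answer: `run_with_lock` is a hypothetical helper that repeats the question's experiment with a `threading.Lock` guarding the increment.

```python
import threading

ITERS = 100000


def run_with_lock(n_threads=2, iters=ITERS):
    """Increment a shared counter from several threads, guarded by a lock."""
    x = [0]
    lock = threading.Lock()

    def worker():
        for _ in range(iters):
            # Only one thread at a time may run the load/add/store sequence
            with lock:
                x[0] += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return x[0]


print(f'expected x = {ITERS * 2}, got {run_with_lock()}')
```

With the lock in place, no thread can read the counter while another is between its read and its write, so the total should always match the expected value. The price is that the increments are fully serialized.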

Answer #2 (hs1rzwqc)

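Judging by our latest comments, this particular problem appears to be gone in Python 3.10 and later (Ubuntu, Windows): the lost updates no longer reproduce there.
However, there are other scenarios in which a race condition can still be observed. For example: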

import threading
import time
 
x = 10
 
def increment(by):
    global x
 
    local_counter = x
    local_counter += by
 
    time.sleep(1)
 
    x = local_counter
    print(f'{threading.current_thread().name} inc x {by}, x: {x}')
 
def main():
    # creating threads
    t1 = threading.Thread(target=increment, args=(5,))
    t2 = threading.Thread(target=increment, args=(10,))
   
    # starting the threads
    t1.start()
    t2.start()
   
    # waiting for the threads to complete
    t1.join()
    t2.join()
   
    print(f'The final value of x is {x}')
 
for i in range(10):
    main()

It produced this:

Thread-56 (increment) inc x 10, x: 20Thread-55 (increment) inc x 5, x: 15
 
The final value of x is 15
Thread-57 (increment) inc x 5, x: 20Thread-58 (increment) inc x 10, x: 25
 
The final value of x is 25
Thread-60 (increment) inc x 10, x: 35Thread-59 (increment) inc x 5, x: 30
 
The final value of x is 30
Thread-61 (increment) inc x 5, x: 35
Thread-62 (increment) inc x 10, x: 40
The final value of x is 40
Thread-64 (increment) inc x 10, x: 50Thread-63 (increment) inc x 5, x: 45
 
The final value of x is 45

One way to work around this is to use the asyncio module to control the flow of the code.
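The answer does not show its asyncio code, so the following is only one possible reading of that suggestion. It assumes the two increments become coroutines, that `asyncio.sleep` stands in for `time.sleep` (shortened to 0.01 s here), and that an `asyncio.Lock` keeps the read, the pause and the write together. The `state` dictionary and the `run` helper are made up for this sketch.

```python
import asyncio


async def increment(state, lock, by):
    # The lock keeps read -> pause -> write together, so the other
    # coroutine cannot read a stale value during the await below
    async with lock:
        local_counter = state["x"]
        local_counter += by
        await asyncio.sleep(0.01)  # stands in for time.sleep(1)
        state["x"] = local_counter


async def run():
    state = {"x": 10}
    lock = asyncio.Lock()
    # Schedule both increments concurrently on one event loop
    await asyncio.gather(
        increment(state, lock, 5),
        increment(state, lock, 10),
    )
    return state["x"]


print(f'The final value of x is {asyncio.run(run())}')  # 10 + 5 + 10 = 25
```

Note that asyncio alone does not remove the race: coroutines switch at every `await`, so without the lock both would read 10 before either writes. If the code stays on threads, a `threading.Lock` held around the same three steps serves the same purpose.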
