通道上Golang的select的Python等效项

ubbxdtey  于 2022-12-07  发布在  Go
关注(0)|答案(8)|浏览(156)

Go语言有一个适用于通道的select语句。
select语句允许goroutine等待多个通信操作。
一个select阻塞直到它的一个case可以运行,然后它执行那个case。如果有多个case准备好了,它随机选择一个。
下面的代码是否有Python等价物:

package main

import "fmt"

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            c1 <- i
        }
        quit <- 0
    }()

    go func() {
        for i := 0; i < 2; i++ {
            c2 <- i
        }
    }()

    for {
        select {
        case <-c1:
            fmt.Println("Received value from c1")
        case <-c2:
            fmt.Println("Received value from c2")
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

此程序的输出:

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit
zsohkypk

zsohkypk1#

这是一个相当直接的翻译,但是“如果多个就绪,选择哪个”部分的工作方式不同--它只是取第一个进来的。

import threading
import Queue

def main():
    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = Queue.Queue(maxsize=0)

    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))

    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

基本的变化是用合并消息的线程来模拟select。如果您要经常使用这种模式,可以编写一些select代码:

import threading
import Queue

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = threading.Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

但是...
请注意,这个select并不是go,虽然它对你的程序来说并不重要--一个goroutine可能会在一个通道上发送一个结果,如果我们不总是遍历select直到完成,这个结果会在select中排队并丢失!

h5qlskok

h5qlskok2#

还可以考虑一下Benoit Chesneau的offset library,它是Go语言并发模型向Python的移植,使用了隐藏的纤程。
他在PyCon APAC 2013上就此发表了演讲:

irlmq6kh

irlmq6kh3#

您可以使用multiprocessing.Pipe代替chan,使用threading.Thread代替go,使用select.select代替select
下面是一个用Python重新实现的例子:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread

def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print 'Received value from c1'
        elif which == c2_r:
            c2_r.recv()
            print 'Received value from c2'
        elif which == quit_r and len(ready) == 1:
            quit_r.recv()
            print 'Received value from quit'
            return

if __name__ == '__main__':
    main()

该实现基于@托马斯的实现,但与@Thomas的实现不同,它不会产生额外的线程来执行选择。
在Linux上使用Python 2.7.13进行测试。Windows可能会有不同的行为,因为选择是一个Unixy的事情。
编辑:我添加了len(ready) == 1条件,这样quit只在其他管道被清空后才被处理。这在Go语言中是不需要的,因为通道的大小为零,所以func1不能向quit_w发送消息,直到发送给c1_w的消息被接收 * 之后 *。感谢@Sean佩里的评论。

vatpfxk5

vatpfxk54#

在Python 3.5中,有关键字asyncawait,这使得函数可以在执行中被挂起,从而能够在evenloop上运行而不是在线程上运行。asyncio标准库提供了一个。
为了更直接地MapGo语言阻塞通道和select的行为,你可以使用this small library,这样你的示例代码在Python中看起来就非常相似了。

eimct9ow

eimct9ow5#

是的,goless可以实现所有功能。您可以尝试一下。
玩得开心;-)
以下是一个示例:

c1 = goless.chan()
c2 = goless.chan()

def func1():
    time.sleep(1)
    c1.send('one')
goless.go(func1)

def func2():
    time.sleep(2)
    c2.send('two')
goless.go(func2)

for i in range(2):
    case, val = goless.select([goless.rcase(c1), goless.rcase(c2)])
    print(val)
kiz8lqtg

kiz8lqtg6#

下面是另一个例子,它试图模仿go语法:

from threading import Thread
from Queue import Queue

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    Thread(target=lambda: [c1.put(i) for i in range(10)] or quit.put(0)).start()
    Thread(target=lambda: [c2.put(i) for i in range(2)]).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

main()
xxls0lw8

xxls0lw87#

完整性:Go风格通道(包括工作选择)作为pygolang的一部分提供:

ch1 = chan()    # synchronous channel
ch2 = chan(3)   # channel with buffer of size 3

def _():
    ch1.send('a')
    ch2.send('b')
go(_)

ch1.recv()      # will give 'a'
ch2.recv_()     # will give ('b', True)

_, _rx = select(
    ch1.recv,           # 0
    ch2.recv_,          # 1
    (ch2.send, obj2),   # 2
    default,            # 3
)
if _ == 0:
    # _rx is what was received from ch1
    ...
if _ == 1:
    # _rx is (rx, ok) of what was received from ch2
    ...
if _ == 2:
    # we know obj2 was sent to ch2
    ...
if _ == 3:
    # default case
    ...

offset(参见https://stackoverflow.com/a/19143696/9456786)似乎也很有趣。
不幸是,goless(参见https://stackoverflow.com/a/39269599/9456786)具有弱选择实现,这通过设计does not work properly on synchronous channels来实现。

jgzswidk

jgzswidk8#

这里有几个使用queue.Queuethreading.Thread来模拟选择行为的答案,但这不是必需的。

import queue
import os
import select

class EQueue(queue.Queue):
    def __init__(self, *args, **kwargs)
        self._fd = os.eventfd(flags=0x00004001)
        super().__init__(*args, **kwargs)

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        eventfd_write(self._fd, 1)

    def get(self, *args, **kwargs):
        os.eventfd_read(self._fd)
        super().get(*args, **kwargs)

    def fileno(self):
        return self._fd

    def __del__(self):
        os.close(self._fd)

这就在队列周围添加了一个额外的信号量,更重要的是,这个信号量可以通过文件描述符来访问,这意味着你现在可以用select.select()来等待这个队列,所以上面使用队列和线程的例子可以在没有额外线程的情况下重写:

def main():

    c1 = EQueue(maxsize=0)
    c2 = EQueue(maxsize=0)
    quit = EQueue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    rx, _, _ = select.select([c1, c2, quit], [], []):
        if c1 in rx:
            msg = c1.get()
            print 'Received value from c1'
        elif c2 in rx:
            msg = c2.get()
            print 'Received value from c2'
        elif quit in rx:
            print 'Received value from quit'
            return
main()

这里的main函数非常类似于上面的@alkasm给出的函数,但是没有select的自定义实现,也没有每个队列的线程来将所有单独的队列收集到一个队列中;它依赖于操作系统来告诉您队列何时有可用项。
注意os.eventfd只在Python 3.10中被添加,但是在ctypes中实现它是相当简单的,或者在PyPI上有eventfd包。后者也支持Windows,不像其他选项,用管道模拟eventfds。Python文档声称eventfds只在运行glibc〉= 2.8的Linux系统上可用,但是muslc也支持它们。

相关问题