python-3.x 使用多处理队列处理块中的元素

b09cbbtk  于 2023-04-13  发布在  Python
关注(0)|答案(2)|浏览(141)

bounty还有7天到期。回答此问题可获得+50声望奖励。user3541631希望引起更多关注此问题。

我有一个多重处理队列;通过使用SENTINEL值(一个字符串)来通知队列的结束。

aq =  Queue()

........................
队列中的示例属于A类:

class A:
 id: str
 desc: str

在一个函数中,我从队列aq中获取元素,并分块处理它们。第一个元素(如果只有一个)可以是SENTINEL,没有要处理的元素。

def process:
  chunk_data = []
  all = [
  item = aq.get()
  if not isinstance(item, A):
    return
  chunk_data.append(item.id)
  while item != SENTINEL:
   # start process in chunks
   # adding elements to the chunk list until is full
    while len(chunk_data) < CHUNK_MAX_SIZE: # 50
      item = aq.get()
      if item == SENTINEL:
        break
      chunk_data.append(item.id)
    # the chunk list is full start processing
    chunk_process_ids = process_data(chunk_data) # process chunks
    all.extend(chunk_process_ids)
    # empty chunk list and start again
    chunk_data.clear()

该函数按预期工作,但我认为代码是复杂的,我正在寻找一个简单,更清晰的版本。

42fyovps

42fyovps1#

我更倾向于将代码结构化如下:

def get_chunks():
    chunk_data = []
    while True:
        item = aq.get()
        if item == SENTINEL: # or: if not isinstance(item A):
            break
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    for chunk_data in get_chunks():
        all.extend(process_data(chunk_data))

但是对于“writer”来说,最好使用get_chunks,这样就可以将已经制作好的块写入队列。这将导致更少(但更大)的队列访问,这 * 通常 * 会更有效。

下面是一个例子,我假设所有的A示例都在一个列表list_of_a_instances中:

def get_chunks():
    chunk_data = []
    for item in list_of_a_instances: # a list of all the A instances, for example
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    while True:
        chunk_data = aq.get()
        if chunk_data == SENTINEL:
            break
        all.extend(process_data(chunk_data))

def writer():
    for chunk_data in get_chunks():
        aq.put(chunk_data)
    aq.put(SENTINEL)
bvuwiixz

bvuwiixz2#

为了遵循DRY principle,这里是我认为是相同逻辑的一个更干净的版本,没有代码重复。请注意,当输入值的类型不符合预期时,引发异常通常比简单地返回更好。

def process():
    all = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            assert isinstance(item, A)
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break

相关问题