Go语言 如何组织并行处理,排除具有相同标记的任务?

dkqlctbz  于 9个月前  发布在  Go
关注(0)|答案(1)|浏览(60)

我有一个可以并行处理的任务流。但是,具有相同标记的任务必须以FIFO顺序处理。

type task struct {
  tag string
  data []int
}

func process (ch chan task) {
  for t := range ch {
    go processTask (t)
  }
}

字符串
我需要添加逻辑来防止并行处理具有相同标记的任务。这些任务必须以FIFO顺序处理。
在99%的情况下,标签是唯一的。在极少数情况下,几个任务可能具有相同的标签。如果我们已经处理了一个标签为X的任务,并且出现了一个标签为X的新任务,我们必须等到处理完前一个任务,然后再开始下一个任务。
如果有多个带有标签X的任务正在等待,则必须按照发送到通道的相同顺序处理这些任务。
不应阻止处理带有其他标记的任务。
推荐的方法是什么?

wsewodh2

wsewodh21#

如果你想只在进程中按标签执行FIFO队列,你可以为每个标签分配一个通道和一个工作器。你可以在Map中跟踪每个标签的通道。
这里有一个实现(https://go.dev/play/p/8r8-oaRaY2x),它非常适合你,你的实际通道消息可以包括一个结果通道。

package main

import (
    "sync"
    "log"
)

type Route[T any] struct {
    Input  chan []T
    Output chan T
    Worker func(<-chan T, *sync.WaitGroup)
}

func (r *Route[T]) Accept(wg *sync.WaitGroup) {
    defer wg.Done()
    for inputs := range r.Input {
        for _, input := range inputs {
            r.Output <- input
        }
    }
    close(r.Output)
}

func (cr *Route[T]) enqueue(messages []T, wg *sync.WaitGroup) {
    defer wg.Done()
    cr.Input <- messages
}

func (r *Route[T]) Start(wg *sync.WaitGroup) *Route[T] {
    wg.Add(2)
    go r.Accept(wg)
    go r.Worker(r.Output, wg)
    return r
}

func (r *Route[T]) Close() {
    close(r.Input)
}

type Router[T any] struct {
    sync.Mutex
    routes map[string]*Route[T]
    wg     sync.WaitGroup
    routers_wg sync.WaitGroup
    routemaker func(tag string)*Route[T]
}

func (cr *Router[_]) Close() {
    for _, r := range cr.routes {
        r.Close()
    }
}

func (cr *Router[_]) Wait() {
    cr.routers_wg.Wait()
    cr.wg.Wait()
}

func (cr *Router[T]) Route(tag string, messages []T, wg *sync.WaitGroup) {
    cr.Lock()
    defer cr.Unlock()
    for m, r := range cr.routes {
        if m == tag {
            go r.enqueue(messages, wg)
            return
        }
    }
    new_route := cr.routemaker(tag).Start(&cr.routers_wg)
    cr.routes[tag] = new_route
    go new_route.enqueue(messages, wg)
}

type worker[T any] interface {
    Work(<-chan T)
}

type collateWork[T any] struct {
    work []T
    tag string
}

func (cw *collateWork[T]) Work(c <-chan T, wg *sync.WaitGroup) {
    defer wg.Done()
    for w := range c {
        cw.work = append(cw.work, w)
    }
    log.Printf("Worker '%s' collation: %v", cw.tag, cw.work)
}

func main() {

    var routes = Router[int]{
        routes: make(map[string]*Route[int]),
        routemaker: func(tag string) *Route[int] {
            return &Route[int]{
                Input:  make(chan []int, 30),
                Output: make(chan int, 30),
                Worker: (&collateWork[int]{tag: tag}).Work,
            }
        },
    }
    var router_waits sync.WaitGroup
    router_waits.Add(6)
    go routes.Route("a", []int{1, 2, 3, 4, 5, 6, 7}, &router_waits)
    go routes.Route("b", []int{10, 20, 30, 40, 50, 60, 70}, &router_waits)
    go routes.Route("a", []int{8, 9, 10, 11, 12, 13, 14, 15}, &router_waits)
    go routes.Route("b", []int{100, 200, 300, 400, 500, 600, 700}, &router_waits)
    go routes.Route("c", []int{1000, 2000, 3000, 4000, 5000, 6000, 7000}, &router_waits)
    go routes.Route("c", []int{8000, 9000 }, &router_waits)
    router_waits.Wait()
    routes.Close()
    routes.Wait()
}

个字符

相关问题