Go语言 如何限制生产者和阅读消费者的信息?

vwhgwdsa  于 2023-05-27  发布在  Go
关注(0)|答案(1)|浏览(181)

我想用Go得到应用程序生产者-消费者(通过信号关闭)。
生产者在队列中不断生成消息,限制为10条。一些消费者阅读并处理该频道。如果队列中的消息数为0,则生产者再次生成10条消息。当接收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。
我发现了一个代码,但不能理解它是否正常工作,因为发现奇怪的事情:
1.为什么在停止程序后,队列中的所有消息都没有被处理,似乎是部分数据丢失了。(截图中,发送了15条消息,但处理了5条)
1.如何正确地将队列限制在10条消息的上限,即必须写入10条消息,等待队列计数器变为0时进行处理,然后再写入10条消息?
1.是否有可能在停止信号后通知生产者,以便他不再向通道生成新消息?(截图中,制作人成功写入队列-12,13,14,15)
结果:

代码示例:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}
hgb9j2n6

hgb9j2n61#

为什么在停止程序后,队列中的所有消息都没有被处理,似乎是部分数据丢失了。
这是因为当ctx完成时,(Consumer).Consume停止从in通道阅读,但go p.Produce()创建的goroutine仍然写入in通道。
下面的演示解决了这个问题并简化了源代码。

备注

  1. Producectx完成时停止。它关闭了in通道。
    1.字段jobsConsumer中删除,工作线程直接从in通道读取。
    1.下面的要求被忽略了,因为它很奇怪。一个常见的行为是,当一个作业产生,而in通道未满时,该作业将立即发送到in通道;当它已满时,发送操作将阻塞,直到从in通道读取作业。
    如果队列中的消息数为0,则生产者再次生成10条消息
package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

相关问题