我想用Go得到应用程序生产者-消费者(通过信号关闭)。
生产者在队列中不断生成消息,限制为10条。一些消费者阅读并处理该频道。如果队列中的消息数为0,则生产者再次生成10条消息。当接收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。
我发现了一个代码,但不能理解它是否正常工作,因为发现奇怪的事情:
1.为什么在停止程序后,队列中的所有消息都没有被处理,似乎是部分数据丢失了。(截图中,发送了15条消息,但处理了5条)
1.如何正确地将队列限制在10条消息的上限,即必须写入10条消息,等待队列计数器变为0时进行处理,然后再写入10条消息?
1.是否有可能在停止信号后通知生产者,以便他不再向通道生成新消息?(截图中,制作人成功写入队列-12,13,14,15)
结果:
代码示例:
package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
const nConsumers = 2
in := make(chan int, 10)
p := Producer{&in}
c := Consumer{&in, make(chan int, nConsumers)}
go p.Produce()
ctx, cancelFunc := context.WithCancel(context.Background())
go c.Consume(ctx)
wg := &sync.WaitGroup{}
wg.Add(nConsumers)
for i := 1; i <= nConsumers; i++ {
go c.Work(wg, i)
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
cancelFunc()
wg.Wait()
}
type Consumer struct {
in *chan int
jobs chan int
}
func (c Consumer) Work(wg *sync.WaitGroup, i int) {
defer wg.Done()
for job := range c.jobs {
fmt.Printf("Worker #%d start job %d\n", i, job)
time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
fmt.Printf("Worker #%d finish job %d\n", i, job)
}
fmt.Printf("Worker #%d interrupted\n", i)
}
func (c Consumer) Consume(ctx context.Context) {
for {
select {
case job := <-*c.in:
c.jobs <- job
case <-ctx.Done():
close(c.jobs)
fmt.Println("Consumer close channel")
return
}
}
}
type Producer struct {
in *chan int
}
func (p Producer) Produce() {
task := 1
for {
*p.in <- task
fmt.Printf("Send value %d\n", task)
task++
time.Sleep(time.Millisecond * 500)
}
}
1条答案
按热度按时间hgb9j2n61#
为什么在停止程序后,队列中的所有消息都没有被处理,似乎是部分数据丢失了。
这是因为当
ctx
完成时,(Consumer).Consume
停止从in
通道阅读,但go p.Produce()
创建的goroutine仍然写入in
通道。下面的演示解决了这个问题并简化了源代码。
备注:
Produce
在ctx
完成时停止。它关闭了in
通道。1.字段
jobs
从Consumer
中删除,工作线程直接从in
通道读取。1.下面的要求被忽略了,因为它很奇怪。一个常见的行为是,当一个作业产生,而
in
通道未满时,该作业将立即发送到in
通道;当它已满时,发送操作将阻塞,直到从in
通道读取作业。如果队列中的消息数为0,则生产者再次生成10条消息