如何始终从Go通道获取最新值?

gev0vcfq  于 2022-12-20  发布在  Go
关注(0)|答案(7)|浏览(203)

我从Go语言开始,现在编写一个简单的程序,从传感器中读取数据,并将其输入通道进行计算,现在它的工作原理如下:

package main

import (
    "fmt"
    "time"
    "strconv"
)

func get_sensor_data(c chan float64) {
    time.Sleep(1 * time.Second)  // wait a second before sensor data starts pooring in
    c <- 2.1  // Sensor data starts being generated
    c <- 2.2
    c <- 2.3
    c <- 2.4
    c <- 2.5
}

func main() {

    s := 1.1

    c := make(chan float64)
    go get_sensor_data(c)

    for {
        select {
        case s = <-c:
            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
        default:
            // no new values in the channel
        }
        fmt.Println(s)

        time.Sleep(500 * time.Millisecond)  // Do heavy "work"
    }
}

这种方法工作正常,但传感器会产生大量数据,我总是只对最新的数据感兴趣。然而,在这种设置下,它只会在每个循环中读取下一项,这意味着如果通道在某个点包含20个值,则最新的值只会在10秒后读取。
有没有一种方法可以让通道一次只包含一个值,这样我就只得到我感兴趣的数据,而通道不会使用不必要的内存(尽管内存是我最不担心的)?

ljsrvy3e

ljsrvy3e1#

通道最好被认为是队列(FIFO)。因此你不能跳过它。但是有一些库可以做类似这样的事情:https://github.com/cloudfoundry/go-diodes是一个原子环形缓冲区,它将覆盖旧数据。如果愿意,您可以设置一个较小的大小。
尽管如此,听起来您并不需要队列(或环形缓冲区),您只需要一个互斥锁:

type SensorData struct{
  mu sync.RWMutex
  last float64
}

func (d *SensorData) Store(data float64) {
 mu.Lock()
 defer mu.Unlock()

 d.last = data
}

func (d *SensorData) Get() float64 {
 mu.RLock()
 defer mu.RUnlock()

 return d.last
}

这使用了一个RWMutex,这意味着许多东西可以同时从它读取,而只有一个东西可以写入。

2uluyalo

2uluyalo2#

不。通道是FIFO缓冲区,句号。这就是通道的工作方式,也是它们唯一的用途。如果你只想要最新的值,考虑只使用一个受互斥保护的变量;每当有新数据进入时就向其写入,每当读取它时,总是阅读最新的值。

4si2a6ki

4si2a6ki3#

通道有特定的用途。您可能希望使用锁内部的代码,并在设置新值时更新变量。
这样,接收方将始终获得最新的值。

js81xvg6

js81xvg64#

您无法直接从一个通道获得该值,但可以为每个值使用一个通道,并在出现新值时得到通知:

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

type LatestChannel struct {
    n    float64
    next chan struct{}
    mu   sync.Mutex
}

func New() *LatestChannel {
    return &LatestChannel{next: make(chan struct{})}
}

func (c *LatestChannel) Push(n float64) {
    c.mu.Lock()
    c.n = n
    old := c.next
    c.next = make(chan struct{})
    c.mu.Unlock()
    close(old)
}

func (c *LatestChannel) Get() (float64, <-chan struct{}) {
    c.mu.Lock()
    n := c.n
    next := c.next
    c.mu.Unlock()
    return n, next
}

func getSensorData(c *LatestChannel) {
    time.Sleep(1 * time.Second)
    c.Push(2.1)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.2)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.3)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.4)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.5)
}

func main() {
    s := 1.1

    c := New()
    _, hasNext := c.Get()
    go getSensorData(c)

    for {
        select {
        case <-hasNext:
            s, hasNext = c.Get()
            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
        default:
            // no new values in the channel
        }
        fmt.Println(s)

        time.Sleep(250 * time.Millisecond) // Do heavy "work"
    }
}

如果不需要通知新值,可以尝试读取Channels inside channels pattern in Golang

wswtfjt7

wswtfjt75#

试用此软件包https://github.com/subbuv26/chanup
它允许生产者用最新的值更新通道,最新的值替换最新的值。并且生产者不会被阻塞。(这样,过时的值会被覆盖)。因此,在消费者端,总是只有最新的项被读取。

import "github.com/subbuv26/chanup"
ch := chanup.GetChan()
_ := ch.Put(testType{
    a: 10,
    s: "Sample",
})
_ := ch.Update(testType{
    a: 20,
    s: "Sample2",
})
// Continue updating with latest values
...

...
// On consumer end
val := ch.Get()
// val contains latest value
t5zmwmid

t5zmwmid6#

解决这个问题还有另一种方法(窍门)

发送方工作速度更快:如果channel_length〉1,则发送方删除信道

go func() {
    for {
        msg:=strconv.Itoa(int(time.Now().Unix()))
        fmt.Println("make: ",msg," at:",time.Now())
        messages <- msg
        if len(messages)>1{
            //remove old message
            <-messages
        }
        time.Sleep(2*time.Second)
    }
}()

接收器工作较慢:

go func() {
    for {
        channLen :=len(messages)
        fmt.Println("len is ",channLen)
        fmt.Println("received",<-messages)
        time.Sleep(10*time.Second)
    }
}()

或者,我们可以从接收方删除旧消息(阅读消息,如删除它)

q43xntqr

q43xntqr7#

这里有一个优雅的通道解决方案,如果你不介意再增加一个通道和goroutine,你可以引入一个无缓冲通道和一个goroutine,它会尝试把最新的值从你的通道发送给它:

package main

import (
    "fmt"
    "time"
)

func wrapLatest(ch <-chan int) <-chan int {
    result := make(chan int) // important that this one i unbuffered
    go func() {
        defer close(result)
        value, ok := <-ch
        if !ok {
            return
        }

    LOOP:
        for {
            select {
            case value, ok = <-ch:
                if !ok {
                    return
                }
            default:
                break LOOP
            }
        }

        for {
            select {
            case value, ok = <-ch:
                if !ok {
                    return
                }
            case result <- value:
                if value, ok = <-ch; !ok {
                    return
                }
            }
        }
    }()

    return result
}

func main() {
    sendChan := make(chan int, 10) // may be buffered or not
    for i := 0; i < 10; i++ {
        sendChan <- i
    }
    go func() {
        for i := 10; i < 20; i++ {
            sendChan <- i
            time.Sleep(time.Second)
        }
        close(sendChan)
    }()
    recvChan := wrapLatest(sendChan)
    for i := range recvChan {
        fmt.Println(i)
        time.Sleep(time.Second * 2)
    }

}

相关问题