在Go语言中使用通道接收响应并同时写入SQL

2fjabf4q  于 2022-12-07  发布在  Go
关注(0)|答案(1)|浏览(105)

我正在使用Go语言实现一个来自外部API的JSON数据管道,处理消息,然后发送到SQL数据库。
我尝试并发运行API请求,然后在返回一个响应后,我想通过另一个goroutine load()把它插入到数据库中。
在我下面的代码中,有时我会在load()函数中收到log.Printf(),有时我不会。这表明我可能关闭了一个通道或没有正确设置通信。
我尝试的模式如下:

package main

import (
    "encoding/json"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

type Request struct {
    url string
}

type Response struct {
    status  int
    args    Args    `json:"args"`
    headers Headers `json:"headers"`
    origin  string  `json:"origin"`
    url     string  `json:"url"`
}

type Args struct {
}

type Headers struct {
    accept string `json:"Accept"`
}

func main() {
    start := time.Now()

    numRequests := 5
    responses := make(chan Response, 5)
    defer close(responses)
    for i := 0; i < numRequests; i++ {
        req := Request{url: "https://httpbin.org/get"}
        go func(req *Request) {
            resp, err := extract(req)
            if err != nil {
                log.Fatal("Error extracting data from API")
                return
            }
            // Send response to channel
            responses <- resp
        }(&req)

        // Perform go routine to load data
        go load(responses)
    }

    log.Println("Execution time: ", time.Since(start))
}

func extract(req *Request) (r Response, err error) {
    var resp Response
    request, err := http.NewRequest("GET", req.url, nil)
    if err != nil {
        return resp, err
    }
    request.Header = http.Header{
        "accept": {"application/json"},
    }

    response, err := http.DefaultClient.Do(request)
    defer response.Body.Close()

    if err != nil {
        log.Fatal("Error")
        return resp, err
    }
    // Read response data
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        log.Fatal("Error")
        return resp, err
    }
    json.Unmarshal(body, &resp)
    resp.status = response.StatusCode

    return resp, nil
}

type Record struct {
    origin string
    url    string
}

func load(ch chan Response) {

    // Read response from channel
    resp := <-ch

    // Process the response data
    records := process(resp)
    log.Printf("%+v\n", records)

    // Load data to db stuff here

}

func process(resp Response) (record Record) {
    // Process the response struct as needed to get a record of data to insert to DB
    return record
}
ep6jt1vc

ep6jt1vc1#

在工作完成之前,程序没有完成保护机制,所以有时候程序会在goroutine完成之前就终止。
要防止出现这种情况,请使用WaitGroup:

wg:=sync.WaitGroup{}
   for i := 0; i < numRequests; i++ {
     ...
     wg.Add(1)
     go func() {
        defer wg.Done()
        load(responses)
     }()
   }
  wg.Wait()

相关问题