Golang递归爬虫导致阻塞状态

atmip9wb  于 11个月前  发布在  Go
关注(0)|答案(2)|浏览(106)

我正在尝试用Go语言做一个爬虫,它可以产生X个goroutine。目前我只产生一个goroutine(worker =1),我使用一个通道来发送/阅读它。期望的结果是产生一个具体数量的worker,它们从chan读取作业,每个作业可以在不阻塞goroutine的情况下将更多的作业推送到chan。
但是由于某种原因,goroutine被阻塞了,不能处理递归链接。原因是什么?我应该如何处理这个问题?

const maxDepth = 1

type job struct {
    URL   string
    Depth int
}

type Crawler struct {
    jobs             chan job
    workers          int
    db               db.DB
    timeout          int
    imagesFolderName string
    visited          *sync.Map
    wg               *sync.WaitGroup
}

func NewCrawler(db db.DB, workers int, timeout int, imagesFolderName string) *Crawler {
    return &Crawler{
        jobs:             make(chan job),
        workers:          workers,
        db:               db,
        timeout:          timeout,
        imagesFolderName: imagesFolderName,
        visited:          &sync.Map{},
        wg:               &sync.WaitGroup{},
    }
}

func (c *Crawler) enqueue(j job) {
    c.wg.Add(1)
    c.jobs <- j
}

func (c *Crawler) Start(startingLinks []string) {
    if err := os.MkdirAll("apps/imagecrawler/"+c.imagesFolderName, os.ModePerm); err != nil {
        fmt.Printf("Error creating image directory: %v\n", err)
        os.Exit(1)
    }

    for i := 0; i < c.workers; i++ {
        go func() {
            for j := range c.jobs {
                c.runJob(j)
                c.wg.Done()
            }
        }()
    }

    for _, link := range startingLinks {
        c.enqueue(job{URL: link, Depth: 0})
    }
    c.wg.Wait()
    close(c.jobs)
}

func (c *Crawler) runJob(j job) {
    _, alreadyVisited := c.visited.LoadOrStore(j.URL, true)
    if alreadyVisited {
        return
    }

    pageCtx, pageCancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(c.timeout))
    fmt.Println("Crawling URL", j.URL, "Depth:", j.Depth)
    resp, err := http.Get(j.URL)
    if err != nil {
        fmt.Println("Fetching error:", err)
        resp.Body.Close()
        pageCancel()
        return
    }
    b, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("Reading body: ", err)
        resp.Body.Close()
        pageCancel()
        return
    }
    resp.Body.Close()

    if j.Depth >= maxDepth {
        pageCancel()
        return
    }

    links, err := extracter.ExtractLinks(pageCtx, b, j.URL)
    if err != nil {
        fmt.Println("ExtractLinks Err", err)
        pageCancel()
        return
    }

    for _, link := range links {
        c.enqueue(job{URL: link, Depth: j.Depth + 1})
    }

    pageCancel()
}

字符串

lf3rwulv

lf3rwulv1#

解决方案实际上是将递归链接排队在一个单独的goroutine中,这部分地解决了问题。但是,这会创建/运行更多的goroutine。我希望在整个抓取过程中只创建X并运行goroutine。有其他解决方案吗?我可以在不产生额外goroutine的情况下实现相同的行为吗?

func (c *Crawler) runJob(j job) {
    // ...Same stuff

    for _, link := range links {
        c.wg.Add(1)
        --> go func(link string) {
            c.jobs <- job{URL: link, Depth: j.Depth + 1}
        }(link)
    }
    pageCancel()
}

字符串

fzsnzjdm

fzsnzjdm2#

首先,应该为你的Goroutine生成wait group,在你的例子中,就是你正在生成的那个。其次,Goroutine不像worker那样工作,它们是你在后台产生的线程。如果你想建立某种工作机制,你需要运行一个管理器,就像一个永远的循环,它总是在通道上监听,并为每个作业运行Enqueue,部署一个Goroutine。如果你增加Goroutine的数量,它将改变一次可能需要的大量作业的大小。
你的enqueue函数看起来像这样:

func Enqueue(ctx context.Context, job Job) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := processJob(job, ctx); err != nil {
            fmt.Println(fmt.Sprintf("job err, %v \n",err.Error()))
        }
    }()
}

字符串

相关问题