单生产者多消费者

当场景为单生产者多消费者的时候,生产者执行结束就可以直接关闭通道,此时消费者自动从通道中拿数据,直到通道为空就退出,不会形成阻塞。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    // 定义共享数据的通道
    itemCh = make(chan int, 20)
    cwg    sync.WaitGroup
)

func producter(ch chan int) {
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second) // 模拟真实场景的处理时间
        ch <- i
        fmt.Printf("set num %d to ch\n", i)
    }
    close(ch)
}

func consumer(ch chan int, name string) {
    defer cwg.Done()
    for n := range ch {
        time.Sleep(time.Second * 2) // 模拟真实场景的处理时间
        fmt.Printf("%s:get num %d from ch\n", name, n)
    }
}

func main() {
    go producter(itemCh)

    // 启动多个消费者
    for i := 0; i < 5; i++ {
        go consumer(itemCh, fmt.Sprintf("c-%d", i))
        cwg.Add(1)
    }

    cwg.Wait()

    fmt.Println("main done!")
}

多生产者多消费者

下面是一个多生产者和多消费者的场景例子,生产者数量和消费者数量都是不定的,此时需要考虑何时关闭通道,比较好的时机是利用计数器,当生产者的计数器清理则表示所有生产者都执行结束,此时就可以安全的关闭通道。这个例子可以作为多线程爬虫的标准写法参考:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    // 定义共享数据的通道
    itemCh = make(chan int, 20)
    // 分别定义生产者和消费者的计数器
    pwg sync.WaitGroup
    cwg sync.WaitGroup
)

func producter(num int, ch chan int, name string) {
    defer pwg.Done()
    time.Sleep(time.Second) // 模拟真实场景的处理时间
    ch <- num
    fmt.Printf("%s:set num %d to ch\n", name, num)
}

func consumer(ch chan int, name string) {
    defer cwg.Done()
    for n := range ch {
        time.Sleep(time.Second * 2) // 模拟真实场景的处理时间
        fmt.Printf("%s:get num %d from ch\n", name, n)
    }
}

func main() {
    // 启动多个生产者
    for i := 0; i < 10; i++ {
        go producter(i, itemCh, fmt.Sprintf("p-%d", i))
        pwg.Add(1)
    }

    // 启动多个消费者
    for i := 0; i < 5; i++ {
        go consumer(itemCh, fmt.Sprintf("c-%d", i))
        cwg.Add(1)
    }

    // 等待生产者执行结束前必须让消费者和生产者都运行起来
    pwg.Wait()
    close(itemCh) // 等所有生产者执行结束就关闭通道

    cwg.Wait()

    fmt.Println("main done!")
}

参考文档