Skip to content
<

Golang 并发编程

Goroutine(协程)

Goroutine 是 Go 语言的轻量级线程,由 Go 运行时管理。

创建 Goroutine

go
package main

import (
    "fmt"
    "time"
)

func sayHello() {
    for i := 0; i < 5; i++ {
        fmt.Println("Hello", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func sayWorld() {
    for i := 0; i < 5; i++ {
        fmt.Println("World", i)
        time.Sleep(150 * time.Millisecond)
    }
}

func goroutineBasicDemo() {
    // 启动 goroutine
    go sayHello()
    go sayWorld()

    // 主 goroutine 等待
    time.Sleep(1 * time.Second)
    fmt.Println("主程序结束")
}

// 匿名函数 goroutine
func anonymousGoroutine() {
    go func() {
        fmt.Println("匿名 goroutine 执行")
    }()

    // 带参数的匿名 goroutine
    go func(msg string) {
        fmt.Println("消息:", msg)
    }("Hello from goroutine")

    time.Sleep(100 * time.Millisecond)
}

Channel(通道)

Channel 是 goroutine 之间通信的管道。

无缓冲 Channel

go
func unbufferedChannelDemo() {
    // 创建无缓冲 channel
    ch := make(chan int)

    // 发送数据(在 goroutine 中)
    go func() {
        fmt.Println("发送数据: 42")
        ch <- 42 // 发送数据到 channel
    }()

    // 接收数据
    value := <-ch // 从 channel 接收数据
    fmt.Println("接收数据:", value)
}

func channelSyncDemo() {
    done := make(chan bool)

    go func() {
        fmt.Println("执行任务...")
        time.Sleep(500 * time.Millisecond)
        fmt.Println("任务完成")
        done <- true
    }()

    fmt.Println("等待任务完成...")
    <-done // 阻塞,直到接收到数据
    fmt.Println("主程序继续")
}

有缓冲 Channel

go
func bufferedChannelDemo() {
    // 创建容量为 3 的缓冲 channel
    ch := make(chan string, 3)

    // 发送数据(不会阻塞,直到缓冲区满)
    ch <- "消息1"
    ch <- "消息2"
    ch <- "消息3"

    fmt.Println("已发送 3 条消息")

    // 接收数据
    fmt.Println("接收:", <-ch)
    fmt.Println("接收:", <-ch)
    fmt.Println("接收:", <-ch)
}

func producerConsumerDemo() {
    ch := make(chan int, 5)

    // 生产者
    go func() {
        for i := 1; i <= 10; i++ {
            fmt.Printf("生产: %d\n", i)
            ch <- i
            time.Sleep(100 * time.Millisecond)
        }
        close(ch) // 关闭 channel
    }()

    // 消费者
    time.Sleep(200 * time.Millisecond)
    for num := range ch {
        fmt.Printf("消费: %d\n", num)
        time.Sleep(200 * time.Millisecond)
    }
}

Channel 方向

go
// 只发送 channel
func sendOnly(ch chan<- int) {
    ch <- 42
}

// 只接收 channel
func receiveOnly(ch <-chan int) {
    value := <-ch
    fmt.Println("接收:", value)
}

func channelDirectionDemo() {
    ch := make(chan int)

    go sendOnly(ch)
    receiveOnly(ch)
}

关闭 Channel

go
func closeChannelDemo() {
    ch := make(chan int, 3)

    // 发送数据
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // 关闭 channel

    // 接收数据
    for value := range ch {
        fmt.Println("接收:", value)
    }

    // 检查 channel 是否关闭
    value, ok := <-ch
    if ok {
        fmt.Println("接收到:", value)
    } else {
        fmt.Println("Channel 已关闭")
    }
}

Select 语句

select 用于处理多个 channel 操作。

go
func selectDemo() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "来自 ch1"
    }()

    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "来自 ch2"
    }()

    // select 等待多个 channel
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("接收:", msg1)
        case msg2 := <-ch2:
            fmt.Println("接收:", msg2)
        }
    }
}

// 带 default 的 select(非阻塞)
func selectDefaultDemo() {
    ch := make(chan int)

    select {
    case value := <-ch:
        fmt.Println("接收:", value)
    default:
        fmt.Println("没有数据可接收")
    }
}

// 超时处理
func selectTimeoutDemo() {
    ch := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        ch <- "数据"
    }()

    select {
    case msg := <-ch:
        fmt.Println("接收:", msg)
    case <-time.After(1 * time.Second):
        fmt.Println("超时!")
    }
}

WaitGroup

用于等待一组 goroutine 完成。

go
import "sync"

func waitGroupDemo() {
    var wg sync.WaitGroup

    // 启动 5 个 goroutine
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加计数器

        go func(id int) {
            defer wg.Done() // 完成时减少计数器

            fmt.Printf("Worker %d 开始工作\n", id)
            time.Sleep(time.Duration(id*100) * time.Millisecond)
            fmt.Printf("Worker %d 完成工作\n", id)
        }(i)
    }

    fmt.Println("等待所有 worker 完成...")
    wg.Wait() // 阻塞,直到计数器为 0
    fmt.Println("所有 worker 已完成")
}

Mutex(互斥锁)

用于保护共享资源。

go
type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func mutexDemo() {
    counter := &Counter{}
    var wg sync.WaitGroup

    // 启动 1000 个 goroutine 同时增加计数器
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Printf("最终计数: %d\n", counter.Value())
}

RWMutex(读写锁)

允许多个读操作同时进行,但写操作是互斥的。

go
type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]int),
    }
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    value, ok := sm.data[key]
    return value, ok
}

func rwMutexDemo() {
    safeMap := NewSafeMap()
    var wg sync.WaitGroup

    // 写入数据
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", n)
            safeMap.Set(key, n*10)
            fmt.Printf("写入: %s = %d\n", key, n*10)
        }(i)
    }

    // 读取数据
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", n%10)
            if value, ok := safeMap.Get(key); ok {
                fmt.Printf("读取: %s = %d\n", key, value)
            }
        }(i)
    }

    wg.Wait()
}

Once

确保某个操作只执行一次。

go
var once sync.Once

func initialize() {
    fmt.Println("初始化操作(只执行一次)")
}

func onceDemo() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 调用 initialize\n", id)
            once.Do(initialize)
        }(i)
    }

    wg.Wait()
}

工作池模式

go
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, job)
        time.Sleep(time.Duration(job*100) * time.Millisecond)
        results <- job * 2
        fmt.Printf("Worker %d 完成任务 %d\n", id, job)
    }
}

func workerPoolDemo() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 启动 worker
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }

    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for a := 1; a <= numJobs; a++ {
        result := <-results
        fmt.Printf("结果: %d\n", result)
    }
}

Context(上下文)

用于控制 goroutine 的生命周期。

go
import "context"

func contextDemo() {
    // 带取消的 context
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Goroutine 收到取消信号")
                return
            default:
                fmt.Println("工作中...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }()

    time.Sleep(2 * time.Second)
    fmt.Println("发送取消信号")
    cancel()
    time.Sleep(1 * time.Second)
}

func contextTimeoutDemo() {
    // 带超时的 context
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    select {
    case <-time.After(3 * time.Second):
        fmt.Println("操作完成")
    case <-ctx.Done():
        fmt.Println("超时:", ctx.Err())
    }
}

完整示例:并发下载器

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type DownloadTask struct {
    ID   int
    URL  string
    Size int // MB
}

type Downloader struct {
    workers int
    tasks   chan DownloadTask
    results chan string
    wg      sync.WaitGroup
}

func NewDownloader(workers int) *Downloader {
    return &Downloader{
        workers: workers,
        tasks:   make(chan DownloadTask, 10),
        results: make(chan string, 10),
    }
}

func (d *Downloader) worker(id int) {
    defer d.wg.Done()

    for task := range d.tasks {
        fmt.Printf("[Worker %d] 开始下载: %s (大小: %dMB)\n", id, task.URL, task.Size)

        // 模拟下载时间
        downloadTime := time.Duration(task.Size*100) * time.Millisecond
        time.Sleep(downloadTime)

        result := fmt.Sprintf("任务 %d 完成: %s", task.ID, task.URL)
        d.results <- result

        fmt.Printf("[Worker %d] 完成下载: %s\n", id, task.URL)
    }
}

func (d *Downloader) Start() {
    // 启动 worker
    for i := 1; i <= d.workers; i++ {
        d.wg.Add(1)
        go d.worker(i)
    }
}

func (d *Downloader) AddTask(task DownloadTask) {
    d.tasks <- task
}

func (d *Downloader) Wait() {
    close(d.tasks)
    d.wg.Wait()
    close(d.results)
}

func (d *Downloader) GetResults() []string {
    var results []string
    for result := range d.results {
        results = append(results, result)
    }
    return results
}

func main() {
    fmt.Println("=== 并发下载器 ===")

    downloader := NewDownloader(3)
    downloader.Start()

    // 添加下载任务
    tasks := []DownloadTask{
        {ID: 1, URL: "https://example.com/file1.zip", Size: 5},
        {ID: 2, URL: "https://example.com/file2.zip", Size: 3},
        {ID: 3, URL: "https://example.com/file3.zip", Size: 7},
        {ID: 4, URL: "https://example.com/file4.zip", Size: 4},
        {ID: 5, URL: "https://example.com/file5.zip", Size: 6},
        {ID: 6, URL: "https://example.com/file6.zip", Size: 2},
    }

    for _, task := range tasks {
        downloader.AddTask(task)
    }

    // 等待所有任务完成
    downloader.Wait()

    // 获取结果
    results := downloader.GetResults()
    fmt.Println("\n=== 下载结果 ===")
    for _, result := range results {
        fmt.Println(result)
    }
}

并发模式总结

Fan-Out, Fan-In 模式

go
func fanOutFanIn() {
    // Fan-Out: 一个输入,多个输出
    input := make(chan int)
    outputs := make([]chan int, 3)

    for i := range outputs {
        outputs[i] = make(chan int)
        go func(out chan int) {
            for n := range input {
                out <- n * 2
            }
            close(out)
        }(outputs[i])
    }

    // Fan-In: 多个输入,一个输出
    output := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range outputs {
        wg.Add(1)
        go func(c chan int) {
            defer wg.Done()
            for n := range c {
                output <- n
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(output)
    }()

    // 发送数据
    go func() {
        for i := 1; i <= 5; i++ {
            input <- i
        }
        close(input)
    }()

    // 接收结果
    for result := range output {
        fmt.Println("结果:", result)
    }
}

小结

  • Goroutine:轻量级线程,使用 go 关键字启动
  • Channel:goroutine 间通信的管道
  • 无缓冲 vs 有缓冲:同步 vs 异步通信
  • Select:处理多个 channel 操作
  • WaitGroup:等待多个 goroutine 完成
  • Mutex:互斥锁,保护共享资源
  • RWMutex:读写锁,优化读多写少场景
  • Once:确保操作只执行一次
  • Context:控制 goroutine 生命周期
  • 工作池:限制并发数量,提高资源利用率
  • 并发模式:Fan-Out/Fan-In、Pipeline 等