Golang 并发编程
Goroutine(协程)
Goroutine 是 Go 语言的轻量级线程,由 Go 运行时管理。
创建 Goroutine
go
package main
import (
"fmt"
"time"
)
func sayHello() {
for i := 0; i < 5; i++ {
fmt.Println("Hello", i)
time.Sleep(100 * time.Millisecond)
}
}
func sayWorld() {
for i := 0; i < 5; i++ {
fmt.Println("World", i)
time.Sleep(150 * time.Millisecond)
}
}
func goroutineBasicDemo() {
// 启动 goroutine
go sayHello()
go sayWorld()
// 主 goroutine 等待
time.Sleep(1 * time.Second)
fmt.Println("主程序结束")
}
// 匿名函数 goroutine
func anonymousGoroutine() {
go func() {
fmt.Println("匿名 goroutine 执行")
}()
// 带参数的匿名 goroutine
go func(msg string) {
fmt.Println("消息:", msg)
}("Hello from goroutine")
time.Sleep(100 * time.Millisecond)
}Channel(通道)
Channel 是 goroutine 之间通信的管道。
无缓冲 Channel
go
func unbufferedChannelDemo() {
// 创建无缓冲 channel
ch := make(chan int)
// 发送数据(在 goroutine 中)
go func() {
fmt.Println("发送数据: 42")
ch <- 42 // 发送数据到 channel
}()
// 接收数据
value := <-ch // 从 channel 接收数据
fmt.Println("接收数据:", value)
}
func channelSyncDemo() {
done := make(chan bool)
go func() {
fmt.Println("执行任务...")
time.Sleep(500 * time.Millisecond)
fmt.Println("任务完成")
done <- true
}()
fmt.Println("等待任务完成...")
<-done // 阻塞,直到接收到数据
fmt.Println("主程序继续")
}有缓冲 Channel
go
func bufferedChannelDemo() {
// 创建容量为 3 的缓冲 channel
ch := make(chan string, 3)
// 发送数据(不会阻塞,直到缓冲区满)
ch <- "消息1"
ch <- "消息2"
ch <- "消息3"
fmt.Println("已发送 3 条消息")
// 接收数据
fmt.Println("接收:", <-ch)
fmt.Println("接收:", <-ch)
fmt.Println("接收:", <-ch)
}
func producerConsumerDemo() {
ch := make(chan int, 5)
// 生产者
go func() {
for i := 1; i <= 10; i++ {
fmt.Printf("生产: %d\n", i)
ch <- i
time.Sleep(100 * time.Millisecond)
}
close(ch) // 关闭 channel
}()
// 消费者
time.Sleep(200 * time.Millisecond)
for num := range ch {
fmt.Printf("消费: %d\n", num)
time.Sleep(200 * time.Millisecond)
}
}Channel 方向
go
// 只发送 channel
func sendOnly(ch chan<- int) {
ch <- 42
}
// 只接收 channel
func receiveOnly(ch <-chan int) {
value := <-ch
fmt.Println("接收:", value)
}
func channelDirectionDemo() {
ch := make(chan int)
go sendOnly(ch)
receiveOnly(ch)
}关闭 Channel
go
func closeChannelDemo() {
ch := make(chan int, 3)
// 发送数据
ch <- 1
ch <- 2
ch <- 3
close(ch) // 关闭 channel
// 接收数据
for value := range ch {
fmt.Println("接收:", value)
}
// 检查 channel 是否关闭
value, ok := <-ch
if ok {
fmt.Println("接收到:", value)
} else {
fmt.Println("Channel 已关闭")
}
}Select 语句
select 用于处理多个 channel 操作。
go
func selectDemo() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 ch2"
}()
// select 等待多个 channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("接收:", msg1)
case msg2 := <-ch2:
fmt.Println("接收:", msg2)
}
}
}
// 带 default 的 select(非阻塞)
func selectDefaultDemo() {
ch := make(chan int)
select {
case value := <-ch:
fmt.Println("接收:", value)
default:
fmt.Println("没有数据可接收")
}
}
// 超时处理
func selectTimeoutDemo() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "数据"
}()
select {
case msg := <-ch:
fmt.Println("接收:", msg)
case <-time.After(1 * time.Second):
fmt.Println("超时!")
}
}WaitGroup
用于等待一组 goroutine 完成。
go
import "sync"
func waitGroupDemo() {
var wg sync.WaitGroup
// 启动 5 个 goroutine
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数器
go func(id int) {
defer wg.Done() // 完成时减少计数器
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("Worker %d 完成工作\n", id)
}(i)
}
fmt.Println("等待所有 worker 完成...")
wg.Wait() // 阻塞,直到计数器为 0
fmt.Println("所有 worker 已完成")
}Mutex(互斥锁)
用于保护共享资源。
go
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func mutexDemo() {
counter := &Counter{}
var wg sync.WaitGroup
// 启动 1000 个 goroutine 同时增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("最终计数: %d\n", counter.Value())
}RWMutex(读写锁)
允许多个读操作同时进行,但写操作是互斥的。
go
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string]int),
}
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
value, ok := sm.data[key]
return value, ok
}
func rwMutexDemo() {
safeMap := NewSafeMap()
var wg sync.WaitGroup
// 写入数据
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
key := fmt.Sprintf("key%d", n)
safeMap.Set(key, n*10)
fmt.Printf("写入: %s = %d\n", key, n*10)
}(i)
}
// 读取数据
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
key := fmt.Sprintf("key%d", n%10)
if value, ok := safeMap.Get(key); ok {
fmt.Printf("读取: %s = %d\n", key, value)
}
}(i)
}
wg.Wait()
}Once
确保某个操作只执行一次。
go
var once sync.Once
func initialize() {
fmt.Println("初始化操作(只执行一次)")
}
func onceDemo() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d 调用 initialize\n", id)
once.Do(initialize)
}(i)
}
wg.Wait()
}工作池模式
go
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, job)
time.Sleep(time.Duration(job*100) * time.Millisecond)
results <- job * 2
fmt.Printf("Worker %d 完成任务 %d\n", id, job)
}
}
func workerPoolDemo() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动 worker
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for a := 1; a <= numJobs; a++ {
result := <-results
fmt.Printf("结果: %d\n", result)
}
}Context(上下文)
用于控制 goroutine 的生命周期。
go
import "context"
func contextDemo() {
// 带取消的 context
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("Goroutine 收到取消信号")
return
default:
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}()
time.Sleep(2 * time.Second)
fmt.Println("发送取消信号")
cancel()
time.Sleep(1 * time.Second)
}
func contextTimeoutDemo() {
// 带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(3 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err())
}
}完整示例:并发下载器
go
package main
import (
"fmt"
"sync"
"time"
)
type DownloadTask struct {
ID int
URL string
Size int // MB
}
type Downloader struct {
workers int
tasks chan DownloadTask
results chan string
wg sync.WaitGroup
}
func NewDownloader(workers int) *Downloader {
return &Downloader{
workers: workers,
tasks: make(chan DownloadTask, 10),
results: make(chan string, 10),
}
}
func (d *Downloader) worker(id int) {
defer d.wg.Done()
for task := range d.tasks {
fmt.Printf("[Worker %d] 开始下载: %s (大小: %dMB)\n", id, task.URL, task.Size)
// 模拟下载时间
downloadTime := time.Duration(task.Size*100) * time.Millisecond
time.Sleep(downloadTime)
result := fmt.Sprintf("任务 %d 完成: %s", task.ID, task.URL)
d.results <- result
fmt.Printf("[Worker %d] 完成下载: %s\n", id, task.URL)
}
}
func (d *Downloader) Start() {
// 启动 worker
for i := 1; i <= d.workers; i++ {
d.wg.Add(1)
go d.worker(i)
}
}
func (d *Downloader) AddTask(task DownloadTask) {
d.tasks <- task
}
func (d *Downloader) Wait() {
close(d.tasks)
d.wg.Wait()
close(d.results)
}
func (d *Downloader) GetResults() []string {
var results []string
for result := range d.results {
results = append(results, result)
}
return results
}
func main() {
fmt.Println("=== 并发下载器 ===")
downloader := NewDownloader(3)
downloader.Start()
// 添加下载任务
tasks := []DownloadTask{
{ID: 1, URL: "https://example.com/file1.zip", Size: 5},
{ID: 2, URL: "https://example.com/file2.zip", Size: 3},
{ID: 3, URL: "https://example.com/file3.zip", Size: 7},
{ID: 4, URL: "https://example.com/file4.zip", Size: 4},
{ID: 5, URL: "https://example.com/file5.zip", Size: 6},
{ID: 6, URL: "https://example.com/file6.zip", Size: 2},
}
for _, task := range tasks {
downloader.AddTask(task)
}
// 等待所有任务完成
downloader.Wait()
// 获取结果
results := downloader.GetResults()
fmt.Println("\n=== 下载结果 ===")
for _, result := range results {
fmt.Println(result)
}
}并发模式总结
Fan-Out, Fan-In 模式
go
func fanOutFanIn() {
// Fan-Out: 一个输入,多个输出
input := make(chan int)
outputs := make([]chan int, 3)
for i := range outputs {
outputs[i] = make(chan int)
go func(out chan int) {
for n := range input {
out <- n * 2
}
close(out)
}(outputs[i])
}
// Fan-In: 多个输入,一个输出
output := make(chan int)
var wg sync.WaitGroup
for _, ch := range outputs {
wg.Add(1)
go func(c chan int) {
defer wg.Done()
for n := range c {
output <- n
}
}(ch)
}
go func() {
wg.Wait()
close(output)
}()
// 发送数据
go func() {
for i := 1; i <= 5; i++ {
input <- i
}
close(input)
}()
// 接收结果
for result := range output {
fmt.Println("结果:", result)
}
}小结
- Goroutine:轻量级线程,使用
go关键字启动 - Channel:goroutine 间通信的管道
- 无缓冲 vs 有缓冲:同步 vs 异步通信
- Select:处理多个 channel 操作
- WaitGroup:等待多个 goroutine 完成
- Mutex:互斥锁,保护共享资源
- RWMutex:读写锁,优化读多写少场景
- Once:确保操作只执行一次
- Context:控制 goroutine 生命周期
- 工作池:限制并发数量,提高资源利用率
- 并发模式:Fan-Out/Fan-In、Pipeline 等