1. Goroutine同步【数据同步】

  • 为什么需要goroutine同步

  • gorotine同步概念、以及同步的几种方式

1.1 为什么需要goroutine同步

package main

import (
	"fmt"
	"sync"
)

var A = 10
var wg = sync.WaitGroup{}

func Add(){
    defer wg.Done()
    for i:=0;i<1000000;i++{
        A += 1
    }
}

func main() {
	wg.Add(2)
    go Add()
    go Add()
	wg.Wait()
	fmt.Println(A)
}
# output:
1061865  # 每运行一次结果都不一样,但是都不是我们预期的结果2000000

 多goroutine【多任务】,有共享资源,且多goroutine修改共享资源,出现数据不安全问题【数据错误】,保证数据安全一致,需要goroutine同步

1.2 goroutine同步

goroutine按照约定的顺序执行,解决数据不安全问题。

 

1.3 goroutine同步方式

  • channel 【csp模型】

  • 互斥锁 【传统同步机制】

  • 读写锁 【传统同步机制】

  • 条件变量 【传统同步机制】

 

2. 传统同步机制

2.1 互斥锁

2.1.1 特点

加锁成功则操作资源,加锁失败则等待直至锁加锁成功—-所有的goroutine互斥,一个得到锁其他全部等待

解决了数据安全问题,降低了程序的性能,适用读写不太频繁的场景

2.1.2 锁颗粒度问题

颗粒度是指,加锁的范围,哪里使用资源哪里加锁,尽可能减少加锁范围

单元测试基本使用流程

  • 新建单元测试文件

  • 编写测试案例

  • gotest运行生成对应的prof文件

  • go tool 查看生成的prof文件

package main_test
import (
	"fmt"
	"sync"
	"testing"
)

var A = 10
var wg = sync.WaitGroup{}
var mux sync.Mutex

func Add(){
	defer wg.Done()
	for i:=0;i<1000000;i++{
        mux.Lock()
		A += 1
        mux.Unlock()
	}
}
/*
// 加大锁颗粒度
func Add(){
	defer wg.Done()
	mux.Lock()
	for i:=0;i<1000000;i++{
		A += 1
	}
	mux.Unlock()
}*/
// 单元测试格式, 
func TestMux(t *testing.T) {
	wg.Add(2)
	go Add()
	go Add()
	wg.Wait()
	fmt.Println(A)
}

 

# 生成prof文件,-cpuprofile 参数指定生成什么类型的prof cpu.prof指定生成profile文件名字
go test mutex_test.go -cpuprofile cpu.prof

# 查看生成的prof文件,pprof 指定查看的文件类型
go tool pprof cpu.prof

# 下面是输出信息
Type: cpu
Time: Jul 10, 2019 at 2:38pm (CST)
Duration: 201.43ms, Total samples = 80ms (39.72%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top  # 这里使用top命令查看测试中cpu使用的信息
Showing nodes accounting for 80ms, 100% of 80ms total
      flat  flat%   sum%        cum   cum%
      60ms 75.00% 75.00%       60ms 75.00%  sync.(*Mutex).Unlock
      20ms 25.00%   100%       20ms 25.00%  sync.(*Mutex).Lock
         0     0%   100%       80ms   100%  command-line-arguments_test.Add
(pprof) svg  #svg 保存可视化文件,可以使用浏览器可视化查看
(pprof) list Add  # 查看对应函数的详细时间消耗信息

 

注意

当前得测试案例,是程序设计的错误【这种快速计算性的,一个goroutine已经可以胜任,更多时候读写分离,互斥锁不适合这种频繁读写场景】,不是锁使用的错误

2.1.3 sync.once 源码阅读

// Once is an object that will perform exactly one action.
type Once struct {
	m    Mutex
	done uint32  // 标识是否已执行过任务,如果设置为1 则说明任务已执行过了
}

// DO 调用用户执行的方法,仅调用一次
func (o *Once) Do(f func()) {
    // 原子操作判断done,已被置成1,如果done是1 说明方法已被执行,直接返回
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
   
    // 加锁
	o.m.Lock()
	defer o.m.Unlock()
    // done为0则开始,调用用户函数方法
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

 

2.2 读写锁

读写互斥,读者可以重复加锁。写加锁需要等待所有读者解锁,写加锁期间所有读者wait【写优先级高于读,读写同时加锁写着加锁先成功】

适用写少读多的场景,相比互斥锁可以一定程度提高程序性能

  • 仅有读者

 

 

  • 写着加锁更新数据

 

 

 

 

 

 

 

2.3 条件变量

条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用可以认为条件变量是对锁的一种补充,某种程度上提高锁机制带来的效率低下的问题

2.3.1 条件变量API介绍

  • 创建条件变量 【创建后不能被被拷贝】

// 参数传递一把锁,返回指针类型
cond:=sync.NewCond(&sync.Mutex{})

 Cond.Wait() ,阻塞再条件变量上让出cup资源

// 阻塞在条件变量上面,会把当前gorotine挂载到Cond队列上面
cond.Wait()
// 1. 释放锁,并把自己挂载到通知队列,阻塞等待【原子操作】
// 2. 接收到唤醒信号,尝试获取锁
// 3. 获取锁成功则  返回

 Cond.Signal() 随机唤醒一个阻塞在条件变量上的goroutine

// 唤醒阻塞在条件变量上的goroutine,处于wait【调用了cond.wait】状态的goroutine
// 随机唤醒通知队列上的一个线程,并从通知队列移除
cond.Signal()  // 发送唤醒信号

Cond.Broadcast() 广播通知所有处于wait状态的goroutine

// 广播通知所有处于wait状态的goroutine
// 通知通知队列上的所有的gorotine,并且把所有的goroutine从通知队列 取下来
cond.Broadcast()

2.3.2 条件变量在生产者消费模型中使用

  • 潜在bug–>deadlock【生产者消费者都死锁在cond.wait,没有其他的goroutine唤醒】

package main
import "fmt"
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 创建全局条件变量

// 生产者
func producer(out chan<- int, idx int) {
   for {
      cond.L.Lock()           	// 条件变量对应互斥锁加锁
      for len(out) == 3 {          	// 产品区满 等待消费者消费
         cond.Wait()             	// 挂起当前协程, 等待条件变量满足,被消费者唤醒
      }
      num := rand.Intn(1000) 	// 产生一个随机数
      out <- num             	// 写入到 channel 中 (生产)
      fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out))
      cond.L.Unlock()             	// 生产结束,解锁互斥锁
      cond.Signal()           	// 唤醒 阻塞的 消费者
      time.Sleep(time.Second)       // 生产完休息一会,给其他协程执行机会, 解决了死锁机会的降低
   }
}
//消费者
func consumer(in <-chan int, idx int) {
   for {
      cond.L.Lock()           	// 条件变量对应互斥锁加锁(与生产者是同一个)
      for len(in) == 0 {      	// 产品区为空 等待生产者生产
         cond.Wait()             	// 挂起当前协程, 等待条件变量满足,被生产者唤醒
      }
      num := <-in                	// 将 channel 中的数据读走 (消费)
      fmt.Printf("---- %dth 消费者, 消费数据 %3d,公共区剩余%d个数据\n", idx, num, len(in))
      cond.L.Unlock()             	// 消费结束,解锁互斥锁
      cond.Signal()           	// 唤醒 阻塞的 生产者
      time.Sleep(time.Millisecond * 500)    	//消费完 休息一会,给其他协程执行机会, 解决了死锁机会的降低
   }
}
func main() {
   rand.Seed(time.Now().UnixNano())  // 设置随机数种子
   quit := make(chan bool)           // 创建用于结束通信的 channel

   product := make(chan int, 3)      // 产品区(公共区)使用channel 模拟
   cond.L = new(sync.Mutex)          // 创建互斥锁和条件变量

   for i := 0; i < 5; i++ {          // 5个消费者
      go producer(product, i+1)
   }
   for i := 0; i < 3; i++ {          // 3个生产者
      go consumer(product, i+1)
   }
   <-quit                         	// 主协程阻塞 不结束
}
  • deadlock原因剖析【极值法】

    1. 极端处理: 1个生产者 2 消费 channle 缓存1

    2. 由于极端一些情况,会导致所有的生产者与消费者都会进入到一个wait 状态,没有人唤醒

  • 解决bug—-单向唤醒,由生产者唤醒消费者

    唤醒方向问题: 由速率低的一方唤醒速率高的一方

package main
import (
	"fmt"
	"runtime"
)
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 创建全局条件变量

// 生产者
func producer(out chan<- int, idx int) {
	for {
		num := rand.Intn(1000) 	// 产生一个随机数
		cond.L.Lock()           	// 条件变量对应互斥锁加锁
		select {
		// 尝试向channel写入数据
		case out <- num:
			fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out))
		default:
		}
		cond.L.Unlock()             	// 生产结束,解锁互斥锁
		cond.Signal()           	// 唤醒 阻塞的 消费者
		runtime.Gosched()			// 给别更多的机会创建锁
	}
}
//消费者
func consumer(in <-chan int, idx int) {
	var num int
	for {
		cond.L.Lock()           	// 条件变量对应互斥锁加锁(与生产者是同一个)
		for len(in)==0{
			cond.Wait()
		}
		num=<-in
		fmt.Printf("%dth 消费者,消费了 %d, 公共区剩余%d个数据\n", idx, num, len(in))
		cond.L.Unlock()             	// 消费结束,解锁互斥锁
	}
}
func main() {
	rand.Seed(time.Now().UnixNano())  // 设置随机数种子
	quit := make(chan bool)           // 创建用于结束通信的 channel

	product := make(chan int, 3)      // 产品区(公共区)使用channel 模拟
	cond.L = new(sync.Mutex)          // 创建互斥锁和条件变量

	for i := 0; i < 3; i++ {          // 3个生产者
		go producer(product, i+1)
	}
	for i := 0; i < 5; i++ {          // 5个消费者 
		go consumer(product, i+1)  
}
	<-quit                         	// 主协程阻塞 不结束
}

 
问题:

当我们把条件变量取消,使用带缓存的channel,同样很好的完成生产者与消费者模型【channel空与非空主动阻塞等待,直至解除阻塞】,why use cond?

2.3.3 channel vs sync.Cond

使用channel通知多个关注条件的goroutine问题?

关闭的channle 与广播的作用,仅仅单次使用

当状态多重情况的时候,channel 不行了,使用cond广播的方式进行状态更新

 

版权声明:本文为lurenq原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/lurenq/p/11943803.html