Go in Action - Concurrency

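Chapter 6: Concurrency

  • Concurrency in Go is the ability to run a function independently of other functions.
  • Go's concurrency synchronization model comes from a paradigm called Communicating Sequential Processes (CSP).
    • CSP is a message-passing model: goroutines synchronize by passing data to one another rather than by locking data for synchronized access.
    • To support this, Go provides a dedicated data type, the channel.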
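
As a minimal sketch of the message-passing idea, the following program has a goroutine hand its result to the caller over a channel instead of writing to a shared variable guarded by a lock. The names sumAsync and results are illustrative and do not come from the book.

```go
package main

import "fmt"

// sumAsync adds up nums in a separate goroutine and delivers the total
// over a channel, so no shared variable or lock is needed.
func sumAsync(nums []int) <-chan int {
	results := make(chan int)
	go func() {
		total := 0
		for _, n := range nums {
			total += n
		}
		results <- total // hand the value to whoever is receiving
	}()
	return results
}

func main() {
	total := <-sumAsync([]int{1, 2, 3, 4})
	fmt.Println("total:", total)
}
```

The receive in main blocks until the goroutine sends, so the channel both carries the data and synchronizes the two goroutines.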

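Concurrency and Parallelism

  • Operating-system processes and threads

    • A process is a container for the resources an application uses and maintains while it runs, such as the memory address space, handles to files and devices, and threads.
    • A thread is a path of execution that the operating system schedules to run code.
  • The operating system schedules threads to run on physical processors; the Go runtime schedules goroutines to run on logical processors.

  • By default, Go allocates one logical processor for every available physical processor.
  • A single logical processor can schedule a very large number of goroutines. As the book describes it, a goroutine goes through these steps:
    • The goroutine is created and made ready to run.
    • It is first placed in the scheduler's global run queue.
    • The scheduler assigns it to a logical processor by placing it in that processor's local run queue.
    • It waits in the local run queue until its logical processor executes it.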
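
The number of logical processors can be inspected at run time. The sketch below is only an illustration; the helper name logicalProcessors is made up here. It relies on the documented behavior that calling runtime.GOMAXPROCS with 0 returns the current setting without changing it.

```go
package main

import (
	"fmt"
	"runtime"
)

// logicalProcessors reports how many logical processors the scheduler may
// use. Passing 0 to runtime.GOMAXPROCS queries the value without changing it.
func logicalProcessors() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("CPUs visible to the process:", runtime.NumCPU())
	fmt.Println("logical processors:", logicalProcessors())
	fmt.Println("goroutines currently alive:", runtime.NumGoroutine())
}
```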

goroutine

  • runtime.GOMAXPROCS changes the number of logical processors the scheduler can use.
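package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Allocate a single logical processor for the scheduler to use.
	runtime.GOMAXPROCS(1)
	// wg waits for both goroutines to finish.
	var wg sync.WaitGroup
	wg.Add(2)
	fmt.Println("Start Goroutine")
	// Print the lowercase letters a through y three times.
	go func() {
		defer wg.Done()
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'z'; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()
	// Print the uppercase letters A through Y three times.
	go func() {
		defer wg.Done()
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'Z'; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nFinish this")
}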
  • Output with a single logical processor
Start Goroutine
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y A B C D E F G H I J K L M N O P Q R S T U V W X Y A B C D E F G H I J K L M N O P Q R S T U V W X Y a b c d e f g h i j k l m n o p q r s t u v w x y a b c d e f g h i j k l m n o p q r s t u v w x y a b c d e f g h i j k l m n o p q r s t u v w x y
Finish this
  • Output with multiple logical processors
Start Goroutine
Waiting To Finish
a b c d e f g h A B C D E F G H I J K L M N i j k l m n o p q r O P Q R S s t u v w x y a b c d e f g h i j k l m n o p q r s t u v w x y a b c d e f g h i j k l m n o p q r s t u v w x y T U V W X Y A B C D E F G H I J K L M N O P Q R S T U V W X Y A B C D E F G H I J K L M N O P Q R S T U V W X Y
Finish this
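  • A running goroutine can be stopped and rescheduled before it finishes its work, which keeps any single goroutine from monopolizing a logical processor.
  • The runtime package provides the ability to change the Go runtime's configuration parameters.
  • More logical processors is not always better; use benchmarks to evaluate the effect on your program.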

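Race Conditions

  • When two or more goroutines access a shared resource without synchronizing with one another, they are in a race condition.
  • runtime.Gosched() yields the processor so the scheduler can run another goroutine; the example uses it to force the two goroutines to interleave.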
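package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter   int            // shared by both goroutines, deliberately unprotected
	waitGroup sync.WaitGroup // waits for both goroutines to finish
)

func main() {
	waitGroup.Add(2)
	go incCounter(1)
	go incCounter(2)
	waitGroup.Wait()
	fmt.Println("Final Counter: ", counter)
}

// incCounter increments counter twice with an unsynchronized read-modify-write.
func incCounter(id int) {
	defer waitGroup.Done()
	for count := 0; count < 2; count++ {
		value := counter
		// Yield the processor; this goroutine goes back into the run queue.
		runtime.Gosched()
		value++
		counter = value
	}
}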
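
To see why updates get lost, the interleaving that Gosched encourages can be replayed by hand in a single goroutine. This is an illustrative sketch, not the program above: the replay function and its fixed schedule are assumptions chosen to mirror one possible ordering, in which each worker reads, yields, and then writes.

```go
package main

import "fmt"

// replay simulates two workers, each doing two unsynchronized
// read-modify-write increments, under one fixed interleaving:
// both workers read the counter before either writes it back.
func replay() int {
	counter := 0
	for round := 0; round < 2; round++ {
		a := counter // worker 1 reads, then yields
		b := counter // worker 2 reads the same stale value, then yields
		a++
		counter = a // worker 1 writes
		b++
		counter = b // worker 2 overwrites with the same number
	}
	return counter
}

func main() {
	// Four increments were attempted, but two of them were lost.
	fmt.Println("Final Counter:", replay()) // prints "Final Counter: 2"
}
```

The real program is not guaranteed to follow this schedule, which is exactly the problem: its result depends on how the goroutines happen to interleave.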

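Locking Shared Resources

Atomic Functions

  • Atomic functions synchronize access to integer variables and pointers through low-level locking mechanisms.
  • The AddInt64 function in the atomic package synchronizes the addition of an integer value, ensuring that only one goroutine at a time can perform and complete the add operation.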
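package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	counter   int64          // shared by both goroutines, updated atomically
	waitGroup sync.WaitGroup // waits for both goroutines to finish
)

func main() {
	waitGroup.Add(2)
	go incCounter(1)
	go incCounter(2)
	waitGroup.Wait()
	fmt.Println("Final Counter: ", counter)
}

// incCounter increments counter twice using an atomic add.
func incCounter(id int) {
	defer waitGroup.Done()
	for count := 0; count < 2; count++ {
		// Safely add one to counter.
		atomic.AddInt64(&counter, 1)
		// Yield the processor; this goroutine goes back into the run queue.
		runtime.Gosched()
	}
}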
  • LoadInt64 and StoreInt64 provide a safe way to read and write an integer value.
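package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	shutdown int64          // flag that tells the workers to stop
	wg       sync.WaitGroup // waits for both workers to finish
)

func main() {
	wg.Add(2)
	go doWork("A")
	go doWork("B")
	// Give the workers some time to run.
	time.Sleep(1 * time.Second)
	fmt.Println("shutdown now")
	// Safely set the shutdown flag.
	atomic.StoreInt64(&shutdown, 1)
	wg.Wait()
}

// doWork simulates work until it sees the shutdown flag.
func doWork(name string) {
	defer wg.Done()
	for {
		fmt.Printf("Doing %s Work \n", name)
		time.Sleep(250 * time.Millisecond)
		// Safely read the shutdown flag.
		if atomic.LoadInt64(&shutdown) == 1 {
			fmt.Printf("Shutting %s Down \n", name)
			break
		}
	}
}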
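
Since Go 1.19 the sync/atomic package also offers typed values such as atomic.Int64, whose Add, Load, and Store methods do the same job as the functions above. The sketch below is not from the book; the function name countAtomically and the worker counts are chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomically starts workers goroutines that each add 1 to a shared
// counter perWorker times, then returns the final value.
func countAtomically(workers, perWorker int) int64 {
	var counter atomic.Int64 // requires Go 1.19 or later
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				counter.Add(1)
			}
		}()
	}
	wg.Wait()
	return counter.Load()
}

func main() {
	fmt.Println("Final Counter:", countAtomically(8, 1000)) // prints "Final Counter: 8000"
}
```

Because the counter is a dedicated type, it cannot be read or written by accident with a plain, non-atomic operation.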

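Mutexes

  • A mutex (mutual exclusion lock) creates a critical section around code, guaranteeing that only one goroutine at a time can execute it. The race-condition counter example is corrected with a mutex below.
  • Wrapping the critical section in curly braces makes it easier to see; the braces are optional.
  • When the goroutine holding the lock yields, any other goroutine the scheduler picks blocks because it cannot acquire the mutex, so the scheduler goes back to the original goroutine and lets it continue.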
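package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter   int            // shared by both goroutines
	waitGroup sync.WaitGroup // waits for both goroutines to finish
	mutex     sync.Mutex     // defines the critical section around counter
)

func main() {
	waitGroup.Add(2)
	go incCounter(1)
	go incCounter(2)
	waitGroup.Wait()
	fmt.Println("Final Counter: ", counter)
}

// incCounter increments counter twice inside a mutex-protected critical section.
func incCounter(id int) {
	defer waitGroup.Done()
	for count := 0; count < 2; count++ {
		// Only one goroutine at a time may enter the critical section.
		mutex.Lock()
		{
			value := counter
			// Yield the processor; this goroutine goes back into the run queue.
			runtime.Gosched()
			value++
			counter = value
		}
		// Release the lock so a waiting goroutine can proceed.
		mutex.Unlock()
	}
}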
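
A common variation, sketched here under assumed names (SafeCounter and runIncrements are not from the book), bundles the mutex with the data it protects and releases the lock with defer so that every return path unlocks it.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter bundles a value with the mutex that protects it.
type SafeCounter struct {
	mu    sync.Mutex
	value int
}

// Inc adds one to the counter; defer guarantees the unlock on every path.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value returns the current count.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// runIncrements starts n goroutines that each call Inc once.
func runIncrements(n int) int {
	var c SafeCounter
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println("Final Counter:", runIncrements(100)) // prints "Final Counter: 100"
}
```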

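Channels

  • Channels synchronize goroutines by sending and receiving the resources they need to share.
  • When declaring a channel, you specify the type of data to be shared. Values or pointers of built-in, named, struct, and reference types can all be shared through a channel.
  • Channels are created with the built-in function make.
  • The <- operator indicates the direction in which data flows.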
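
The points above can be sketched as follows. The function names produce and consume are illustrative only; the directional channel types chan<- and <-chan in their signatures are standard Go syntax that restricts a parameter to sending or receiving.

```go
package main

import "fmt"

// produce can only send on out; the compiler rejects a receive from it.
func produce(out chan<- string, msg string) {
	out <- msg // data flows into the channel
}

// consume can only receive from in.
func consume(in <-chan string) string {
	return <-in // data flows out of the channel
}

func main() {
	unbuffered := make(chan int)     // no buffer: capacity is 0
	buffered := make(chan string, 3) // room for three strings

	fmt.Println(cap(unbuffered), cap(buffered)) // prints "0 3"

	produce(buffered, "hello") // does not block: the buffer has room
	fmt.Println(consume(buffered))
}
```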

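Unbuffered Channels

  • An unbuffered channel has no capacity to hold a value before it is received.
  • A send and its matching receive on such a channel are inherently synchronized: whichever goroutine arrives first blocks until the other is ready, so the sending and receiving goroutines depend on each other.
  • The tennis match example follows.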
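package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// wg waits for both players to finish.
var wg sync.WaitGroup

func init() {
	// Seed the global generator; Go 1.20 and later seed it automatically.
	rand.Seed(time.Now().UnixNano())
}

func main() {
	// The unbuffered channel acts as the court.
	court := make(chan int)
	wg.Add(2)
	go player("Nadal", court)
	go player("Dick", court)
	// Serve the ball.
	court <- 1
	wg.Wait()
}

// player simulates one person playing tennis.
func player(name string, court chan int) {
	defer wg.Done()
	for {
		// Wait for the ball to be hit to this player.
		ball, ok := <-court
		if !ok {
			// The channel was closed, so the other player missed.
			fmt.Printf("Player %s won \n", name)
			return
		}
		// Pick a random number to decide whether this player misses.
		n := rand.Intn(100)
		if n%13 == 0 {
			fmt.Printf("Player %s missed \n", name)
			// Closing the channel signals that the game is over.
			close(court)
			return
		}
		fmt.Printf("Player %s hit %d \n", name, ball)
		ball++
		// Hit the ball back to the other player.
		court <- ball
	}
}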

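Buffered Channels

  • A buffered channel can hold one or more values before they are received.
  • It does not force the sending and receiving goroutines to be ready at the same moment: a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty.
  • The example below uses a buffered channel to have 4 goroutines complete 10 tasks.
  • Note that once a channel is closed, goroutines can still receive from it but can no longer send to it, so no buffered data is lost.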
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	numberGoroutine = 4  // number of worker goroutines
	taskLoad        = 10 // number of tasks to process
)

// wg waits for all workers to finish.
var wg sync.WaitGroup

func init() {
	// Seed the global generator; Go 1.20 and later seed it automatically.
	rand.Seed(time.Now().UnixNano())
}

func main() {
	// The buffer is large enough to hold every task, so the sends below never block.
	tasks := make(chan string, taskLoad)
	wg.Add(numberGoroutine)
	for gr := 1; gr <= numberGoroutine; gr++ {
		go worker(tasks, gr)
	}
	for post := 1; post <= taskLoad; post++ {
		tasks <- fmt.Sprintf("Task %d", post)
	}
	// Closing the channel lets the workers exit once all tasks are taken.
	close(tasks)
	wg.Wait()
}

// worker pulls tasks from the channel until it is closed and empty.
func worker(tasks chan string, worker int) {
	defer wg.Done()
	for {
		task, ok := <-tasks
		if !ok {
			// The channel is closed and drained.
			fmt.Printf("Worker %d shutdown \n", worker)
			return
		}
		fmt.Printf("Worker %d start work %s \n", worker, task)
		// Simulate work with a random sleep.
		time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
		fmt.Printf("Worker %d finished work %s \n", worker, task)
	}
}

/*
Worker 2 start work Task 1
Worker 4 start work Task 4
Worker 1 start work Task 2
Worker 3 start work Task 3
Worker 4 finished work Task 4
Worker 4 start work Task 5
Worker 1 finished work Task 2
Worker 1 start work Task 6
Worker 3 finished work Task 3
Worker 3 start work Task 7
Worker 2 finished work Task 1
Worker 2 start work Task 8
Worker 4 finished work Task 5
Worker 4 start work Task 9
Worker 4 finished work Task 9
Worker 4 start work Task 10
Worker 1 finished work Task 6
Worker 1 shutdown
Worker 2 finished work Task 8
Worker 3 finished work Task 7
Worker 3 shutdown
Worker 2 shutdown
Worker 4 finished work Task 10
Worker 4 shutdown
*/
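
The buffering and closing rules can be checked in isolation with a small, deterministic sketch. The helpers trySend and drain are hypothetical names introduced here; they are not part of the example above.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // the buffer is full and no receiver is waiting
		return false
	}
}

// drain receives until the channel is closed and empty.
func drain(ch chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(trySend(ch, 1), trySend(ch, 2), trySend(ch, 3)) // prints "true true false"
	close(ch)
	fmt.Println(drain(ch)) // prints "[1 2]": buffered values survive the close
	_, ok := <-ch
	fmt.Println(ok) // prints "false": closed and empty
}
```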

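Summary

  • Concurrency is the independent execution of goroutines.
  • The keyword go creates a goroutine to run a function.
  • Goroutines execute on logical processors, each of which has its own system thread and run queue.
  • A race condition occurs when two or more goroutines try to access the same resource without synchronization.
  • Atomic functions and mutexes provide a way to protect against race conditions.
  • Channels provide a simple way to share data between two goroutines.
  • Unbuffered channels guarantee that data is exchanged at the same instant; buffered channels make no such guarantee.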

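Chapter 7: Concurrency Patterns

runner

  • The runner package shows how to use channels to monitor how long a program has been running.
  • It can also terminate the program.
  • This pattern is a good fit for scheduled tasks.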
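// Package runner runs a set of tasks under a timeout and stops on an OS interrupt.
package runner

import (
	"errors"
	"os"
	"os/signal"
	"time"
)

// ErrTimeout is returned when the tasks do not finish within the timeout.
var ErrTimeout = errors.New("received timeout")

// ErrInterrupt is returned when the operating system sends an interrupt signal.
var ErrInterrupt = errors.New("received interrupt")

// Runner runs a set of tasks within a given timeout and stops them
// when the operating system sends an interrupt signal.
type Runner struct {
	interrupt chan os.Signal   // receives signals from the operating system
	complete  chan error       // reports that the tasks are done
	timeout   <-chan time.Time // fires when time runs out
	tasks     []func(int)      // functions executed in index order
}

// NewRunner returns a Runner whose timeout starts counting immediately.
func NewRunner(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		complete:  make(chan error),
		timeout:   time.After(d),
	}
}

// Add attaches tasks to the Runner; each task receives its index as an ID.
func (r *Runner) Add(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

// Start runs all tasks and waits for completion or for the timeout.
func (r *Runner) Start() error {
	// Ask to be notified of interrupt signals.
	signal.Notify(r.interrupt, os.Interrupt)
	// Run the tasks on a separate goroutine.
	go func() {
		r.complete <- r.run()
	}()
	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeout
	}
}

// run executes each task in order, checking for an interrupt before each one.
func (r *Runner) run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		task(id)
	}
	return nil
}

// gotInterrupt reports, without blocking, whether an interrupt signal has arrived.
func (r *Runner) gotInterrupt() bool {
	select {
	case <-r.interrupt:
		// Stop receiving any further signals.
		signal.Stop(r.interrupt)
		return true
	default:
		return false
	}
}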
  • Using the runner:
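package main

import (
	"log"
	"os"
	"time"

	// Assumed import path, mirroring the path the pool example uses.
	"api/books/go-in-action/7-concurrency-pattern/runner"
)

// timeout is how long the program may run before it is terminated.
const timeout = 3 * time.Second

func main() {
	log.Println("Starting work")
	r := runner.NewRunner(timeout)
	r.Add(createTask(), createTask(), createTask())
	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeout:
			log.Println("Terminating due to timeout.")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("Terminating due to interrupt")
			os.Exit(2)
		}
	}
	log.Println("Process ended.")
}

// createTask returns a task that sleeps for as many seconds as its ID.
func createTask() func(int) {
	return func(id int) {
		log.Printf("Processor - Task %d.", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}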
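
The heart of the runner, a select that waits on a completion channel and a timer at the same time, can be sketched on its own. The names runWithTimeout, errTimeout, and demo below are hypothetical and the durations are arbitrary. Unlike the runner, this sketch uses a buffered done channel so the worker goroutine can still finish after a timeout instead of blocking forever on its send.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out")

// runWithTimeout runs task in its own goroutine and waits for it to finish
// or for d to elapse, whichever happens first.
func runWithTimeout(task func(), d time.Duration) error {
	// Buffered so the goroutine can complete its send even after a timeout.
	done := make(chan struct{}, 1)
	go func() {
		task()
		done <- struct{}{}
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		return errTimeout
	}
}

// demo runs one task that finishes well in time and one that cannot.
func demo() (quickErr, slowErr error) {
	quick := func() {}
	slow := func() { time.Sleep(300 * time.Millisecond) }
	return runWithTimeout(quick, time.Second), runWithTimeout(slow, 10*time.Millisecond)
}

func main() {
	quickErr, slowErr := demo()
	fmt.Println("quick task:", quickErr)
	fmt.Println("slow task:", slowErr)
}
```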

pool

  • The pool package shows how to use a buffered channel to implement a pool of resources.
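// Package pool manages a set of resources that goroutines can share.
package pool

import (
	"errors"
	"io"
	"log"
	"sync"
)

// Pool manages resources that can be shared safely by multiple goroutines.
// Any resource that implements io.Closer can be pooled.
type Pool struct {
	m         sync.Mutex
	resources chan io.Closer
	factory   func() (io.Closer, error)
	closed    bool
}

// ErrPoolClosed is returned when Acquire is called on a closed pool.
var ErrPoolClosed = errors.New("pool has been closed")

// NewPool creates a pool that uses fn to create new resources and
// keeps at most size idle resources.
func NewPool(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size == 0 {
		return nil, errors.New("size value too small")
	}
	return &Pool{
		factory:   fn,
		resources: make(chan io.Closer, size),
	}, nil
}

// Acquire returns an idle resource if one is available, or creates a new one.
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.resources:
		if !ok {
			return nil, ErrPoolClosed
		}
		log.Println("Acquire:", "Shared Resource")
		return r, nil
	default:
		log.Println("Acquire:", "New Resource")
		return p.factory()
	}
}

// Release puts a resource back in the pool, or closes it if the pool
// is full or already closed.
func (p *Pool) Release(r io.Closer) {
	// The lock keeps Release from racing with Close.
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		r.Close()
		return
	}
	select {
	case p.resources <- r:
		log.Println("Release: ", "In Queue")
	default:
		// The queue is full, so the resource is discarded.
		log.Println("Release:", "Closing")
		r.Close()
	}
}

// Close shuts the pool down and closes every idle resource.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	// Close the channel before draining it; otherwise the range would block forever.
	close(p.resources)
	for r := range p.resources {
		r.Close()
	}
}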
  • Code that uses the connection pool:
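package main

import (
	"api/books/go-in-action/7-concurrency-pattern/pool"
	"io"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const (
	maxGoroutines = 25 // number of goroutines running queries
	poolResources = 2  // number of resources kept in the pool
)

// dbConnection simulates a shared database connection.
type dbConnection struct {
	ID int32
}

// Close implements io.Closer so dbConnection can be managed by the pool.
func (conn *dbConnection) Close() error {
	log.Println("Close: Connection", conn.ID)
	return nil
}

// idCounter gives each connection a unique ID.
var idCounter int32

// createConnection is the factory the pool calls when it needs a new connection.
func createConnection() (io.Closer, error) {
	id := atomic.AddInt32(&idCounter, 1)
	log.Println("Create: New Connection", id)
	return &dbConnection{ID: id}, nil
}

func main() {
	var wg sync.WaitGroup
	wg.Add(maxGoroutines)
	p, err := pool.NewPool(createConnection, poolResources)
	if err != nil {
		// Without a pool there is nothing to run.
		log.Println(err)
		return
	}
	for query := 0; query < maxGoroutines; query++ {
		// Pass query as an argument so each goroutine gets its own copy.
		go func(q int) {
			performQueries(q, p)
			wg.Done()
		}(query)
	}
	wg.Wait()
	log.Println("Shutdown Program")
	p.Close()
}

// performQueries acquires a connection, simulates a query, and releases the connection.
func performQueries(query int, p *pool.Pool) {
	conn, err := p.Acquire()
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Release(conn)
	// Simulate the time a query takes.
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	log.Printf("QID[%d] CID[%d] \n", query, conn.(*dbConnection).ID)
}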
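
The core idea behind the pool, a buffered channel acting as a fixed set of reusable slots, can be sketched in a few lines. This sketch is not the pool package above: the names maxInFlight and slots are hypothetical, and unlike Pool.Acquire, which creates a new resource when the channel is empty, acquiring a slot here blocks until one is free.

```go
package main

import (
	"fmt"
	"sync"
)

// maxInFlight runs jobs goroutines that must each hold one of size slots
// while working, and returns the highest number of simultaneous holders seen.
func maxInFlight(jobs, size int) int {
	slots := make(chan struct{}, size)
	for i := 0; i < size; i++ {
		slots <- struct{}{} // pre-fill the pool with reusable tokens
	}

	var (
		mu             sync.Mutex
		inFlight, peak int
		wg             sync.WaitGroup
	)
	wg.Add(jobs)
	for j := 0; j < jobs; j++ {
		go func() {
			defer wg.Done()
			tok := <-slots                  // acquire: blocks while the pool is empty
			defer func() { slots <- tok }() // release: the token goes back for reuse

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// A real program would use the pooled resource here.

			mu.Lock()
			inFlight--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak holders:", maxInFlight(25, 2))
}
```

However the goroutines are scheduled, the peak never exceeds the number of tokens placed in the channel.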

work

  • The work package shows how to use an unbuffered channel to create a pool of goroutines.
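// Package work manages a pool of goroutines that perform work.
package work

import "sync"

// Worker must be implemented by any type that wants to use the pool.
type Worker interface {
	Task()
}

// Pool provides a fixed set of goroutines that execute submitted Workers.
type Pool struct {
	work chan Worker
	wg   sync.WaitGroup
}

// New creates a pool with maxGoroutines goroutines.
func New(maxGoroutines int) *Pool {
	p := Pool{
		work: make(chan Worker),
	}
	p.wg.Add(maxGoroutines)
	// Start exactly maxGoroutines goroutines so the count matches wg.Add;
	// starting fewer would make Shutdown wait forever.
	for i := 0; i < maxGoroutines; i++ {
		go func() {
			// The loop ends when the work channel is closed.
			for w := range p.work {
				w.Task()
			}
			p.wg.Done()
		}()
	}
	return &p
}

// Run submits work to the pool and blocks until a goroutine accepts it.
func (p *Pool) Run(w Worker) {
	p.work <- w
}

// Shutdown waits for all goroutines to finish their work and exit.
func (p *Pool) Shutdown() {
	close(p.work)
	p.wg.Wait()
}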
  • A program that uses the work package:
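package main

import (
	"api/books/go-in-action/7-concurrency-pattern/work"
	"log"
	"sync"
	"time"
)

// names is the list of names to print.
var names = []string{
	"steve",
	"bob",
	"mary",
	"therese",
	"jason",
}

// namePrinter prints a name; it implements work.Worker.
type namePrinter struct {
	name string
}

// Task implements the Worker interface.
func (m *namePrinter) Task() {
	log.Println(m.name)
	time.Sleep(time.Second)
}

func main() {
	// Create a pool with two goroutines.
	p := work.New(2)
	var wg sync.WaitGroup
	wg.Add(100 * len(names))
	for i := 0; i < 100; i++ {
		for _, name := range names {
			// np is declared inside the loop, so each goroutine gets its own value.
			np := namePrinter{name: name}
			go func() {
				// Run blocks until a pool goroutine takes the work.
				p.Run(&np)
				wg.Done()
			}()
		}
	}
	wg.Wait()
	// Stop the pool once all work has been submitted.
	p.Shutdown()
}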
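
The same pattern can be reduced to a self-contained sketch that also collects a result. The function sumOfSquares and its mutex-protected total are illustrative assumptions, not part of the work package.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares squares every number in nums on a fixed pool of workers
// goroutines fed by an unbuffered channel, and returns the sum.
func sumOfSquares(nums []int, workers int) int {
	jobs := make(chan int) // unbuffered: a send completes only when a worker takes it
	var (
		mu    sync.Mutex
		total int
		wg    sync.WaitGroup
	)
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range jobs { // exits once jobs is closed
				mu.Lock()
				total += n * n
				mu.Unlock()
			}
		}()
	}
	for _, n := range nums {
		jobs <- n
	}
	close(jobs)
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4}, 2)) // prints 30
}
```

Because the channel is unbuffered, the submitting loop can never run ahead of the workers, which is the back-pressure the work package relies on.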

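Summary

  • Channels can be used to control the lifecycle of a program.
  • A select statement with a default case can attempt to send or receive on a channel without blocking.
  • Buffered channels can be used to manage a pool of reusable resources.
  • The runtime takes care of coordinating and synchronizing channel operations.
  • Unbuffered channels can be used to create a pool of goroutines that carry out work.
  • An unbuffered channel can be used at any time to let two goroutines exchange data, with the guarantee that when the channel operation completes the other side has received the data.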