goroutine&channel学习

goroutine & channel

  • 不需要共享内存来通信,而是通过通信来共享内存。

1. channel关键的概念总结

  • 通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。
  • channel零值nil
  • channel只能传递一种类型的数据
  • channel实际上是类型化消息的队列,使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序
  • channel是引用类型,make来初始化 ch1 := make(chan string)
  • 通信操作符<-
    • ch <-int1 写入
    • int2 := <- ch 写出
    • <-ch 写出
  • channel的收发都是原子操作

  • channel默认是同步且无缓冲的:接收者接收数据之前,发送不会结束。channel的发送和接收操作都是在对方准备好之前是阻塞的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    package main
    import "fmt"
    func main() {
    ch1 := make(chan int)
    go pump(ch1) // pump hangs
    fmt.Println(<-ch1) // prints only 0
    }
    func pump(ch chan int) {
    for i := 0; ; i++ {
    ch <- i
    }
    }
    // 0

2. 通过channel进行协程同步

1
2
3
4
5
6
7
8
func f1(in chan int) {
fmt.Println(<-in)
}
func main() {
out := make(chan int)
out <- 2
go f1(out)
}

3. channel可以用来完成协程间的通信

  • ch1 := make(chan string, buf) buf为个数
  • 在缓冲满载之前,给通道发送数据是不会被阻塞的
  • 在缓冲空之前,读取是不会阻塞的
  • 协程通过在channel中放置一个值来发出处理结束的信号
  • 信号量是实现互斥锁常见的同步机制

    • 带缓冲通道的容量和要同步的资源容量相同
    • 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
    • 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      func main() {
      stream := pump()
      go suck(stream)
      time.Sleep(1e9)
      }
      func pump() chan int {
      ch := make(chan int)
      go func() {
      for i := 0; ; i++ {
      ch <- i
      }
      }()
      return ch
      }
      func suck(ch chan int) {
      for {
      fmt.Println(<-ch)
      }
      }
  • channel可以用for关键字循环

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    func main() {
    suck(pump())
    time.Sleep(1e9)
    }
    func pump() chan int {
    ch := make(chan int)
    go func() {
    for i := 0; ; i++ {
    ch <- i
    }
    }()
    return ch
    }
    func suck(ch chan int) {
    go func() {
    for v := range ch {
    fmt.Println(v)
    }
    }()
    }
  • channel可以指定数据流动方向

    • 只接收:var recv_noly <-chan int
    • 只发送:var send_only chan<- int
  • channel关闭 defer close(ch)

  • 使用select切换协程,select 被称为通信开关,select可以处理多个通信中的一个

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go pump1(ch1)
    go pump2(ch2)
    go suck(ch1, ch2)
    time.Sleep(1e9)
    }
    func pump1(ch chan int) {
    for i := 0; ; i++ {
    ch <- i * 2
    }
    }
    func pump2(ch chan int) {
    for i := 0; ; i++ {
    ch <- i + 5
    }
    }
    func suck(ch1, ch2 chan int) {
    for {
    select {
    case v := <-ch1:
    fmt.Printf("Received on channel 1: %d\n", v)
    case v := <-ch2:
    fmt.Printf("Received on channel 2: %d\n", v)
    }
    }
    }
  • goroutine recover机制,保证一个协程出错会继续执行其他的协程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func server(workChan <-chan *Work) {
    for work := range workChan {
    go safelyDo(work)
    }
    }
    func safelyDo(work *Work) {
    defer func() {
    if err := revocer(); err != nil {
    log.Printf("")
    }
    }()
    do(work)
    }

4. channel通信方式和传统共享内存方式的对比

  • 使用共享内存来进行同步:
    • 首先定义一个结构体处理一个任务
    • 各个任务组成任务池来共享内存,引入锁机制
    • 任务数量巨大,则锁机制的开销会导致效率急剧下降
  • channel方式

    • 使用一个通道接受需要处理的任务,一个通道接受处理完成的任务及其结果。worker在协程中启动,数量N可以调整
    • 不使用锁的机制,因为从通道获取到新任务不存在竞争关系
    • 我理解实际上锁机制是实现在了channel中,一个channel的获取是原子性的,从而提高性能

      1
      2
      3
      4
      5
      6
      7
      8
      //Worker的写法, Master -> Workers
      func Worker(in, out chan *Task) {
      for {
      t := <- in
      process(t)
      out <- t
      }
      }
  • 使用锁的情景:

    • 访问共享数据结构中的缓存信息
    • 保存应用程序上下文和状态信息数据
  • 使用通道的情景
    • 与异步操作的结果进行交互
    • 分发任务
    • 传递数据所有权

5. int channel实现惰性生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var resume chan int
func integers() chan int {
yield := make(chan int)
count := 0
go func() {
for {
yield <- count
count++
}
}()
return yield
}
func generateInteger() int {
resume = integers()
return <- resume
}
  • 使用空接口,闭包和高阶函数可以实现一个通用的惰性生产期的工厂函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    type Any interface{}
    type EvalFunc func(Any) (Any, Any)
    func BuildLazyEvaluator(evalFunc, initState Any) func() Any {
    retValChan := make(chan Any)
    loopFunc := func(){
    var actState Any = initState
    var retVal
    for {
    retVal, actState = evalFunc(actState)
    retValChan <- retVal
    }
    }
    retFunc := func() Any {
    return <- retValChan
    }
    go loopFunc()
    return retFunc
    }
    func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {
    ef := BuildLazyIntEvaluator(evalFunc, initState)
    return func() int {
    return ef().(int)
    }
    }
    func main() {
    evenFunc := func(state Any) (Any, Any) {
    os := state.(int)
    ns := os + 2
    return os, ns
    }
    even := BuildLazyIntEvaluator(evenFunc, 0)
    for i := 0; i < 10; i++ {
    fmt.
    }
    }

6. 带缓冲的channel可以用于降频

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const MAXREQS = 50
var sem = make(chan int, MAXREQS)
type Request struct {
a, b int
replyc chan int
}
func process(r *Request) {
// ...
}
func handle(r *Request) {
sem <- 1
process(r)
<- sem
}
func server(service chan *Request) {
for {
request := <- service
go handler(request)
}
}
func main() {
service := make(chan *Request)
go server(service)
}
  • 链式协程,海量协程的创建和运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var ngoroutine = flag.Int("n", 100000, "how many goroutines")
func f(left, right chan int) {
left <- 1 + <- right
}
func main() {
flag.Parse()
leftmost := make(chan int)
var left, right chan int = nil, leftmost
for i := 0; i < *ngoroutine; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0
x := <-leftmost
fmt.Println(x)
}

并行化大量数据的计算

  • 串行处理流水线

    1
    2
    3
    4
    5
    6
    7
    8
    func SerialProcessData(in <- chan *Data, out chan <- *Data) {
    for data := range in {
    tmpA := PreproceeData(data)
    tmpB := ProcessStepA(tmpA)
    tmpC := ProcessStepB(tmpB)
    out <- PostProcessData(tmpC)
    }
    }
  • 并行计算

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func ParallelProcessData(in <-chan *Data, out chan<- *Data) {
    // make channels:
    preOut := make(chan *Data, 100)
    stepAOut := make(chan *Data, 100)
    stepBOut := make(chan *Data, 100)
    stepCOut := make(chan *Data, 100)
    // start parallel computations:
    go PreprocessData(in, preOut)
    go ProcessStepA(preOut,StepAOut)
    go ProcessStepB(StepAOut,StepBOut)
    go ProcessStepC(StepBOut,StepCOut)
    go PostProcessData(StepCOut,out)
    }

8. 使用channel并发访问对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type Person struct {
Name string
salary float64
chF chan func()
}
func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend()
return p
}
func (p *Person) backend() {
for f := range p.chF {
f()
}
}
// Set salary.
func (p *Person) SetSalary(sal float64) {
p.chF <- func() { p.salary = sal }
}
// Retrieve salary.
func (p *Person) Salary() float64 {
fChan := make(chan float64)
p.chF <- func() { fChan <- p.salary }
return <-fChan
}
func (p *Person) String() string {
return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}
func main() {
bs := NewPerson("Smith Bill", 2500.5)
fmt.Println(bs)
bs.SetSalary(4000.25)
fmt.Println("Salary changed:")
fmt.Println(bs)
}
Donate comment here