goroutine & channel
- 不需要共享内存来通信,而是通过通信来共享内存。
1. channel关键的概念总结
- 通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。
- channel零值nil
- channel只能传递一种类型的数据
- channel实际上是类型化消息的队列,使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序
- channel是引用类型,make来初始化
ch1 := make(chan string)
- 通信操作符
<-
ch <-int1
写入int2 := <- ch
写出<-ch
写出
channel的收发都是原子操作
channel默认是同步且无缓冲的:接收者接收数据之前,发送不会结束。channel的发送和接收操作都是在对方准备好之前是阻塞的
12345678910111213package mainimport "fmt"func main() {ch1 := make(chan int)go pump(ch1) // pump hangsfmt.Println(<-ch1) // prints only 0}func pump(ch chan int) {for i := 0; ; i++ {ch <- i}}// 0
2. 通过channel进行协程同步
12345678
func f1(in chan int) { fmt.Println(<-in)}func main() { out := make(chan int) out <- 2 go f1(out)}
3. channel可以用来完成协程间的通信
ch1 := make(chan string, buf)
buf为个数- 在缓冲满载之前,给通道发送数据是不会被阻塞的
- 在缓冲空之前,读取是不会阻塞的
- 协程通过在channel中放置一个值来发出处理结束的信号
信号量是实现互斥锁常见的同步机制
- 带缓冲通道的容量和要同步的资源容量相同
- 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)
12345678910111213141516171819func main() {stream := pump()go suck(stream)time.Sleep(1e9)}func pump() chan int {ch := make(chan int)go func() {for i := 0; ; i++ {ch <- i}}()return ch}func suck(ch chan int) {for {fmt.Println(<-ch)}}
channel可以用for关键字循环
1234567891011121314151617181920func main() {suck(pump())time.Sleep(1e9)}func pump() chan int {ch := make(chan int)go func() {for i := 0; ; i++ {ch <- i}}()return ch}func suck(ch chan int) {go func() {for v := range ch {fmt.Println(v)}}()}channel可以指定数据流动方向
- 只接收:
var recv_noly <-chan int
- 只发送:
var send_only chan<- int
- 只接收:
channel关闭
defer close(ch)
使用select切换协程,select 被称为通信开关,select可以处理多个通信中的一个
12345678910111213141516171819202122232425262728func main() {ch1 := make(chan int)ch2 := make(chan int)go pump1(ch1)go pump2(ch2)go suck(ch1, ch2)time.Sleep(1e9)}func pump1(ch chan int) {for i := 0; ; i++ {ch <- i * 2}}func pump2(ch chan int) {for i := 0; ; i++ {ch <- i + 5}}func suck(ch1, ch2 chan int) {for {select {case v := <-ch1:fmt.Printf("Received on channel 1: %d\n", v)case v := <-ch2:fmt.Printf("Received on channel 2: %d\n", v)}}}goroutine recover机制,保证一个协程出错会继续执行其他的协程
12345678910111213func server(workChan <-chan *Work) {for work := range workChan {go safelyDo(work)}}func safelyDo(work *Work) {defer func() {if err := revocer(); err != nil {log.Printf("")}}()do(work)}
4. channel通信方式和传统共享内存方式的对比
- 使用共享内存来进行同步:
- 首先定义一个结构体处理一个任务
- 各个任务组成任务池来共享内存,引入锁机制
- 任务数量巨大,则锁机制的开销会导致效率急剧下降
channel方式
- 使用一个通道接受需要处理的任务,一个通道接受处理完成的任务及其结果。worker在协程中启动,数量N可以调整
- 不使用锁的机制,因为从通道获取到新任务不存在竞争关系
我理解实际上锁机制是实现在了channel中,一个channel的获取是原子性的,从而提高性能
12345678//Worker的写法, Master -> Workersfunc Worker(in, out chan *Task) {for {t := <- inprocess(t)out <- t}}
使用锁的情景:
- 访问共享数据结构中的缓存信息
- 保存应用程序上下文和状态信息数据
- 使用通道的情景
- 与异步操作的结果进行交互
- 分发任务
- 传递数据所有权
5. int channel实现惰性生成器
|
|
使用空接口,闭包和高阶函数可以实现一个通用的惰性生产期的工厂函数
1234567891011121314151617181920212223242526272829303132333435363738type Any interface{}type EvalFunc func(Any) (Any, Any)func BuildLazyEvaluator(evalFunc, initState Any) func() Any {retValChan := make(chan Any)loopFunc := func(){var actState Any = initStatevar retValfor {retVal, actState = evalFunc(actState)retValChan <- retVal}}retFunc := func() Any {return <- retValChan}go loopFunc()return retFunc}func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {ef := BuildLazyIntEvaluator(evalFunc, initState)return func() int {return ef().(int)}}func main() {evenFunc := func(state Any) (Any, Any) {os := state.(int)ns := os + 2return os, ns}even := BuildLazyIntEvaluator(evenFunc, 0)for i := 0; i < 10; i++ {fmt.}}
6. 带缓冲的channel可以用于降频
|
|
- 链式协程,海量协程的创建和运行
|
|
并行化大量数据的计算
串行处理流水线
12345678func SerialProcessData(in <- chan *Data, out chan <- *Data) {for data := range in {tmpA := PreproceeData(data)tmpB := ProcessStepA(tmpA)tmpC := ProcessStepB(tmpB)out <- PostProcessData(tmpC)}}
并行计算
12345678910111213func ParallelProcessData(in <-chan *Data, out chan<- *Data) {// make channels:preOut := make(chan *Data, 100)stepAOut := make(chan *Data, 100)stepBOut := make(chan *Data, 100)stepCOut := make(chan *Data, 100)// start parallel computations:go PreprocessData(in, preOut)go ProcessStepA(preOut,StepAOut)go ProcessStepB(StepAOut,StepBOut)go ProcessStepC(StepBOut,StepCOut)go PostProcessData(StepCOut,out)}
8. 使用channel并发访问对象
|
|