欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

go语言 7 并发编程

程序员文章站 2022-03-21 15:53:19
文章由作者马志国在博客园的原创,若转载请于明显处标记出处:http://www.cnblogs.com/mazg/ 今天我们学习Go语言编程的第七章,并发编程。语言级别的支持并发编程是Go语言最大的优势和特色,所以这章是Go语言学习的重点和难点,当然内容也比较多。首先我们会介绍并发编程的相关概念,其 ......

 

文章由作者马志国在博客园的原创,若转载请于明显处标记出处:http://www.cnblogs.com/mazg/

对应的视频教程地址(腾讯课堂):https://mazhiguo.ke.qq.com/

对应的视频教程地址(网易云课堂):http://study.163.com/course/introduction/1004982006.htm

对应的视频教程地址(51cto):http://edu.51cto.com/course/12520.html

今天我们学习Go语言编程的第七章,并发编程。语言级别的支持并发编程是Go语言最大的优势和特色,所以这章是Go语言学习的重点和难点,当然内容也比较多。首先我们会介绍并发编程的相关概念,其次介绍Go语言中轻量级的线程,goroutine。最后学习goroutine之间的两种通信机制,一种是消息通信机制,channel。另外一种是共享内存的方式。

7.1 并发编程的相关概念

7.1.1 进程线程

在现代操作系统中,线程是CPU调度和分配的基本单位,进程则作为资源拥有的基本单位。每个进程是由私有的虚拟地址空间、代码、数据和其它各种系统资源组成,进程与进程之间是独立。线程是进程内部的一个执行单元。 每一个进程至少有一个主执行线程,这个主线程无需由用户去主动创建,是由系统自动创建的。 用户根据需要在应用程序中创建其它线程,多个线程并发地运行于同一个进程中,同一进程的不同线程可以共享进程内的资源。对于编程来讲,我们通常需要解决的问题是进程间的通信和线程间的同步。

7.1.2 并行与并发

并发与并行(Concurrency and Parallelism)是两个不同的概念,理解它们对于理解多线程模型非常重要。并发是指在一个时间段内有多个线程或进程在执行,但在某个时间点上只有一个在执行,多个线程或进程通过争抢CPU时间片轮流执行。并行是指一个任意时间点上都有多个线程或进程在执行。并发就像一个家长(cpu)在喂多个孩子(线程),轮换着每个孩子喂一口,表面上多个孩子都在吃饭。并行就像n个家长(cpu)在喂n个孩子(线程),这n个孩子同时都在吃饭。并行需要硬件支持,单核处理器只能是并发,多核处理器才能做到并行。

7.1.3 多线程与多核CPU

多核处理器是指在一个CPU处理器上集成多个运算核心从而提高计算能力,也就是有多个真正并行计算的处理核心,一般一个处理核心对应一个内核线程。例如,单核处理器对应一个内核线程,双核处理器对应两个内核线程,四核处理器对应四个内核线程。现在的电脑一般是双核四线程、四核八线程,是采用超线程技术将一个物理处理核心模拟成两个逻辑处理核心,对应两个内核线程,所以在操作系统中看到的CPU数量是实际物理CPU数量的两倍。

程序一般不会直接去使用内核线程,而是使用用户线程。用户线程与内核线程的对应关系有三种模型:一对一模型、多对一模型、多对多模型,在这以4个内核线程、3个用户线程为例对三种模型进行说明。

1 一对一模型(1:1

对于一对一模型来说,一个用户线程就唯一地对应一个内核线程(反过来不一定成立,一个内核线程不一定有对应的用户线程)。这样,如果CPU没有采用超线程技术(如四核四线程的计算机),一个用户线程就唯一地映射到一个物理CPU的线程,线程之间是并行处理的。而且一个线程因某种原因阻塞时,其他线程的执行不受影响。缺点是操作系统内核线程调度时,上下文切换的开销较大,导致用户线程的执行效率下降。

 

2 多对一模型(M:1

多对一模型将多个用户线程映射到一个内核线程上,线程之间的切换由用户态的代码来进行,因此相对一对一模型,多对一模型的线程切换速度要快许多;此外,多对一模型对用户线程的数量几乎无限制。但多对一模型也有两个缺点:1.如果其中一个用户线程阻塞,那么其它所有线程都将无法执行,因为此时内核线程也随之阻塞了;2.在多处理器系统上,处理器数量的增加对多对一模型的线程性能不会有明显的增加,因为所有的用户线程都映射到一个处理器上了。

3 多对多模型(M:N

多对多模型结合了一对一模型和多对一模型的优点,将多个用户线程映射到多个内核线程上。多对多模型的优点有:1.一个用户线程的阻塞不会导致所有线程的阻塞,因为此时还有别的内核线程可以被调度来执行;2.多对多模型对用户线程的数量没有限制;3.在多处理器的操作系统中,多对多模型的线程也能得到一定的性能提升。

 

7.2 goroutine

goroutineGo语言中的轻量级线程实现,实现了M : N的线程模型,由Go运行时(runtime)管理。与传统的系统级线程和进程相比,其最大优势在于其“轻量级”,因为goroutine使用的是动态栈,可以小到几k。所以,在一台服务器上可以轻松创建上百万个goroutine而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。

 当一个程序启动时,其主函数在一个单独的goroutine中运行,我们叫它main gorouine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字gogo语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速的完成。

package main

 

import (

    "fmt"

)

func main() {

  go fmt.Println("Hello")

  fmt.Println("World ")

}

上述的代码中,go fmt.Println("Hello")这条语句,会使得fmt.Println("Hello")函数在一个新创建的goroutine中运行,go语句迅速完成。然后执行fmt.Println("World ")输出结果会是以下三种情况:

1>World

  新的gorouine没有来的及执行,main gorouine 输出”World”后程序就直接退出了。

2>Word

Hello

main gorouine 输出”World”,新的gorouine输出”Hello”后,程序才退出。

3>Hello

world

  新的gorouine先输出了”Hello”,接着main gorouine 输出”World”后程序退出。

从以上案例得出的结论是,多个gorouine并发执行时的先后顺序是不确定的。如果希望确定的先输出Hello再输出world,我们可以这样修改代码:

package main

 

import (

    "fmt"

    "runtime"

)

 

func main() {

    go fmt.Println("Hello")

    runtime.Gosched() //出让时间片

    fmt.Println("World ")

 

}

或者

package main

 

import (

    "fmt"

    "time"

  )

 

func main() {

    go fmt.Println("Hello")

    time.Sleep(1) //主goroutine延时1毫秒,也会出让时间片

    fmt.Println("World ")

 

}

通过让goroutine出让时间片,可以使得新创建的goroutine先执行就可以达到目的。但是在工程中要解决的并发问题远不会这么简单。我们不但需要确定多个goroutine之间的执行顺序,还要能够在多个goroutine之间完成通信。两种最常见的并发通信模型模式是:消息通信和共享数据。 

7.3 channel

channelgoroutine之间的消息通信机制。channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要定义channel时指定。

使用内置的make函数,我们可以创建一个channel

chi := make(chan int)

    chs := make(chan string)

    chf := make(chan interface{})

map类似,channel也是一个对应make创建的底层数据结构的引用。当我们复制一个channel或用于函数的参数传递时,我们只是拷贝了一个channel引用。channel的零值也是nil

两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相通的对象,那么比较的结果为真。一个channel也可以和nil进行比较。

一个channel有发送和接受两个主要操作,都是使用<-运算符。一个不保存接收结果的接收操作也是合法的。

ch<-x   //发送

    x=<-ch  //接收

    <-ch    //接收操作,但不保存接收结果

channel还支持close操作,用于关闭channel,随后对该channel的任何发送操作都将导致panic异常。对一个已经关闭的channel接收数据,依然可以接收到已经成功发送的数据,如果没有的话,将接收到零值的数据。使用内置的close函数可以关闭channel

close(ch)

如果channel容量大于零,就是带缓存的channel

ch = make(chan int)

    ch = make(chan int,0)

    ch = make(chan int,5)//带缓存的channel

7.3.1 无缓存的channel

一个基于无缓存的channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作。同样,如果接收操作先发生,那么接收者gotoutine也将阻塞,直到有另一个goroutine在相同的channel上执行发送操作。基于无缓存channel的发送和接收操作将导致两个goroutine做一次同步操作,因此,无缓存的channel也称为同步channel。当通过一个无缓存channel发送数据时,接收者收到数据发生在唤醒发送这goroutine之前(happens before)

func Sum(arr []int, ch chan int) {

    sum := 0

    for _, v := range arr {

        sum += v

    }

    ch <- sum

}

 

func main() {

 

    arr := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    c := make(chan int)

    go Sum(arr[:len(arr)/2], c)

    go Sum(arr[len(arr)/2:], c)

    x, y := <-c, <-c

    fmt.Println(x, y, x+y)

 

}

 

 

7.3.2 串联的channel(Pipeline)

channel可以将多个goroutine链接在一起,一个channel的输出作为下一个channel的输入。类似与进程间通信的管道。下面使用两个channel串联三个goroutine

 go语言 7 并发编程

 

第一个gorutine用于生成0123......整数序列,通过channel传递给第二个goroutine;第二个goroutine将收到的整数求平方,然后将结果通过第二个channel传递给第三个goroutine,第三个goroutine打印收到的每个结果。

package main

 

import (

    "fmt"

    "time"

)

 

func main() {

  num := make(chan int)

  sqr := make(chan int)

  //Counter

  go func() {

    for x := 0; ; x++ {

     num <- x

     time.Sleep(1 * time.Second) //增加这句话,才方便看到运行效果

    }

  }()

  // Squarer

  go func() {

    for {

      x := <-num

      sqr <- x * x

    }

  }()

    //Printer

  for {

    fmt.Println(<-sqr)

  }

}

如果我们希望通过channels只发送有限的数列如何处理?如果没有更多的值需要发送时,可以通过内置的close函数关闭channel。当一个channel被关闭后,再向该channel发送数据将导致panic异常。所以一般在数据发送方确定不在发送数据时,关闭channel

当一个已经关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。上面的num channel并不能终止循环,它依然会受到一个永无休止的零值序列,然后将它们发送给打印者goroutine

没有办法测试一个channel是否被关闭。但是接收操作有一个变体形式:多接收一个结果,多接收的第二个结果是一个布尔值oktrue表示成功从channels接收到值,false表示channel已经被关闭,并且里面没有值可以接收。我们可以这样修改接收数据并计算的goroutine

go func(){

  for{

    x,ok := <-num

    if !ok{

      break

     }

       sqr <- x * x

    }

    close(sqr )

}

 

上面的语法比较繁琐,Go语言的range循环可以直接在channel上迭代。最终的例子:

package main

 

import "fmt"

 

func main() {

    num := make(chan int)

    sqr := make(chan int)

 

    // Counter

    go func() {

        for x := 0; x < 100; x++ {

            num <- x

        }

        close(num)

    }()

 

    // Squarer

    go func() {

        for x := range num {

            sqr <- x * x

        }

        close(sqr)

    }()

 

    // Printer (在主goroutine中)

    for x := range sqr {

        fmt.Println(x)

    }

}

不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。

7.3.3 单向的Channel

随着代码量的增长,通常需要把代码按功能拆分成一个个相对独立的函数。每个函数在一个单独的goroutine中执行,使用channel作为参数进行通信。在函数内部,有的channel只接收数据,有的channel只发送数据,这时可以使用单向的channel来表达这种意图。

chan<-int表示只发送不接收;相反,类型<-chan int只接收不发送。箭头<-和关键字chan的相对位置表明了channel的方向。这种限制将在编译期检测。

将三个goroutine拆分为以下三个函数:

func counter(out chan<-  int)

    func squarer(out chan<- int,in <-chan int)

    func printer(in <-chan int)

因为关闭操作用于断言不再向channel发送新的数据,所有只有在发送者所在的goroutine才会调用close函数,因此对一个只接受的channel调用close函数将是一个编译错误。

package main

 

import "fmt"

 

func counter(out chan<- int) {

    for x := 0; x < 100; x++ {

        out <- x

    }

    close(out)

}

 

func squarer(out chan<- int, in <-chan int) {

    for v := range in {

        out <- v * v

    }

    close(out)

}

 

func printer(in <-chan int) {

    for v := range in {

        fmt.Println(v)

    }

}

 

func main() {

    num := make(chan int)

    sqr:= make(chan int)

 

    go counter(num )

    go squarer(sqr, num )

    printer(sqr)

}

任何双向的channel向单向的channel赋值都将导致隐式转换。但是不能反向转换。

7.3.4 带缓存的channel

带缓存的channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面语句创建了一个可以持有3个字符串元素的带缓存的channel

ch = make(chan string,3)

向缓存channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,发送操作将阻塞。如果channel是空的,接受操作将阻塞。通过缓存队列解耦了接收和发送的goroutine

cap函数可以获取channel内部缓存的容量。len函数可以获取channel内部缓存队列中有效元素的个数。多个goroutine并发的向同一个channel发送数据或从同一个channel接收数据都是常见的用法。

 

 

如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收数据而永远阻塞。这种情况,称为goroutines泄露,这将是一个bug。和垃圾变量不同,泄露的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出时总要的。

关于无缓存或带缓存channels之间的选择,或者带缓存channels的容量大小的选择,都可能影响程序的正确性。无缓存channel更强的保证了每个发送操作与接收操作的同步。但是对于带缓存channel是解耦的。

channel的缓存也可能影响程序的性能。

package cake

 

import (

    "fmt"

    "math/rand"

    "time"

)

 

type Shop struct {

    Verbose        bool

    Cakes          int           // number of cakes to bake

    BakeTime       time.Duration // time to bake one cake

    BakeStdDev     time.Duration // standard deviation of baking time

    BakeBuf        int           // buffer slots between baking and icing

    NumIcers       int           // number of cooks doing icing

    IceTime        time.Duration // time to ice one cake

    IceStdDev      time.Duration // standard deviation of icing time

    IceBuf         int           // buffer slots between icing and inscribing

    InscribeTime   time.Duration // time to inscribe one cake

    InscribeStdDev time.Duration // standard deviation of inscribing time

}

 

type cake int

 

func (s *Shop) baker(baked chan<- cake) {

    for i := 0; i < s.Cakes; i++ {

        c := cake(i)

        if s.Verbose {

            fmt.Println("baking", c)

        }

        work(s.BakeTime, s.BakeStdDev)

        baked <- c

    }

    close(baked)

}

 

func (s *Shop) icer(iced chan<- cake, baked <-chan cake) {

    for c := range baked {

        if s.Verbose {

            fmt.Println("icing", c)

        }

        work(s.IceTime, s.IceStdDev)

        iced <- c

    }

}

 

func (s *Shop) inscriber(iced <-chan cake) {

    for i := 0; i < s.Cakes; i++ {

        c := <-iced

        if s.Verbose {

            fmt.Println("inscribing", c)

        }

        work(s.InscribeTime, s.InscribeStdDev)

        if s.Verbose {

            fmt.Println("finished", c)

        }

    }

}

 

// Work runs the simulation 'runs' times.

func (s *Shop) Work(runs int) {

    for run := 0; run < runs; run++ {

        baked := make(chan cake, s.BakeBuf)

        iced := make(chan cake, s.IceBuf)

        go s.baker(baked)

        for i := 0; i < s.NumIcers; i++ {

            go s.icer(iced, baked)

        }

        s.inscriber(iced)

    }

}

 

// work blocks the calling goroutine for a period of time

// that is normally distributed around d

// with a standard deviation of stddev.

func work(d, stddev time.Duration) {

    delay := d + time.Duration(rand.NormFloat64()*float64(stddev))

    time.Sleep(delay)

}

这个包模拟了一个蛋糕店,可以通过不同的参数的调整。它还提供了对上面提到的几种场景提供对应的基准测试(11.4)

package cake_test

 

import (

    "testing"

    "time"

 

    "gopl.io/ch8/cake"

)

 

var defaults = cake.Shop{

    Verbose:      testing.Verbose(),

    Cakes:        20,

    BakeTime:     10 * time.Millisecond,

    NumIcers:     1,

    IceTime:      10 * time.Millisecond,

    InscribeTime: 10 * time.Millisecond,

}

 

func Benchmark(b *testing.B) {

    // Baseline: one baker, one icer, one inscriber.

    // Each step takes exactly 10ms.  No buffers.

    cakeshop := defaults

    cakeshop.Work(b.N) // 224 ms

}

 

func BenchmarkBuffers(b *testing.B) {

    // Adding buffers has no effect.//增加缓存没有影响

    cakeshop := defaults

    cakeshop.BakeBuf = 10

    cakeshop.IceBuf = 10

    cakeshop.Work(b.N) // 224 ms

}

 

func BenchmarkVariable(b *testing.B) {

    // Adding variability to rate of each step

    // increases total time due to channel delays.//通道的延时增加了总时间

    cakeshop := defaults

    cakeshop.BakeStdDev = cakeshop.BakeTime / 4

    cakeshop.IceStdDev = cakeshop.IceTime / 4

    cakeshop.InscribeStdDev = cakeshop.InscribeTime / 4

    cakeshop.Work(b.N) // 259 ms

}

 

func BenchmarkVariableBuffers(b *testing.B) {

    // Adding channel buffers reduces

    // delays resulting from variability. //使用带缓冲区的通道减少延时

    cakeshop := defaults

    cakeshop.BakeStdDev = cakeshop.BakeTime / 4

    cakeshop.IceStdDev = cakeshop.IceTime / 4

    cakeshop.InscribeStdDev = cakeshop.InscribeTime / 4

    cakeshop.BakeBuf = 10

    cakeshop.IceBuf = 10

    cakeshop.Work(b.N) // 244 ms

}

 

func BenchmarkSlowIcing(b *testing.B) {

    // Making the middle stage slower//增加中间环节的时间

    // adds directly to the critical path.

    cakeshop := defaults

    cakeshop.IceTime = 50 * time.Millisecond

    cakeshop.Work(b.N) // 1.032 s

}

 

func BenchmarkSlowIcingManyIcers(b *testing.B) {

    // Adding more icing cooks reduces the cost of icing //

    // to its sequential component, following Amdahl's Law.

    cakeshop := defaults

    cakeshop.IceTime = 50 * time.Millisecond

    cakeshop.NumIcers = 5

    cakeshop.Work(b.N) // 288ms

}

自己的理解:缓存不是越大越好,还要看发送和接收数据的goutine的速度,可以通过设置goroutine的数量调整发送和接收数据的速度。

7.3.5 并发的循环

并发的循环指的是在循环体内,通过go+匿名函数生成多个goroutine,如果在goroutine内用到外部外部函数的变量,不要直接使用,需要将外部变量作为匿名函数的参数传递,保证每个gotoutine运行不同的变量。匿名函数在一个新的goroutine中执行,每个goroutine执行时,i的值是不确定的。只有在调用函数时,通过参数传递才能确定函数内的值。

错误的演示:

func loopgo() {

    for i := 1; i < 10; i++ {

        go func() {

            fmt.Printf("第%d个 goroutine\n", i)

        }()

    }

}

 

正确的演示:

func loopgo() {

    for i := 1; i < 10; i++ {

        go func(i int) {

            fmt.Printf("第%d个 goroutine\n", i)

        }(i)

    }

}

 

 

7.3.6基于select的多路复用

Go语言直接在语言级别支持select关键字,用于处理异步IO问题 。select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:

select {

case <-chan1:

// 如果chan1成功读到数据,则进行该case处理语句

case chan2 <- 1:

// 如果成功向chan2写入数据,则进行该case处理语句

default:

// 如果上面都没有成功,则进入default处理流程

}

可以看出, select不像switch,后面并不带判断条件,而是直接去查看case语句。每个case语句都必须是一个面向channel的操作。比如上面的例子中,第一个case试图从chan1读取一个数据并直接忽略读到的数据,而第二个case则是试图向chan2中写入一个整型数1,如果这两者都没有成功,则到达default语句。
    Go语言没有提供直接的超时处理机制,但我们可以利用select机制。虽然select机制不是专为超时而设计的,却能很方便地解决超时问题。因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。
    基于此特性,我们来为channel实现超时机制:

// 首先,我们实现并执行一个匿名的超时等待函数

timeout := make(chan bool, 1)

go func() {

    time.Sleep(1e9) // 等待1秒钟

    timeout <- true

}()

// 然后我们把timeout这个channel利用起来

select {

case <-ch:

// 从ch中读取到数据

case <-timeout:

    // 一直没有从ch中读取到数据,但从timeout中读取到了数据

}

这样使用select机制可以避免永久等待的问题,因为程序会在timeout中获取到一个数据后继续执行,无论对ch