Go语言编程(2)

《Go语言编程》阅读笔记(2)

并发编程

并发基础

  • 每个进程运行时都有自己的调用栈和堆(有一个完整的上下文),OS在调度进程时会保存被调度进程的上下文,该进程获取时间片后再恢复其上下文
  • 多个进程是可以并发的,主流实现方式如下:
    • 多进程:OS层面的并发模式,所有进程由内核管理,因此开销大
    • 多线程:系统层面的并发模式
    • 基于回调的非阻塞/异步IO:多线程模式容易耗尽服务器的内存和CPU,该模式对流程做了分割,通过事件驱动的方式异步使用IO,尽可能地少用线程
    • 协程:本质上是用户态线程,不需要OS做抢占式调度,实现中寄存于线程

协程coroutine

  • “轻量级线程”

goroutine

  • 举例:让函数func add(x, y int)并发执行,则go add(1, 1)
  • 被调用的函数返回时,该goroutine自动结束——如果函数有返回值,则返回值被丢弃
  • 当main函数返回时,程序退出,并且不会等待其他goroutine结束——需要实现goroutine之间的通信

并发通信

  • 工程上两种常见的并发通信模型:
    • 数据共享:多个单元分别保持对同一个数据(内存数据、磁盘文件等)的引用,C通常使用此方法
    • 消息:认为每个并发单元是独立的个体并有独立变量,不同单元间变量不共享,不同单元之间通过消息通信(作为输入和输出)
  • go的消息通信机制称为channel

channel

  • 用channel在多个goroutine之间传递消息,通过channel传递对象的过程和函数的参数传递类似

  • 跨进程通信建议使用分布式系统的方法,例如使用Socket协议或HTTP协议

  • 一个channel只能传递一种类型的值,需要声明时指定

    image-20220420160825385
    • 定义10个channel的数组,每个channel给10个goroutine
    • goroutine内部通过ch <- 1写入数据,该channel被读取前该操作被阻塞
    • 所有goroutine启动后,通过<-ch从10个channel依次读取数据,对应channel写入数据前该操作同样被阻塞
  • channel语法:

    • 一般的声明形式:var channame chan ElementType,ElementType指定该channel传递的元素类型,例如var ch chan intvar m map[string] chan bool

    • 定义channel:ch := make(chan int, 1)——定义有一个缓冲区的chan,此时向ch发送第一个数据时主协程不会退出

    • 数据写入/发送给channel:ch <- value,数据写入会阻塞程序,直到其他goroutine从channel读取:value := <-ch

    • 若channel中没有写过数据,从channel中读取数据也会阻塞程序

    • select:类似switch,但每个case必须是I/O操作(第一个case从chan1中读取数据并忽略)

      image-20220420162518647
      • select语句不使用default分支时,处于阻塞状态直到其中一个channel的收/发操作准备就绪。如果同时有多个channel的收/发操作准备就绪则随机选择其中一个
      • select语句使用default分支时,处于非阻塞状态,从所有准备就绪(或者channel关闭或者缓冲区有值)的channel中随机选择其中一个。如果处于阻塞状态,则执行default分支
  • 缓冲机制:创建要给带缓冲的channel:ch := make(chan int, 1024),将缓冲区大小作为第二个参数传入,在缓冲区装满前都不会阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    //遍历缓冲区,这里range只有一个返回值
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 5
    for value := range ch {
    fmt.Println(value)
    }
  • 超时机制:可能存在通讯锁死的问题;go没有直接的超时处理机制,但可以利用select:此时如果ch为空,则程序会在1s后从timeout中读取一个数据,继续执行(go关键字表明,该函数会在另一个协程里执行)

    image-20220420170736926
  • channel的传递(这里没有看懂)

    • channel在定义后可以通过channel传递,类似于pipe(handle函数的输入为一系列PipeData,实现流式处理数据的目的)

      image-20220420171901935image-20220420171919612

      image-20220420171936974
  • 单向channel:只能用于发送或接收数据——将一个channel送入函数时,可指定其为单向channel,从而限制函数对该channel的操作

    image-20220420172412886 image-20220420172412886
  • 关闭channel:

    • close(ch)即可
    • 判断一个channel是否已经关闭:x, ok := <-ch,检查ok是否为false,为false表明已经关闭

多核并行

  • 当前Go的编译器不能发现和利用多核的优势,虽然可以创建多个goroutine,但实际上所有goroutine运行在同一个CPU核心上

  • 通过设置环境变量GOMAXPROCS控制使用CPU核心数目:在启动goroutine前,runtime.GOMAXPROCS(16)来设置CPU核心数目

  • 通过runtime.Gosched()控制goroutine出让时间片给其他goroutine

同步

  • 同步锁

    • sync包提供两种锁:
      • Mutex:当一个goroutine获得Mutex后,其他goroutine只能等待其释放Mutex
      • RWMutex:单写多读,读锁占用时会组织写但不阻止读
    • RLock():读锁;Lock():写锁
  • 全局唯一性操作:全局角度只运行一次的代码

    • once类型

      image-20220420185336409
    • sync包还提供atomic子包,提供对一些基础数据类型的原子操作函数,例如func CompareAndSwapUint64,比较和交换两个uint64数据,此时无需专门添加Lock操作

举例

  • 等价二叉查找树(判定给定的两个树中序遍历结果是否相同):tree.New(k)用于构造一个随机结构的已排序的二叉查找树,其中值为k,2k,3k,...,10k

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package main

    import "golang.org/x/tour/tree"
    import "fmt"

    // Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
    func Walk(t *tree.Tree, ch chan int) {
    if t.Left != nil {
    Walk(t.Left, ch)
    }
    ch <- t.Value
    if t.Right != nil {
    Walk(t.Right, ch)
    }
    }

    // Same 检测树 t1 和 t2 是否有相同的中序遍历结果
    func Same(t1, t2 *tree.Tree) bool {
    ch1 := make(chan int,10)
    ch2 := make(chan int 10)
    go Walk(t1, ch1)
    go Walk(t2, ch2)
    for i := 0; i < 10; i++ {
    n1, n2 := <-ch1, <-ch2
    if n1 != n2 {
    return false
    }
    }
    return true
    }

    func main() {
    same := Same(tree.New(1), tree.New(1))
    if same {
    fmt.Println("PASS : they are same")
    } else {
    fmt.Println("FAIL : they should be same.")
    }

    same = Same(tree.New(1), tree.New(2))
    if !same {
    fmt.Println("PASS : they are not same")
    } else {
    fmt.Println("FAIL : they should be not same")
    }
    }
    // 正确运行结果都应该是Pass
  • 通过sync.Mutex互斥锁(Lock()和Unlock()),保证每次只有一个goroutine访问一个共享的变量,从而避免冲突,并且利用defer保证互斥锁一定被解锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package main

    import (
    "fmt"
    "sync"
    "time"
    )

    // SafeCounter 的并发使用是安全的。
    type SafeCounter struct {
    v map[string]int
    mux sync.Mutex
    }

    // Inc 增加给定 key 的计数器的值。
    func (c *SafeCounter) Inc(key string) {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    c.v[key]++
    c.mux.Unlock()
    }

    // Value 返回给定 key 的计数器的当前值。
    func (c *SafeCounter) Value(key string) int {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    defer c.mux.Unlock()
    return c.v[key]
    }

    func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
    go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey")) // 1000
    }