goroutine和channel是go语言的两大基石,这边主要来研究一下goroutine,channel可以查看这里

并发

发展演变

  • 多进程——-开销太大,都是基于内核的调用
  • 多线程——-相对开销小,但是远远达不到需求,最多并发1万这样
  • 基于回调的非阻塞/异步io—-共享内存式的同步异步,导致编程相当复杂
  • 协程—-轻量级线程,轻松达到100w的并发,使用成本低、消耗资源低、能效高

线程和协程的对比

  1. 内存占用

    创建一个goroutine不需要太多的内存 - 大概2KB左右的栈空间。如果需要更多的栈空间,就从堆里分配额外的空间来使用。新创建的线程会占用1MB的内存空间(这大约是goroutine的500倍)

  2. 创建和销毁的开销

    线程需要从操作系统里请求资源并在用完之后释放回去,因此创建和销毁线程的开销非常大。为了避免这些开销,我们通常的做法是维护一个线程池。Goroutine的创建和销毁是由运行环境(runtime)完成的。这些操作的开销就比较小。Go语言不支持手工管理goroutine。

  3. 切换开销

    当一个线程阻塞的时候,另外一个线程需要被调度到当前处理器上运行。线程的调度是抢占式的(preemptively)。当切换一个线程的时候,调度器需要保存/恢复所有的寄存器。这包括16个通用寄存器,程序指针(program counter),栈指针(stack pointer),段寄存器(segment registers)和16个XMM寄存器,浮点协处理器状态,16个AVX寄存器,所有的特殊模块寄存器(MSR)等。当在线程间快速切换的时候这些开销就变得非常大了。

    Goroutine的调度是协同合作式的(cooperatively)。不依赖操作系统和其提供的线程,golang自己实现的CSP并发模型实现:M, P, G。当切换goroutine的时候,调度器只需要保存和恢复三个寄存器 - 程序指针,栈指针和DX。切换的开销就小多了。

    前面已经谈到了,goroutine的数目会比线程多很多,但这并不影响切换的时间。有两个原因:第一,只有可以运行的goroutine才会被考虑,正在阻塞的goroutine会被忽略。第二,现代的调度器的复杂度都是O(1)的。这意味着选择的数目(线程或者是goroutine)不会影响切换的时间。

go协程

所以在协程的基础上,go支持在语言上实现协程并发goroutine,goroutine类似于进程,各个协程之间互不干涉,通过channel通信控制,这样减少了很多的复杂问题,goroutine的使用非常简单:

  • 关键字go—–只要在执行体前加上关键字go就能实现协程并发
  • go func(){}()加了()就不仅是类型定义,而且要实例化(需要绑定参数,就是最后一个括号中的参数)。

goroutine的模型来源

Goroutine,Go语言基于并发(并行)编程给出的自家的解决方案。goroutine是什么?通常goroutine会被当做coroutine(协程)的 golang实现,从比较粗浅的层面来看,这种认知也算是合理,但实际上,goroutine并非传统意义上的协程,现在主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),传统的协程库属于用户级线程模型,而goroutine和它的Go Scheduler在底层实现上其实是属于两级线程模型,因此,有时候为了方便理解可以简单把goroutine类比成协程,但心里一定要有个清晰的认知 — goroutine并不等同于协程。

并发模型一代代地升级,有IO多路复用、多进程以及多线程,这几种模型都各有长短。多线程,因为其轻量和易用,成为并发编程中使用频率最高的并发模型,而后衍生的协程等其他子产品,也都基于它,而我们今天要分析的 goroutine 也是基于线程,因此,我们先来聊聊线程的三大模型:

线程的实现模型主要有3种,在上一段说过:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),它们之间最大的差异就在于用户线程与内核调度实体(KSE,Kernel Scheduling Entity)之间的对应关系上。而所谓的内核调度实体 KSE 就是指可以被操作系统内核调度器调度的对象实体。简单来说 KSE 就是内核级线程,是操作系统内核的最小调度单元,也就是我们写代码的时候通俗理解上的线程了

1、用户级线程模型(比如python的gevent)

用户线程与内核线程KSE是多对一(N : 1)的映射模型,多个用户线程的一般从属于单个进程并且多线程的调度是由用户自己的线程库来完成,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责而无须借助系统调用来实现。

这种线程模型并不能做到真正意义上的并发,假设在某个用户进程上的某个用户线程因为一个阻塞调用(比如I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的所有线程都被阻塞(因为单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度)

2、内核级线程模型(比如Java的java.lang.Thread、C++11的std::thread等等)

用户线程与内核线程KSE是一对一(1 : 1)的映射模型,也就是每一个用户线程绑定一个实际的内核线程,而线程的调度则完全交付给操作系统内核去做,应用程序对线程的创建、终止以及同步都基于内核提供的系统调用来完成,大部分编程语言的线程库(比如Java的java.lang.Thread、C++11的std::thread等等)都是对操作系统的线程(内核级线程)的一层封装,

3、两级线程模型

用户线程与内核KSE是多对多(N : M)的映射模型:,两级线程模型中的一个进程可以与多个内核线程KSE关联,也就是说一个进程内的多个线程可以分别绑定一个自己的KSE,这点和内核级线程模型相似;它的进程里的线程并不与KSE唯一绑定,而是可以多个用户线程映射到同一个KSE,当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行。goroutine的优势在于上下文切换在完全用户态进行,无需像线程一样频繁在用户态与内核态之间切换,节约了资源消耗。所以,两级线程模型靠(自身调度与系统调度协同工作),也就是 — 『薛定谔的模型』(误),因为这种模型的高度复杂性,操作系统内核开发者一般不会使用,所以更多时候是作为第三方库的形式出现,而Go语言中的runtime调度器就是采用的这种实现方案,实现了Goroutine与KSE之间的动态关联,不过Go语言的实现更加高级和优雅;

go的调度模型

G-P-M 模型概述

G: 表示Goroutine,每个Goroutine对应一个G结构体,G存储Goroutine的运行堆栈、状态以及任务函数,可重用。G并非执行体,每个G需要绑定到P才能被调度执行。每个Goroutine对象中的sched保存着其上下文信息

P: Processor,表示逻辑处理器, 对G来说,P相当于CPU核,G只有绑定到P(在P的local runq中)才能被调度。对M来说,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定,但是不论GOMAXPROCS设置为多大,P的数量最大为256。或者通过运行时调用函数runtime.GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有固定数量的线程在运行go代码。Goroutine中就是我们要执行并发的代码。图中P正在执行的Goroutine为蓝色的;处于待执行状态的Goroutine为灰色的,灰色的Goroutine形成了一个队列runqueues

默认情况下,在任意时刻,只有一个Goroutine可以被调度执行。我们未来可能会将其设计的更加智能,但是目前,你必须通过 设置 GOMAXPROCS 环境变量或者导入 runtime 包并调用 runtime.GOMAXPROCS(NCPU) , 来告诉Go 的运行时系统最大并行执行的Goroutine数目。你可以通过 runtime.NumCPU() 获得当前运行系统的逻 辑核数,作为一个有用的参考。

M代表内核级线程,一个M就是一个线程,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。代表着真正执行计算的资源,在绑定有效的P后,进入schedule循环;而schedule循环的机制大致是从Global队列、P的Local队列以及wait队列中获取G,切换到G的执行栈上并执行G的函数,调用goexit做清理工作并回到M,如此反复。M并不保留G状态,这是G可以跨M调度的基础,M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致系统调度不过来,目前默认最大限制为10000个。对内核级线程的封装,数量对应真实的CPU数(真正干活的对象)

三者关系的宏观的图

work-stealing 的j均衡调度算法

  • 每个P维护一个G的本地队列;
  • 当一个G被创建出来,或者变为可执行状态时,就把他放到P的可执行队列中;
  • 当一个G在M里执行结束后,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行, M就随机选择另外一个P,从其可执行的G队列中取走一半。

当通过go关键字创建一个新的goroutine的时候,它会优先被放入P的本地队列。为了运行goroutine,M需要持有(绑定)一个P,接着M会启动一个OS线程,循环从P的本地队列里取出一个goroutine并执行。当然还有上文提及的 work-stealing调度算法:当M执行完了当前P的Local队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从Global队列寻找G来执行,如果Global队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。

goroutine调度模型的瓶颈

有基于G-P-M的Go调度器背书,go程序的并发编程中,可以任性地起大规模的goroutine来执行任务,官方也宣称用golang写并发程序的时候随便起个成千上万的goroutine毫无压力。然而,你起1000个goroutine没有问题,10000也没有问题,10w个可能也没问题;那,100w个呢?1000w个呢?

即便每个goroutine只分配2KB的内存,但如果是恐怖如斯的数量,聚少成多,内存暴涨,也会对GC造成极大的负担(gc在1.8之后去掉了STW(Stop The World)机制)

其实这个和golang本身并没有太大的关系,任何技术不加以控制,也是会奔溃的,一般我们都是使用池化技术来解决这类问题,具体可以看并发使用

相关问题

1.go为什么要实现自己的协程调度,而不用系统调度?

  • 线程较多时,开销较大。
  • OS 的调度,程序不可控。而 Go GC 需要停止所有的线程,使内存达到一致状态。

2.GM为啥不行?P有什么作用?

  • 是让我们可以直接放开其他线程,当遇到内核线程阻塞的时候。否则每个 P 都有一个队列,用来存正在执行的 G。避免 Global Sched Lock。
  • 每个 M 运行都需要一个 MCache 结构。M Pool 中通常有较多 M,但执行的只有几个,为每个池子中的每个 M 分配一个 MCache 则会形成不必要的浪费,通过把 cache 从 M 移到 P,每个运行的 M 都有关联的 P,这样只有运行的 M 才有自己的 MCache。

3.Goroutine vs OS thread 有什么区别?

  • 其实 goroutine 用到的就是线程池的技术,当 goroutine 需要执行时,会从 thread pool 中选出一个可用的 M 或者新建一个 M。而 thread pool 中如何选取线程,扩建线程,回收线程,Go Scheduler 进行了封装,对程序透明,只管调用就行,从而简化了 thread pool 的使用。

并发使用

简单并发

简单的并发,一般用于启动多协程来处理后端数据业务,类似于多线程处理数据,直接使用for循环并发一定数量的goroutine,然后对数据进行处理,如果是处理同一个数据,则需要使用锁,通过传递参数,还可以对业务进行分通道处理

我们没法控制goroutine产生数量,如果处理程序稍微耗时,在单机万级十万级qps请求下,goroutine大规模爆发,内存暴涨,处理效率会很快下降甚至引发程序崩溃。

它无法控制创建goroutine的数量。因为我们每分钟收到了一百万个POST请求,很快就奔溃了。

控制并发是必然的,也是防止大量资源被占用导致崩死的必须手段,当然并发的管理也是必要的,确保并发能够完整的执行,就要使用go-context

工作池

工作池+job队列 先启动一定数量的goroutine,使用channel,让当前goroutine处于阻塞状态,当有task往通道里传输,然后进行处理。

将请求放入队列,通过一定数量(例如CPU核心数)goroutine组成一个worker池(pool),workder池中的worker读取队列执行任务,这样可以处理百万级请求并且控制并发量不会崩溃。

  • 工作者工作协程,挂入调度器,取Job,执行Job,周而复始
  • 调度器,从Job队列取Job,分配给工作者,周而复始
  • web响应里,模拟了客户的请求-Job,并将此Job放入Job队列,只有有客户端请求,就周而复始的工作

工作池实现

首先,我们定义一个job的接口, 具体内容由具体job实现;

type Job interface {
    Do() error
}

然后定义一下job队列和work池类型,这里我们work池也用golang的channel实现。

// define job channel
type JobChan chan Job

// define worker channer
type WorkerChan chan JobChan

我们分别维护一个全局的job队列和工作池。

var (
    JobQueue          JobChan
    WorkerPool        WorkerChan
)

worker的实现。每一个worker都有一个job channel,在启动worker的时候会被注册到work pool中。启动后通过自身的job channel取到job并执行job。

type Worker struct {
    JobChannel JobChan
    quit       chan bool
}

func (w *Worker) Start() {
    go func() {
        for {
            // regist current job channel to worker pool
            WorkerPool <- w.JobChannel
            select {
            case job := <-w.JobChannel:
                if err := job.Do(); err != nil {
                    fmt.printf("excute job failed with err: %v", err)
                }
            // recieve quit event, stop worker
            case <-w.quit:
                return
            }
        }
    }()
}

实现一个分发器(Dispatcher)。分发器包含一个worker的指针数组,启动时实例化并启动最大数目的worker,然后从job队列中不断取job选择可用的worker来执行job。

type Dispatcher struct {
    quit    chan bool
}

func (d *Dispatcher) Run() {
    for i := 0; i < MaxWorkerPoolSize; i++ {
        worker := NewWorker()
        worker.Start()
    }

    for {
        select {
        case job := <-JobQueue:
            go func(job Job) {
                jobChan := <-WorkerPool
                jobChan <- job
            }(job)
        // stop dispatcher
        case <-d.quit:
            return
        }
    }
}

实例源码

package main

import (
    "net/http"
    "fmt"
)

type Job struct {
    request string
}

func (j *Job)Handle(){
    fmt.Println("test")
}

type worker struct {
    work  JobChan
    quit chan bool
}

func (w *worker)start(i int)  {
    fmt.Println("start worker:",i)
    go func(i int) {
        for {
            fmt.Println("add free worklist")
            workList <- w.work
            select {
            case Task := <- w.work:
                fmt.Println("worker",i,"handle job .....")
                Task.Handle()
                fmt.Println("worker",i,"handle over .....")
            case <- w.quit:
                return
            }
        }
    }(i)

}



type schedule struct {
    quit chan bool
}

func newWorker() *worker {
    workchan := make(chan Job,1)
    return &worker{work:workchan}
}


func (s *schedule)schedule() {
    workList = make(chan JobChan,10)
    fmt.Println("start pool")
    for i := 0; i < 10; i++ {
        w := newWorker()
        w.start(i)
    }

    for {
        fmt.Println("get task and get worker")
        select {
        case job := <-queue:
            go func(job Job) {
                fmt.Println("get worker")
                jobChan := <-workList
                fmt.Println("insert task into job")
                jobChan <- job
            }(job)
            // stop dispatcher
        case <-s.quit:
            return
        }
    }


}

//define type queue and work
type JobChan chan Job
type WorkChan chan JobChan

//任务队列
var queue JobChan
//工作池用队列实现
var workList WorkChan

func newschedule() schedule  {
    fmt.Println("newschedule")
    return schedule{}
}


func init(){
    s := newschedule()
    go s.schedule()
}

func main()  {
    fmt.Println("main")
    queue = make(chan Job,1024)

    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        job := Job{"test"}
        queue <- job
    })


    fmt.Println("start sueccess and listen at 9000!!")
    http.ListenAndServe("localhost:9000",nil)
}

异步

异步处理并不算一种并发的使用方式,但是却是并发中经常使用的,在工作池的基础上使用goroutine处理,但是不用等返回,留一个channenl返回,使用select读取channel中的数据,完成处理,这样可以加大处理的速度,也就提高了并发能力。