http包提供了HTTP协议的客户端和服务端的实现。
HTTP客户端
直接使用http方法
直接使用http方法,其实就是使用标准库默认的结构体client,transport等来实现请求。
http包中封装了Get、Head、Post和PostForm函数可以直接发出HTTP/ HTTPS请求。
resp, err := http.Get("http://example.com/")
...
resp, err := http.Post("http://example.com/upload", "image/jpeg", &buf)
...
resp, err := http.PostForm("http://example.com/form",
url.Values{"key": {"Value"}, "id": {"123"}})
程序在使用完回复后必须关闭回复的主体。
resp, err := http.Get("http://example.com/")
if err != nil {
// handle error
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
// ...
原理解析
http直接提供的Post等方法实现在client.go文件中,以Post为例,其他都是一样
func Post(url, contentType string, body io.Reader) (resp *Response, err error) {
return DefaultClient.Post(url, contentType, body)
}
实际是调用了默认结构体client的Post方法
var DefaultClient = &Client{}
再来看Post方法
func (c *Client) Post(url, contentType string, body io.Reader) (resp *Response, err error) {
req, err := NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
return c.Do(req)
}
根据url和请求体body新建一个reqest,然后调用DefaultClient的Do方法
func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}
调用内部的do方法
func (c *Client) do(req *Request) (retres *Response, reterr error) {
if testHookClientDoResult != nil {
defer func() { testHookClientDoResult(retres, reterr) }()
}
if req.URL == nil {
req.closeBody()
return nil, &url.Error{
Op: urlErrorOp(req.Method),
Err: errors.New("http: nil Request.URL"),
}
}
var (
deadline = c.deadline()
reqs []*Request
resp *Response
copyHeaders = c.makeHeadersCopier(req)
reqBodyClosed = false // have we closed the current req.Body?
// Redirect behavior:
redirectMethod string
includeBody bool
)
uerr := func(err error) error {
// the body may have been closed already by c.send()
if !reqBodyClosed {
req.closeBody()
}
var urlStr string
if resp != nil && resp.Request != nil {
urlStr = stripPassword(resp.Request.URL)
} else {
urlStr = stripPassword(req.URL)
}
return &url.Error{
Op: urlErrorOp(reqs[0].Method),
URL: urlStr,
Err: err,
}
}
for {
// For all but the first request, create the next
// request hop and replace req.
if len(reqs) > 0 {
loc := resp.Header.Get("Location")
if loc == "" {
resp.closeBody()
return nil, uerr(fmt.Errorf("%d response missing Location header", resp.StatusCode))
}
u, err := req.URL.Parse(loc)
if err != nil {
resp.closeBody()
return nil, uerr(fmt.Errorf("failed to parse Location header %q: %v", loc, err))
}
host := ""
if req.Host != "" && req.Host != req.URL.Host {
// If the caller specified a custom Host header and the
// redirect location is relative, preserve the Host header
// through the redirect. See issue #22233.
if u, _ := url.Parse(loc); u != nil && !u.IsAbs() {
host = req.Host
}
}
ireq := reqs[0]
req = &Request{
Method: redirectMethod,
Response: resp,
URL: u,
Header: make(Header),
Host: host,
Cancel: ireq.Cancel,
ctx: ireq.ctx,
}
if includeBody && ireq.GetBody != nil {
req.Body, err = ireq.GetBody()
if err != nil {
resp.closeBody()
return nil, uerr(err)
}
req.ContentLength = ireq.ContentLength
}
// Copy original headers before setting the Referer,
// in case the user set Referer on their first request.
// If they really want to override, they can do it in
// their CheckRedirect func.
copyHeaders(req)
// Add the Referer header from the most recent
// request URL to the new one, if it's not https->http:
if ref := refererForURL(reqs[len(reqs)-1].URL, req.URL); ref != "" {
req.Header.Set("Referer", ref)
}
err = c.checkRedirect(req, reqs)
// Sentinel error to let users select the
// previous response, without closing its
// body. See Issue 10069.
if err == ErrUseLastResponse {
return resp, nil
}
// Close the previous response's body. But
// read at least some of the body so if it's
// small the underlying TCP connection will be
// re-used. No need to check for errors: if it
// fails, the Transport won't reuse it anyway.
const maxBodySlurpSize = 2 << 10
if resp.ContentLength == -1 || resp.ContentLength <= maxBodySlurpSize {
io.CopyN(ioutil.Discard, resp.Body, maxBodySlurpSize)
}
resp.Body.Close()
if err != nil {
// Special case for Go 1 compatibility: return both the response
// and an error if the CheckRedirect function failed.
// See https://golang.org/issue/3795
// The resp.Body has already been closed.
ue := uerr(err)
ue.(*url.Error).URL = loc
return resp, ue
}
}
reqs = append(reqs, req)
var err error
var didTimeout func() bool
//调用 send
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
// TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
req.closeBody()
}
}
调用send方法
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}
这边需要确定实现transport的结构体
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
使用默认的DefaultTransport(如果transport自定义了,就使用自定义的,否则使用默认的),这边这个接口调用就是DefaultTransport,也就是Transport.go中的Transport结构体
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
再看Transport结构体
type Transport struct {
idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns
idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnCh map[connectMethodKey]chan *persistConn
idleLRU connLRU
reqMu sync.Mutex
reqCanceler map[*Request]func(error)
altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
connCountMu sync.Mutex
connPerHostCount map[connectMethodKey]int
connPerHostAvailable map[connectMethodKey]chan struct{}
// Proxy specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the
// request is aborted with the provided error.
//
// The proxy type is determined by the URL scheme. "http",
// "https", and "socks5" are supported. If the scheme is empty,
// "http" is assumed.
//
// If Proxy is nil or returns a nil *URL, no proxy is used.
Proxy func(*Request) (*url.URL, error)
// DialContext specifies the dial function for creating unencrypted TCP connections.
// If DialContext is nil (and the deprecated Dial below is also nil),
// then the transport dials using package net.
//
// DialContext runs concurrently with calls to RoundTrip.
// A RoundTrip call that initiates a dial may end up using
// a connection dialed previously when the earlier connection
// becomes idle before the later DialContext completes.
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// Dial specifies the dial function for creating unencrypted TCP connections.
//
// Dial runs concurrently with calls to RoundTrip.
// A RoundTrip call that initiates a dial may end up using
// a connection dialed previously when the earlier connection
// becomes idle before the later Dial completes.
//
// Deprecated: Use DialContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialContext takes priority.
Dial func(network, addr string) (net.Conn, error)
// DialTLS specifies an optional dial function for creating
// TLS connections for non-proxied HTTPS requests.
//
// If DialTLS is nil, Dial and TLSClientConfig are used.
//
// If DialTLS is set, the Dial hook is not used for HTTPS
// requests and the TLSClientConfig and TLSHandshakeTimeout
// are ignored. The returned net.Conn is assumed to already be
// past the TLS handshake.
DialTLS func(network, addr string) (net.Conn, error)
// TLSClientConfig specifies the TLS configuration to use with
// tls.Client.
// If nil, the default configuration is used.
// If non-nil, HTTP/2 support may not be enabled by default.
TLSClientConfig *tls.Config
// TLSHandshakeTimeout specifies the maximum amount of time waiting to
// wait for a TLS handshake. Zero means no timeout.
TLSHandshakeTimeout time.Duration
// DisableKeepAlives, if true, disables HTTP keep-alives and
// will only use the connection to the server for a single
// HTTP request.
//
// This is unrelated to the similarly named TCP keep-alives.
DisableKeepAlives bool
// DisableCompression, if true, prevents the Transport from
// requesting compression with an "Accept-Encoding: gzip"
// request header when the Request contains no existing
// Accept-Encoding value. If the Transport requests gzip on
// its own and gets a gzipped response, it's transparently
// decoded in the Response.Body. However, if the user
// explicitly requested gzip it is not automatically
// uncompressed.
DisableCompression bool
// MaxIdleConns controls the maximum number of idle (keep-alive)
// connections across all hosts. Zero means no limit.
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
// (keep-alive) connections to keep per-host. If zero,
// DefaultMaxIdleConnsPerHost is used.
MaxIdleConnsPerHost int
// MaxConnsPerHost optionally limits the total number of
// connections per host, including connections in the dialing,
// active, and idle states. On limit violation, dials will block.
//
// Zero means no limit.
//
// For HTTP/2, this currently only controls the number of new
// connections being created at a time, instead of the total
// number. In practice, hosts using HTTP/2 only have about one
// idle connection, though.
MaxConnsPerHost int
// IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
// Zero means no limit.
IdleConnTimeout time.Duration
// ResponseHeaderTimeout, if non-zero, specifies the amount of
// time to wait for a server's response headers after fully
// writing the request (including its body, if any). This
// time does not include the time to read the response body.
ResponseHeaderTimeout time.Duration
// ExpectContinueTimeout, if non-zero, specifies the amount of
// time to wait for a server's first response headers after fully
// writing the request headers if the request has an
// "Expect: 100-continue" header. Zero means no timeout and
// causes the body to be sent immediately, without
// waiting for the server to approve.
// This time does not include the time to send the request header.
ExpectContinueTimeout time.Duration
// TLSNextProto specifies how the Transport switches to an
// alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
// protocol negotiation. If Transport dials an TLS connection
// with a non-empty protocol name and TLSNextProto contains a
// map entry for that key (such as "h2"), then the func is
// called with the request's authority (such as "example.com"
// or "example.com:1234") and the TLS connection. The function
// must return a RoundTripper that then handles the request.
// If TLSNextProto is not nil, HTTP/2 support is not enabled
// automatically.
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
// ProxyConnectHeader optionally specifies headers to send to
// proxies during CONNECT requests.
ProxyConnectHeader Header
// MaxResponseHeaderBytes specifies a limit on how many
// response bytes are allowed in the server's response
// header.
//
// Zero means to use a default limit.
MaxResponseHeaderBytes int64
// nextProtoOnce guards initialization of TLSNextProto and
// h2transport (via onceSetNextProtoDefaults)
nextProtoOnce sync.Once
h2transport h2Transport // non-nil if http2 wired up
}
中文讲解
type Transport struct {
// Proxy指定一个对给定请求返回代理的函数。
// 如果该函数返回了非nil的错误值,请求的执行就会中断并返回该错误。
// 如果Proxy为nil或返回nil的*URL置,将不使用代理。
Proxy func(*Request) (*url.URL, error)
// Dial指定创建TCP连接的拨号函数。如果Dial为nil,会使用net.Dial。
//Dial获取一个tcp 连接,也就是net.Conn结构,你就记住可以往里面写request
//然后从里面搞到response就行了
Dial func(network, addr string) (net.Conn, error)
// TLSClientConfig指定用于tls.Client的TLS配置信息。
// 如果该字段为nil,会使用默认的配置信息。
TLSClientConfig *tls.Config
// TLSHandshakeTimeout指定等待TLS握手完成的最长时间。零值表示不设置超时。
TLSHandshakeTimeout time.Duration
// 如果DisableKeepAlives为真,会禁止不同HTTP请求之间TCP连接的重用。
DisableKeepAlives bool
// 如果DisableCompression为真,会禁止Transport在请求中没有Accept-Encoding头时,
// 主动添加"Accept-Encoding: gzip"头,以获取压缩数据。
// 如果Transport自己请求gzip并得到了压缩后的回复,它会主动解压缩回复的主体。
// 但如果用户显式的请求gzip压缩数据,Transport是不会主动解压缩的。
DisableCompression bool
// 如果MaxIdleConnsPerHost!=0,会控制每个主机下的最大闲置连接。
// 如果MaxIdleConnsPerHost==0,会使用DefaultMaxIdleConnsPerHost。
MaxIdleConnsPerHost int
// ResponseHeaderTimeout指定在发送完请求(包括其可能的主体)之后,
// 等待接收服务端的回复的头域的最大时间。零值表示不设置超时。
// 该时间不包括获取回复主体的时间。
ResponseHeaderTimeout time.Duration
// 内含隐藏或非导出字段
//保存从 connectMethodKey (代表着不同的协议 不同的host,也就是不同的请求)到 persistConn 的映射
idleConn map[connectMethodKey][]*persistConn // most recently used at end
//用来在并发http请求的时候在多个 goroutine 里面相互发送持久连接,也就是说, 这些持久连接是可以重复利用的, 你的http请求用某个persistConn用完了,通过这个channel发送给其他http请求使用这个persistConn,然后我们找到transport的RoundTrip方法
idleConnCh map[connectMethodKey]chan *persistConn
}
调用内部send方法
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
req := ireq // req is either the original request, or a modified fork
if rt == nil {
req.closeBody()
return nil, alwaysFalse, errors.New("http: no Client.Transport or DefaultTransport")
}
if req.URL == nil {
req.closeBody()
return nil, alwaysFalse, errors.New("http: nil Request.URL")
}
if req.RequestURI != "" {
req.closeBody()
return nil, alwaysFalse, errors.New("http: Request.RequestURI can't be set in client requests.")
}
// forkReq forks req into a shallow clone of ireq the first
// time it's called.
forkReq := func() {
if ireq == req {
req = new(Request)
*req = *ireq // shallow clone
}
}
// Most the callers of send (Get, Post, et al) don't need
// Headers, leaving it uninitialized. We guarantee to the
// Transport that this has been initialized, though.
if req.Header == nil {
forkReq()
req.Header = make(Header)
}
if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
username := u.Username()
password, _ := u.Password()
forkReq()
req.Header = ireq.Header.clone()
req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
}
if !deadline.IsZero() {
forkReq()
}
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
resp, err = rt.RoundTrip(req)
if err != nil {
stopTimer()
if resp != nil {
log.Printf("RoundTripper returned a response & error; ignoring response")
}
if tlsErr, ok := err.(tls.RecordHeaderError); ok {
// If we get a bad TLS record header, check to see if the
// response looks like HTTP and give a more helpful error.
// See golang.org/issue/11111.
if string(tlsErr.RecordHeader[:]) == "HTTP/" {
err = errors.New("http: server gave HTTP response to HTTPS client")
}
}
return nil, didTimeout, err
}
if !deadline.IsZero() {
resp.Body = &cancelTimerBody{
stop: stopTimer,
rc: resp.Body,
reqDidTimeout: didTimeout,
}
}
return resp, nil, nil
}
调用DefaultTransport也就是Transport.go中的Transport结构体的RoundTrip方法(当出现自定义的时候,就调用对应的Transport的RoundTrip方法,这边直接使用这个借口就是DefaultTransport),可见使用golang net/http库发送http请求,最后都是调用 http transport的 RoundTrip方法。
// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
if t.useRegisteredProtocol(req) {
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
if altRT := altProto[scheme]; altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
}
}
if !isHTTP {
req.closeBody()
return nil, &badStringError{"unsupported protocol scheme", scheme}
}
if req.Method != "" && !validMethod(req.Method) {
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()
// Rewind the body if we're able to.
if req.GetBody != nil {
newReq := *req
var err error
newReq.Body, err = req.GetBody()
if err != nil {
return nil, err
}
req = &newReq
}
}
}
前面对输入的错误处理部分我们忽略, 其实就2步,先获取一个TCP长连接,所谓TCP长连接就是三次握手建立连接后不close而是一直保持重复使用(节约环保) 然后调用这个持久连接persistConn 这个struct的roundTrip方法。我们先看获取连接
func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) {
if pc := t.getIdleConn(cm); pc != nil {
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(req, func() {})
return pc, nil
}
type dialRes struct {
pc *persistConn
err error
}
dialc := make(chan dialRes)
//定义了一个发送 persistConn的channel
prePendingDial := prePendingDial
postPendingDial := postPendingDial
handlePendingDial := func() {
if prePendingDial != nil {
prePendingDial()
}
go func() {
if v := <-dialc; v.err == nil {
t.putIdleConn(v.pc)
}
if postPendingDial != nil {
postPendingDial()
}
}()
}
cancelc := make(chan struct{})
t.setReqCanceler(req, func() { close(cancelc) })
// 启动了一个goroutine, 这个goroutine 获取里面调用dialConn搞到
// persistConn, 然后发送到上面建立的channel dialc里面,
go func() {
pc, err := t.dialConn(cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select {
case v := <-dialc:
// dialc 我们的 dial 方法先搞到通过 dialc通道发过来了
return v.pc, v.err
case pc := <-idleConnCh:
// 这里代表其他的http请求用完了归还的persistConn通过idleConnCh这个
// channel发送来的
handlePendingDial()
return pc, nil
case <-req.Cancel:
handlePendingDial()
return nil, errors.New("net/http: request canceled while waiting for connection")
case <-cancelc:
handlePendingDial()
return nil, errors.New("net/http: request canceled while waiting for connection")
}
}
这里面的代码写的很有讲究 , 上面代码里面我也注释了, 定义了一个发送 persistConn的channel dialc, 启动了一个goroutine, 这个goroutine 获取里面调用dialConn搞到persistConn, 然后发送到dialc里面,主协程goroutine在 select里面监听多个channel,看看哪个通道里面先发过来 persistConn,就用哪个,然后return。
这里要注意的是 idleConnCh 这个通道里面发送来的是其他的http请求用完了归还的persistConn, 如果从这个通道里面搞到了,dialc这个通道也等着发呢,不能浪费,就通过handlePendingDial这个方法把dialc通道里面的persistConn也发到idleConnCh,等待后续给其他http请求使用。
每个新建的persistConn的时候都把tcp连接里地输入流,和输出流用br(br *bufio.Reader),和bw(bw *bufio.Writer)包装了一下,往bw写就写到tcp输入流里面了,读输出流也是通过br读,并启动了读循环和写循环
pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
pconn.bw = bufio.NewWriter(pconn.conn)
go pconn.readLoop()
go pconn.writeLoop()
我们再看pconn.roundTrip 调用这个持久连接persistConn 这个struct的roundTrip方法。先瞄一下 persistConn 这个struct
type persistConn struct {
t *Transport
cacheKey connectMethodKey
conn net.Conn
tlsState *tls.ConnectionState
br *bufio.Reader // 从tcp输出流里面读
sawEOF bool // whether we've seen EOF from conn; owned by readLoop
bw *bufio.Writer // 写到tcp输入流
reqch chan requestAndChan // 主goroutine 往channnel里面写,读循环从
// channnel里面接受
writech chan writeRequest // 主goroutine 往channnel里面写
// 写循环从channel里面接受
closech chan struct{} // 通知关闭tcp连接的channel
writeErrCh chan error
lk sync.Mutex // guards following fields
numExpectedResponses int
closed bool // whether conn has been closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
mutateHeaderFunc func(Header)
}
里面是各种channel, 用的是出神入化, 各位要好好理解一下,这里有三个goroutine,有两个channel writeRequest 和 requestAndChan
type writeRequest struct {
req *transportRequest
ch chan<- error
}
主goroutine 往writeRequest里面写,写循环从writeRequest里面接受
type responseAndError struct {
res *Response
err error
}
type requestAndChan struct {
req *Request
ch chan responseAndError
addedGzip bool
}
主goroutine 往requestAndChan里面写,读循环从requestAndChan里面接受。
注意这里的channel都是双向channel,也就是channel 的struct里面有一个chan类型的字段, 比如 reqch chan requestAndChan 这里的 requestAndChan 里面的 ch chan responseAndError。
这个是很牛叉,主 goroutine 通过 reqch 发送requestAndChan 给读循环,然后读循环搞到response后通过 requestAndChan 里面的通道responseAndError把response返给主goroutine,所以我画了一个双向箭头。
我们研究一下代码,我理解下来其实就是三个goroutine通过channel互相协作的过程。
主循环:
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
... 忽略
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh}
//把request发送给写循环
resc := make(chan responseAndError, 1)
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
//发送给读循环
var re responseAndError
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
WaitResponse:
for {
select {
case err := <-writeErrCh:
if isNetWriteError(err) {
//写循环通过这个channel报告错误
select {
case re = <-resc:
pc.close()
break WaitResponse
case <-time.After(50 * time.Millisecond):
// Fall through.
}
}
if err != nil {
re = responseAndError{nil, err}
pc.close()
break WaitResponse
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
case <-pc.closech:
// 如果长连接挂了, 这里的channel有数据, 进入这个case, 进行处理
select {
case re = <-resc:
if fn := testHookPersistConnClosedGotRes; fn != nil {
fn()
}
default:
re = responseAndError{err: errClosed}
if pc.isCanceled() {
re = responseAndError{err: errRequestCanceled}
}
}
break WaitResponse
case <-respHeaderTimer:
pc.close()
re = responseAndError{err: errTimeout}
break WaitResponse
// 如果timeout,这里的channel有数据, break掉for循环
case re = <-resc:
break WaitResponse
// 获取到读循环的response, break掉 for循环
case <-cancelChan:
pc.t.CancelRequest(req.Request)
cancelChan = nil
}
}
if re.err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
return re.res, re.err
}
这段代码主要就干了三件事
主goroutine ->requestAndChan -> 读循环goroutine
主goroutine ->writeRequest-> 写循环goroutine
主goroutine 通过select 监听各个channel上的数据, 比如请求取消, timeout,长连接挂了,写流出错,读流出错, 都是其他goroutine 发送过来的, 跟中断一样,然后相应处理,上面也提到了,有些channel是主goroutine通过channel发送给其他goroutine的struct里面包含的channel, 比如 case err := <-writeErrCh: case re = <-resc:
读循环代码:
func (pc *persistConn) readLoop() {
... 忽略
alive := true
for alive {
... 忽略
rc := <-pc.reqch
var resp *Response
if err == nil {
resp, err = ReadResponse(pc.br, rc.req)
if err == nil && resp.StatusCode == 100 {
//100 Continue 初始的请求已经接受,客户应当继续发送请求的其
// 余部分
resp, err = ReadResponse(pc.br, rc.req)
// 读pc.br(tcp输出流)中的数据,这里的代码在response里面
//解析statusCode,头字段, 转成标准的内存中的response 类型
// http在tcp数据流里面,head和body以 /r/n/r/n分开, 各个头
// 字段 以/r/n分开
}
}
if resp != nil {
resp.TLS = pc.tlsState
}
...忽略
//上面处理一些http协议的一些逻辑行为,
rc.ch <- responseAndError{resp, err} //把读到的response返回给
//主goroutine
.. 忽略
//忽略部分, 处理cancel req中断, 发送idleConnCh归还pc(持久连接)到持久连接池中(map)
pc.close()
}
无关代码忽略,这段代码主要干了一件事情
读循环goroutine 通过channel requestAndChan 接受主goroutine发送的request(rc := <-pc.reqch), 并从tcp输出流中读取response, 然后反序列化到结构体中, 最后通过channel 返给主goroutine (rc.ch <- responseAndError{resp, err} )
func (pc *persistConn) writeLoop() {
for {
select {
case wr := <-pc.writech: //接受主goroutine的 request
if pc.isBroken() {
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) //写入tcp输入流
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
pc.markBroken()
wr.req.Request.closeBody()
}
pc.writeErrCh <- err
wr.ch <- err // 出错的时候返给主goroutineto
case <-pc.closech:
return
}
}
}
写循环就更简单了,select channel中主gouroutine的request,然后写入tcp输入流,如果出错了,channel 通知调用者。
整体看下来,过程都很简单,但是代码中有很多值得我们学习的地方,比如高并发请求如何复用tcp连接,这里是连接池的做法,如果使用多个 goroutine相互协作完成一个http请求,出现错误的时候如何通知调用者中断错误,代码风格也有很多可以借鉴的地方。
总结
http.Client 表示一个http client端,用来处理HTTP相关的工作,例如cookies, redirect, timeout等工作,其内部包含一个Transport,tranport用来建立一个连接,其中维护了一个空闲连接池idleConn map[connectMethodKey][]*persistConn,其中的每个成员都是一个persistConn对象,persistConn是个具体的连接实例,包含了连接的上下文,会启动两个groutine分别执行readLoop和writeLoop, 每当transport调用roundTrip的时候,就会从连接池中选择一个空闲的persistConn,然后调用其roundTrip方法,将读写请求通过channel分别发送到readLoop和writeLoop中,然后会进行select各个channel的信息,包括连接关闭,请求超时,writeLoop出错, readLoop返回读取结果等。在writeLoop中发送请求,在readLoop中获取response并通过channe返回给roundTrip函数中,并再次将自己加入到idleConn中,等待下次请求到来。
自定义client
在上面我们说到调用结构体的成员函数都是默认的结构体的成员函数,但是如果我们有一些特殊的需求,我们就需要重新定义这些结构体,然后实现自己的逻辑,整个http请求也就会按着我们的逻辑进行处理,这也是我们实现一些功能的必要手段。最基本的就是自定义client,也是我们编程常用的,深入一些就需要了解一些传输transport等。
1、要管理HTTP客户端的头域、重定向策略和其他设置,创建一个Client:
client := &http.Client{
CheckRedirect: redirectPolicyFunc,
}
resp, err := client.Get("http://example.com")
// ...
req, err := http.NewRequest("GET", "http://example.com", nil)
// ...
req.Header.Add("If-None-Match", `W/"wyzzy"`)
resp, err := client.Do(req)
// ...
这个就是在上面的基础上增加了对client结构体的设置,而不是使用DefaultClient,我们来看一下client的结构体
type Client struct {
// Transport指定执行独立、单次HTTP请求的机制。
// 如果Transport为nil,则使用DefaultTransport。
Transport RoundTripper
// CheckRedirect指定处理重定向的策略。
// 如果CheckRedirect不为nil,客户端会在执行重定向之前调用本函数字段。
// 参数req和via是将要执行的请求和已经执行的请求(切片,越新的请求越靠后)。
// 如果CheckRedirect返回一个错误,本类型的Get方法不会发送请求req,
// 而是返回之前得到的最后一个回复和该错误。(包装进url.Error类型里)
//
// 如果CheckRedirect为nil,会采用默认策略:连续10此请求后停止。
CheckRedirect func(req *Request, via []*Request) error
// Jar指定cookie管理器。
// 如果Jar为nil,请求中不会发送cookie,回复中的cookie会被忽略。
Jar CookieJar
// Timeout指定本类型的值执行请求的时间限制。
// 该超时限制包括连接时间、重定向和读取回复主体的时间。
// 计时器会在Head、Get、Post或Do方法返回后继续运作并在超时后中断回复主体的读取。
//
// Timeout为零值表示不设置超时。
//
// Client实例的Transport字段必须支持CancelRequest方法,
// 否则Client会在试图用Head、Get、Post或Do方法执行请求时返回错误。
// 本类型的Transport字段默认值(DefaultTransport)支持CancelRequest方法。
Timeout time.Duration
}
主要是对这些结构体中的成员的如何运用才是重点,然后就调用client的Get,Do等方法就是上面的执行逻辑,这边只是简单的client的处理,后面的逻辑依然使用的是默认的Transport。
2、要管理代理、TLS配置、keep-alive、压缩和其他设置,创建一个Transport:
tr := &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: pool},
DisableCompression: true,
}
client := &http.Client{Transport: tr}
resp, err := client.Get("https://example.com")
Client和Transport类型都可以安全的被多个go程同时使用。出于效率考虑,应该一次建立、尽量重用。
这边在client的基础上对client的transport的管理代理、TLS配置、keep-alive、压缩和其他设置,然后后面的逻辑中主要是切换到自定义的transport的逻辑运行。
http服务端
http status
const (
StatusContinue = 100
StatusSwitchingProtocols = 101
StatusOK = 200
StatusCreated = 201
StatusAccepted = 202
StatusNonAuthoritativeInfo = 203
StatusNoContent = 204
StatusResetContent = 205
StatusPartialContent = 206
StatusMultipleChoices = 300
StatusMovedPermanently = 301
StatusFound = 302
StatusSeeOther = 303
StatusNotModified = 304
StatusUseProxy = 305
StatusTemporaryRedirect = 307
StatusBadRequest = 400
StatusUnauthorized = 401
StatusPaymentRequired = 402
StatusForbidden = 403
StatusNotFound = 404
StatusMethodNotAllowed = 405
StatusNotAcceptable = 406
StatusProxyAuthRequired = 407
StatusRequestTimeout = 408
StatusConflict = 409
StatusGone = 410
StatusLengthRequired = 411
StatusPreconditionFailed = 412
StatusRequestEntityTooLarge = 413
StatusRequestURITooLong = 414
StatusUnsupportedMediaType = 415
StatusRequestedRangeNotSatisfiable = 416
StatusExpectationFailed = 417
StatusTeapot = 418
StatusInternalServerError = 500
StatusNotImplemented = 501
StatusBadGateway = 502
StatusServiceUnavailable = 503
StatusGatewayTimeout = 504
StatusHTTPVersionNotSupported = 505
)
我们比较常用的就是404(服务未发现),503(服务不可用)等。
http header
Header代表HTTP头域的键值对。
type Header map[string][]string
基本操作
func (h Header) Get(key string) string
Get返回键对应的第一个值,如果键不存在会返回”“。如要获取该键对应的值切片,请直接用规范格式的键访问map。
func (h Header) Set(key, value string)
Set添加键值对到h,如键已存在则会用只有新值一个元素的切片取代旧值切片。
func (h Header) Add(key, value string)
Add添加键值对到h,如键已存在则会将新的值附加到旧值切片后面。
func (h Header) Del(key string)
Del删除键值对。
func (h Header) Write(w io.Writer) error
Write以有线格式将头域写入w。
用于http客户端和服务端的结构体
type Request
type Request struct {
// Method指定HTTP方法(GET、POST、PUT等)。对客户端,""代表GET。
Method string
// URL在服务端表示被请求的URI,在客户端表示要访问的URL。
//
// 在服务端,URL字段是解析请求行的URI(保存在RequestURI字段)得到的,
// 对大多数请求来说,除了Path和RawQuery之外的字段都是空字符串。
// (参见RFC 2616, Section 5.1.2)
//
// 在客户端,URL的Host字段指定了要连接的服务器,
// 而Request的Host字段(可选地)指定要发送的HTTP请求的Host头的值。
URL *url.URL
// 接收到的请求的协议版本。本包生产的Request总是使用HTTP/1.1
Proto string // "HTTP/1.0"
ProtoMajor int // 1
ProtoMinor int // 0
// Header字段用来表示HTTP请求的头域。如果头域(多行键值对格式)为:
// accept-encoding: gzip, deflate
// Accept-Language: en-us
// Connection: keep-alive
// 则:
// Header = map[string][]string{
// "Accept-Encoding": {"gzip, deflate"},
// "Accept-Language": {"en-us"},
// "Connection": {"keep-alive"},
// }
// HTTP规定头域的键名(头名)是大小写敏感的,请求的解析器通过规范化头域的键名来实现这点。
// 在客户端的请求,可能会被自动添加或重写Header中的特定的头,参见Request.Write方法。
Header Header
// Body是请求的主体。
//
// 在客户端,如果Body是nil表示该请求没有主体买入GET请求。
// Client的Transport字段会负责调用Body的Close方法。
//
// 在服务端,Body字段总是非nil的;但在没有主体时,读取Body会立刻返回EOF。
// Server会关闭请求的主体,ServeHTTP处理器不需要关闭Body字段。
Body io.ReadCloser
// ContentLength记录相关内容的长度。
// 如果为-1,表示长度未知,如果>=0,表示可以从Body字段读取ContentLength字节数据。
// 在客户端,如果Body非nil而该字段为0,表示不知道Body的长度。
ContentLength int64
// TransferEncoding按从最外到最里的顺序列出传输编码,空切片表示"identity"编码。
// 本字段一般会被忽略。当发送或接受请求时,会自动添加或移除"chunked"传输编码。
TransferEncoding []string
// Close在服务端指定是否在回复请求后关闭连接,在客户端指定是否在发送请求后关闭连接。
Close bool
// 在服务端,Host指定URL会在其上寻找资源的主机。
// 根据RFC 2616,该值可以是Host头的值,或者URL自身提供的主机名。
// Host的格式可以是"host:port"。
//
// 在客户端,请求的Host字段(可选地)用来重写请求的Host头。
// 如过该字段为"",Request.Write方法会使用URL字段的Host。
Host string
// Form是解析好的表单数据,包括URL字段的query参数和POST或PUT的表单数据。
// 本字段只有在调用ParseForm后才有效。在客户端,会忽略请求中的本字段而使用Body替代。
Form url.Values
// PostForm是解析好的POST或PUT的表单数据。
// 本字段只有在调用ParseForm后才有效。在客户端,会忽略请求中的本字段而使用Body替代。
PostForm url.Values
// MultipartForm是解析好的多部件表单,包括上传的文件。
// 本字段只有在调用ParseMultipartForm后才有效。
// 在客户端,会忽略请求中的本字段而使用Body替代。
MultipartForm *multipart.Form
// Trailer指定了会在请求主体之后发送的额外的头域。
//
// 在服务端,Trailer字段必须初始化为只有trailer键,所有键都对应nil值。
// (客户端会声明哪些trailer会发送)
// 在处理器从Body读取时,不能使用本字段。
// 在从Body的读取返回EOF后,Trailer字段会被更新完毕并包含非nil的值。
// (如果客户端发送了这些键值对),此时才可以访问本字段。
//
// 在客户端,Trail必须初始化为一个包含将要发送的键值对的映射。(值可以是nil或其终值)
// ContentLength字段必须是0或-1,以启用"chunked"传输编码发送请求。
// 在开始发送请求后,Trailer可以在读取请求主体期间被修改,
// 一旦请求主体返回EOF,调用者就不可再修改Trailer。
//
// 很少有HTTP客户端、服务端或代理支持HTTP trailer。
Trailer Header
// RemoteAddr允许HTTP服务器和其他软件记录该请求的来源地址,一般用于日志。
// 本字段不是ReadRequest函数填写的,也没有定义格式。
// 本包的HTTP服务器会在调用处理器之前设置RemoteAddr为"IP:port"格式的地址。
// 客户端会忽略请求中的RemoteAddr字段。
RemoteAddr string
// RequestURI是被客户端发送到服务端的请求的请求行中未修改的请求URI
// (参见RFC 2616, Section 5.1)
// 一般应使用URI字段,在客户端设置请求的本字段会导致错误。
RequestURI string
// TLS字段允许HTTP服务器和其他软件记录接收到该请求的TLS连接的信息
// 本字段不是ReadRequest函数填写的。
// 对启用了TLS的连接,本包的HTTP服务器会在调用处理器之前设置TLS字段,否则将设TLS为nil。
// 客户端会忽略请求中的TLS字段。
TLS *tls.ConnectionState
}
Request类型代表一个服务端接受到的或者客户端发送出去的HTTP请求。
Request各字段的意义和用途在服务端和客户端是不同的。除了字段本身上方文档,还可参见Request.Write方法和RoundTripper接口的文档。
type Response struct {
Status string // 例如"200 OK"
StatusCode int // 例如200
Proto string // 例如"HTTP/1.0"
ProtoMajor int // 例如1
ProtoMinor int // 例如0
// Header保管头域的键值对。
// 如果回复中有多个头的键相同,Header中保存为该键对应用逗号分隔串联起来的这些头的值
// (参见RFC 2616 Section 4.2)
// 被本结构体中的其他字段复制保管的头(如ContentLength)会从Header中删掉。
//
// Header中的键都是规范化的,参见CanonicalHeaderKey函数
Header Header
// Body代表回复的主体。
// Client类型和Transport类型会保证Body字段总是非nil的,即使回复没有主体或主体长度为0。
// 关闭主体是调用者的责任。
// 如果服务端采用"chunked"传输编码发送的回复,Body字段会自动进行解码。
Body io.ReadCloser
// ContentLength记录相关内容的长度。
// 其值为-1表示长度未知(采用chunked传输编码)
// 除非对应的Request.Method是"HEAD",其值>=0表示可以从Body读取的字节数
ContentLength int64
// TransferEncoding按从最外到最里的顺序列出传输编码,空切片表示"identity"编码。
TransferEncoding []string
// Close记录头域是否指定应在读取完主体后关闭连接。(即Connection头)
// 该值是给客户端的建议,Response.Write方法的ReadResponse函数都不会关闭连接。
Close bool
// Trailer字段保存和头域相同格式的trailer键值对,和Header字段相同类型
Trailer Header
// Request是用来获取此回复的请求
// Request的Body字段是nil(因为已经被用掉了)
// 这个字段是被Client类型发出请求并获得回复后填充的
Request *Request
// TLS包含接收到该回复的TLS连接的信息。 对未加密的回复,本字段为nil。
// 返回的指针是被(同一TLS连接接收到的)回复共享的,不应被修改。
TLS *tls.ConnectionState
}
Response代表一个HTTP请求的回复
type ResponseWriter
type ResponseWriter interface {
// Header返回一个Header类型值,该值会被WriteHeader方法发送。
// 在调用WriteHeader或Write方法后再改变该对象是没有意义的。
Header() Header
// WriteHeader该方法发送HTTP回复的头域和状态码。
// 如果没有被显式调用,第一次调用Write时会触发隐式调用WriteHeader(http.StatusOK)
// WriterHeader的显式调用主要用于发送错误码。
WriteHeader(int)
// Write向连接中写入作为HTTP的一部分回复的数据。
// 如果被调用时还未调用WriteHeader,本方法会先调用WriteHeader(http.StatusOK)
// 如果Header中没有"Content-Type"键,
// 本方法会使用包函数DetectContentType检查数据的前512字节,将返回值作为该键的值。
Write([]byte) (int, error)
}
ResponseWriter接口被HTTP处理器用于构造HTTP回复。这个一般用于服务端处理请求
实例
正常我们使用的返回方式
package main
import (
"net/http"
)
func main() {
http.HandleFunc("/", func (w http.ResponseWriter, r *http.Request){
w.Header().Set("name", "my name is smallsoup")
w.WriteHeader(500)
w.Write([]byte("hello world\n"))
})
http.ListenAndServe(":8080", nil)
}
type CloseNotifier
type CloseNotifier interface {
// CloseNotify返回一个通道,该通道会在客户端连接丢失时接收到唯一的值
CloseNotify() <-chan bool
}
HTTP处理器ResponseWriter接口参数的下层如果实现了CloseNotifier接口,可以让用户检测下层的连接是否停止。如果客户端在回复准备好之前关闭了连接,该机制可以用于取消服务端耗时较长的操作。
http 服务端使用和原理解析
ListenAndServe使用指定的监听地址和处理器启动一个HTTP服务端。处理器参数通常是nil,这表示采用包变量DefaultServeMux作为处理器。Handle和HandleFunc函数可以向DefaultServeMux添加处理器。如下
http.Handle("/foo", fooHandler)
http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %q", html.EscapeString(r.URL.Path))
})
log.Fatal(http.ListenAndServe(":8080", nil))
ListenAndServe该方法用于在指定的TCP网络地址addr进行监听,然后调用服务端处理程序来处理传入的连接请求。该方法有两个参数:第一个参数addr 即监听地址;第二个参数表示服务端处理程序,通常为空,这意味着服务端调用 http.DefaultServeMux 进行处理,而服务端编写的业务逻辑处理程序 http.Handle() 或 http.HandleFunc() 默认注入 http.DefaultServeMux 中。
理解HTTP相关的网络应用,主要关注两个地方-客户端(client)和服务端(server),两者的交互主要是client的request以及server的response,主要就在于如何接受client的request并向client返回response。
接收request的过程中,最重要的莫过于路由(router),即实现一个Multiplexer器。Go http中既可以使用内置的mutilplexer — DefautServeMux,也可以自定义。Multiplexer路由的目的就是为了找到处理器函数(handler),后者将对request进行处理,同时构建response
流程为:
Clinet -> Requests -> Multiplexer(router) -> handler -> Response -> Clinet
对于一个http服务,大致需要理解这两个封装的过程就可以理解上面的实现了
1.首先需要注册路由,即提供url模式和handler函数的映射.
2.其次就是实例化一个server对象,并开启对客户端的监听。
再看go http服务的代码
http.HandleFunc("/", indexHandler) -----即是注册路由。
http.ListenAndServe("127.0.0.1:8000", nil)---启动server
或者
server := &Server{Addr: addr, Handler: handler}
server.ListenAndServe()
注册路由
net/http包暴露的注册路由的api很简单
http.HandleFunc("/", indexHandler) -----即是注册路由。
HandlerFunc是一个函数类型,如下定义,同时实现了Handler接口的ServeHTTP方法。使用HandlerFunc类型包装一下路由定义的indexHandler函数,其目的就是为了让这个函数也实现ServeHTTP方法,即转变成一个handler处理器(函数)。
type HandlerFunc func(ResponseWriter, *Request)
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
我们最开始写的例子中
http.HandleFunc("/",Indexhandler)
这样 IndexHandler 函数也有了ServeHTTP方法。
ServeMux和handler处理器函数的连接桥梁就是Handler接口。ServeMux的ServeHTTP方法实现了寻找注册路由的handler的函数(可以看下面的监控服务流程),并调用该handler的ServeHTTP方法。ServeHTTP方法就是真正处理请求和构造响应的地方。
Go其实支持外部实现的路由器 ListenAndServe的第二个参数就是 用以配置外部路由器的,它是一个Handler接口,即外部路由器只要实现了Handler接口就可以,我们可以在自己实现 的路由器的ServHTTP里面实现自定义路由功能。
如下代码所示,我们自己实现了一个简易的路由器
package main
import (
"fmt"
"net/http"
)
type MyMux struct { }
func (p *MyMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" {
sayhelloName(w, r)
return
}
http.NotFound(w, r)
return
}
func sayhelloName(w http.ResponseWriter, r *http.Request) { f
mt.Fprintf(w, "Hello myroute!")
}
func main() {
mux := &MyMux{}
http.ListenAndServe(":9090", mux)
}
multiplexer
http.HandleFunc选取了DefaultServeMux作为multiplexer:
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
DefaultServeMux是ServeMux的一个实例。当然http包也提供了NewServeMux方法创建一个ServeMux实例,默认则创建一个DefaultServeMux:
// NewServeMux allocates and returns a new ServeMux.
func NewServeMux() *ServeMux { return new(ServeMux) }
DefaultServeMux的代码定义
// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux
var defaultServeMux ServeMux
当然也可以是其他可以实现的实例 ,比如上面实现的mux。
路由结构体ServeMux
ServeMux的源码:
type ServeMux struct {
mu sync.RWMutex //锁,由于请求涉及到并发处理,因此这里需要一个锁机制
m map[string]muxEntry // 路由规则,一个string对应一个mux实体,这里的string就是注册的路由
hosts bool
}
type muxEntry struct {
explicit bool // 是否精确匹配
h Handler // 这个路由表达式对应哪个handler
pattern string
}
ServeMux结构中最重要的字段为m,这是一个map,key是一些url模式,value是一个muxEntry结构,后者里定义存储了具体的url模式和handler。
当然,所谓的ServeMux也实现了ServeHTTP接口,也算是一个handler,不过ServeMux的ServeHTTP方法不是用来处理request和respone,而是用来找到路由注册的handler,可以看服务监听时候的调用过程。
// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}
ServeMux的Handle方法,将会对pattern和handler函数做一个map映射:
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()
if pattern == "" {
panic("http: invalid pattern " + pattern)
}
if handler == nil {
panic("http: nil handler")
}
if mux.m[pattern].explicit {
panic("http: multiple registrations for " + pattern)
}
if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
mux.m[pattern] = muxEntry{explicit: true, h: handler, pattern: pattern}
if pattern[0] != '/' {
mux.hosts = true
}
// Helpful behavior:
// If pattern is /tree/, insert an implicit permanent redirect for /tree.
// It can be overridden by an explicit registration.
n := len(pattern)
if n > 0 && pattern[n-1] == '/' && !mux.m[pattern[0:n-1]].explicit {
// If pattern contains a host name, strip it and use remaining
// path for redirect.
path := pattern
if pattern[0] != '/' {
// In pattern, at least the last character is a '/', so
// strings.Index can't be -1.
path = pattern[strings.Index(pattern, "/"):]
}
url := &url.URL{Path: path}
mux.m[pattern[0:n-1]] = muxEntry{h: RedirectHandler(url.String(), StatusMovedPermanently), pattern: pattern}
}
}
Handle函数的主要目的在于把handler和pattern模式绑定到map[string]muxEntry的map上,其中muxEntry保存了更多pattern和handler的信息,还记得前面讨论的Server结构吗?Server的m字段就是map[string]muxEntry这样一个map。
此时,pattern和handler的路由注册完成。接下来就是如何开始server的监听,以接收客户端的请求。
启动服务
http.ListenAndServe("127.0.0.1:8000", nil)---启动server
或者
server := &Server{Addr: addr, Handler: handler}
server.ListenAndServe()
注册好路由之后,启动web服务还需要开启服务器监听。http的ListenAndServer方法中可以看到创建了一个Server对象,并调用了Server对象的同名方法:
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
// ListenAndServe listens on the TCP network address srv.Addr and then
// calls Serve to handle requests on incoming connections.
// Accepted connections are configured to enable TCP keep-alives.
// If srv.Addr is blank, ":http" is used.
// ListenAndServe always returns a non-nil error.
func (srv *Server) ListenAndServe() error {
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
Server的ListenAndServe方法中,会初始化监听地址Addr,同时调用Listen方法设置监听。最后将监听的TCP对象传入Serve方法:
// Serve accepts incoming connections on the Listener l, creating a
// new service goroutine for each. The service goroutines read requests and
// then call srv.Handler to reply to them.
//
// For HTTP/2 support, srv.TLSConfig should be initialized to the
// provided listener's TLS Config before calling Serve. If
// srv.TLSConfig is non-nil and doesn't include the string "h2" in
// Config.NextProtos, HTTP/2 support is not enabled.
//
// Serve always returns a non-nil error. After Shutdown or Close, the
// returned error is ErrServerClosed.
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
var tempDelay time.Duration // how long to sleep on accept failure
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
srv.trackListener(l, true)
defer srv.trackListener(l, false)
baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx)
}
}
监听开启之后,一旦客户端请求到达,创建一个conn结构体,这个conn中保留了这次请求的信息,go就开启一个协程serve处理请求,主要逻辑都在serve方法之中。
serve方法比较长,其主要职能就是,创建一个上下文对象,然后调用Listener的Accept方法用来 获取连接数据并使用newConn方法创建连接对象。最后使用goroutein协程的方式处理连接请求。因为每一个连接都开起了一个协程,请求的上下文都不同,同时又保证了go的高并发。serve也是一个长长的方法:
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
if !c.hijacked() {
c.close()
c.setState(c.rwc, StateClosed)
}
}()
if tlsConn, ok := c.rwc.(*tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {
c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
return
}
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}
}
// HTTP/1.x from here on.
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive)
}
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
if err == errTooLarge {
// Their HTTP client may or may not be
// able to read this if we're
// responding to them and hanging up
// while they're still writing their
// request. Undefined behavior.
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
}
if isCommonNetReadError(err) {
return // don't reply
}
publicErr := "400 Bad Request"
if v, ok := err.(badRequestError); ok {
publicErr = publicErr + ": " + string(v)
}
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
// Expect 100 Continue support
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
// Wrap the Body reader with one that replies on the connection
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
c.curReq.Store(w)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
if w.conn.bufr.Buffered() > 0 {
w.conn.r.closeNotifyFromPipelinedRequest()
}
w.conn.r.startBackgroundRead()
}
// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))
if !w.conn.server.doKeepAlives() {
// We're in shutdown mode. We might've replied
// to the user without "Connection: close" and
// they might think they can send another
// request, but such is life with HTTP/1.1.
return
}
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
c.rwc.SetReadDeadline(time.Time{})
}
}
使用defer定义了函数退出时,连接关闭相关的处理。然后就是读取连接的网络数据,并处理读取完毕时候的状态。接下来就是调用serverHandler{c.server}.ServeHTTP(w, w.req)方法处理请求了。最后就是请求处理完毕的逻辑。serverHandler是一个重要的结构,它近有一个字段,即Server结构,同时它也实现了Handler接口方法ServeHTTP,并在该接口方法中做了一个重要的事情,初始化multiplexer路由多路复用器。如果server对象没有指定Handler,则使用默认的DefaultServeMux作为路由Multiplexer。并调用初始化Handler的ServeHTTP方法。
// serverHandler delegates to either the server's Handler or
// DefaultServeMux and also handles "OPTIONS *" requests.
type serverHandler struct {
srv *Server
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}
这里DefaultServeMux的ServeHTTP方法其实也是定义在ServeMux结构中的,相关代码如下:
// Find a handler on a handler map given a path string.
// Most-specific (longest) pattern wins.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
// Check for exact match first.
v, ok := mux.m[path]
if ok {
return v.h, v.pattern
}
// Check for longest valid match.
var n = 0
for k, v := range mux.m {
if !pathMatch(k, path) {
continue
}
if h == nil || len(k) > n {
n = len(k)
h = v.h
pattern = v.pattern
}
}
return
}
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
// CONNECT requests are not canonicalized.
if r.Method == "CONNECT" {
return mux.handler(r.Host, r.URL.Path)
}
// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)
if path != r.URL.Path {
_, pattern = mux.handler(host, path)
url := *r.URL
url.Path = path
return RedirectHandler(url.String(), StatusMovedPermanently), pattern
}
return mux.handler(host, r.URL.Path)
}
// handler is the main implementation of Handler.
// The path is known to be in canonical form, except for CONNECT methods.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}
mux的ServeHTTP方法通过调用其Handler方法寻找注册到路由上的handler函数,并调用该函数的ServeHTTP方法,本例则是IndexHandler函数。
mux的Handler方法对URL简单的处理,然后调用handler方法,后者会创建一个锁,同时调用match方法返回一个handler和pattern。
在match方法中,mux的m字段是map[string]muxEntry图,后者存储了pattern和handler处理器函数,因此通过迭代m寻找出注册路由的patten模式与实际url匹配的handler函数并返回。
返回的结构一直传递到mux的ServeHTTP方法,接下来调用handler函数的ServeHTTP方法,即IndexHandler函数,然后把response写到http.RequestWirter对象返回给客户端。
上述函数运行结束即serverHandler{c.server}.ServeHTTP(w, w.req)运行结束。接下来就是对请求处理完毕之后上希望和连接断开的相关逻辑。
至此,Golang中一个完整的http服务介绍完毕,包括注册路由,开启监听,处理连接,路由处理函数。 多数的web应用基于HTTP协议,客户端和服务器通过request-response的方式交互。一个server并不可少的两部分莫过于路由注册和连接处理。Golang通过一个ServeMux实现了的multiplexer路由多路复用器来管理路由。同时提供一个Handler接口提供ServeHTTP用来实现handler处理其函数,后者可以处理实际request并构造response。
总结
理解go中的http服务,最重要就是要理解Multiplexer和handler,Golang中的Multiplexer基于ServeMux结构,同时也实现了Handler接口。下面对几个重要概念说明,两个重要的结构体和一个接口
Handler类型
Golang没有继承,类多态的方式可以通过接口实现。所谓接口则是定义声明了函数签名,任何结构只要实现了与接口函数签名相同的方法,就等同于实现了接口。go的http服务都是基于handler进行处理。
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
任何结构体,只要实现了ServeHTTP方法,这个结构就可以称之为handler对象。ServeMux会使用handler并调用其ServeHTTP方法处理请求并返回响应。
handler处理器(函数)
handler处理器(函数)-就是HandleFunc的第二个参数,是一个函数: 具有func(w http.ResponseWriter, r *http.Requests)签名的函数,经过HandlerFunc结构包装的handler函数,它实现了ServeHTTP接口方法的函数。调用handler处理器的ServeHTTP方法时,即调用handler函数本身。
handler对象:实现了Handler接口ServeHTTP方法的结构。
ServeMux和handler处理器函数的连接桥梁就是Handler接口。ServeMux的ServeHTTP方法实现了寻找注册路由的handler的函数,并调用该handler的ServeHTTP方法。ServeHTTP方法就是真正处理请求和构造响应的地方。
Server结构体
从http.ListenAndServe的源码可以看出,它还是创建了一个server对象,并调用server对象的ListenAndServe方法来实现监听路由:
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
查看server的结构如下,其实上面已经解释过:
type Server struct {
Addr string
Handler Handler
ReadTimeout time.Duration
WriteTimeout time.Duration
TLSConfig *tls.Config
MaxHeaderBytes int
TLSNextProto map[string]func(*Server, *tls.Conn, Handler)
ConnState func(net.Conn, ConnState)
ErrorLog *log.Logger
disableKeepAlives int32 nextProtoOnce sync.Once
nextProtoErr error
}
server结构存储了服务器处理请求常见的字段。其中Handler字段也保留Handler接口。如果Server接口没有提供Handler结构对象,那么会使用DefautServeMux做multiplexer,后面再做分析。
路由结构体ServeMux
ServeMux的源码:
type ServeMux struct {
mu sync.RWMutex //锁,由于请求涉及到并发处理,因此这里需要一个锁机制
m map[string]muxEntry // 路由规则,一个string对应一个mux实体,这里的string就是注册的路由
hosts bool
}
type muxEntry struct {
explicit bool // 是否精确匹配
h Handler // 这个路由表达式对应哪个handler
pattern string
}
ServeMux结构中最重要的字段为m,这是一个map,key是一些url模式,value是一个muxEntry结构,后者里定义存储了具体的url模式和handler。
当然,所谓的ServeMux也实现了ServeHTTP接口,也算是一个handler,不过ServeMux的ServeHTTP方法不是用来处理request和respone,而是用来找到路由注册的handler
Go代码的执行流程
过对http包的分析之后,现在让我们来梳理一下整个的代码执行过程。
1、首先调用Http.HandleFunc
按顺序做了几件事:
1 调用了DefaultServerMux的HandleFunc
2 调用了DefaultServerMux的Handle
3 往DefaultServeMux的map[string]muxEntry中增加对应的handler和路由规则
2、其次调用http.ListenAndServe(”:9090”, nil)
按顺序做了几件事情:
1 实例化Server
2 调用Server的ListenAndServe()
3 调用net.Listen("tcp", addr)监听端口
4 启动一个for循环,在循环体中Accept请求
5 对每个请求实例化一个Conn,并且开启一个goroutine为这个请求进行服务go c.serve()
6 读取每个请求的内容w, err := c.readRequest()
7 判断handler是否为空,如果没有设置handler(这个例子就没有设置handler),handler就设置为 DefaultServeMux
8 调用handler的ServeHttp
9 在这个例子中,下面就进入到DefaultServerMux.ServeHttp
10 根据request选择handler,并且进入到这个handler的ServeHTTP mux.handler(r).ServeHTTP(w, r)
11 选择handler:
A 判断是否有路由能满足这个request(循环遍历ServerMux的muxEntry)
B 如果有路由满足,调用这个路由handler的ServeHttp
C 如果没有路由满足,调用NotFoundHandler的ServeHttp
自定义server
要管理服务端的行为,可以创建一个自定义的Server:
s := &http.Server{
Addr: ":8080",
Handler: myHandler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())
也是上面的流程,就是新增了一个server结构体的,做对应的操作,来看一下server
type Server struct {
Addr string // TCP address to listen on, ":http" if empty
Handler Handler // handler to invoke, http.DefaultServeMux if nil
ReadTimeout time.Duration // maximum duration before timing out read of the request
WriteTimeout time.Duration // maximum duration before timing out write of the response
MaxHeaderBytes int // maximum size of request headers, DefaultMaxHeaderBytes if 0
TLSConfig *tls.Config // optional TLS config, used by ListenAndServeTLS
// TLSNextProto optionally specifies a function to take over
// ownership of the provided TLS connection when an NPN
// protocol upgrade has occurred. The map key is the protocol
// name negotiated. The Handler argument should be used to
// handle HTTP requests and will initialize the Request's TLS
// and RemoteAddr if not already set. The connection is
// automatically closed when the function returns.
TLSNextProto map[string]func(*Server, *tls.Conn, Handler)
// ConnState specifies an optional callback function that is
// called when a client connection changes state. See the
// ConnState type and associated constants for details.
ConnState func(net.Conn, ConnState)
// ErrorLog specifies an optional logger for errors accepting
// connections and unexpected behavior from handlers.
// If nil, logging goes to os.Stderr via the log package's
// standard logger.
ErrorLog *log.Logger
// contains filtered or unexported fields
}
都是什么作用
type Server struct {
Addr string // 监听的TCP地址,如果为空字符串会使用":http"
Handler Handler // 调用的处理器,如为nil会调用http.DefaultServeMux
ReadTimeout time.Duration // 请求的读取操作在超时前的最大持续时间
WriteTimeout time.Duration // 回复的写入操作在超时前的最大持续时间
MaxHeaderBytes int // 请求的头域最大长度,如为0则用DefaultMaxHeaderBytes
TLSConfig *tls.Config // 可选的TLS配置,用于ListenAndServeTLS方法
// TLSNextProto(可选地)指定一个函数来在一个NPN型协议升级出现时接管TLS连接的所有权。
// 映射的键为商谈的协议名;映射的值为函数,该函数的Handler参数应处理HTTP请求,
// 并且初始化Handler.ServeHTTP的*Request参数的TLS和RemoteAddr字段(如果未设置)。
// 连接在函数返回时会自动关闭。
TLSNextProto map[string]func(*Server, *tls.Conn, Handler)
// ConnState字段指定一个可选的回调函数,该函数会在一个与客户端的连接改变状态时被调用。
// 参见ConnState类型和相关常数获取细节。
ConnState func(net.Conn, ConnState)
// ErrorLog指定一个可选的日志记录器,用于记录接收连接时的错误和处理器不正常的行为。
// 如果本字段为nil,日志会通过log包的标准日志记录器写入os.Stderr。
ErrorLog *log.Logger
// 内含隐藏或非导出字段
}
主要函数
type Server
func (s *Server) SetKeepAlivesEnabled(v bool)
func (srv *Server) Serve(l net.Listener) error
func (srv *Server) ListenAndServe() error
func (srv *Server) ListenAndServeTLS(certFile, keyFile string) error
func (*Server) SetKeepAlivesEnabled
func (s *Server) SetKeepAlivesEnabled(v bool)
SetKeepAlivesEnabled控制是否允许HTTP闲置连接重用(keep-alive)功能。默认该功能总是被启用的。只有资源非常紧张的环境或者服务端在关闭进程中时,才应该关闭该功能。
func (*Server) Serve
func (srv *Server) Serve(l net.Listener) error
Serve会接手监听器l收到的每一个连接,并为每一个连接创建一个新的服务go程。该go程会读取请求,然后调用srv.Handler回复请求。
func (*Server) ListenAndServe
func (srv *Server) ListenAndServe() error
ListenAndServe监听srv.Addr指定的TCP地址,并且会调用Serve方法接收到的连接。如果srv.Addr为空字符串,会使用”:http”。
func (*Server) ListenAndServeTLS
func (srv *Server) ListenAndServeTLS(certFile, keyFile string) error
ListenAndServeTLS监听srv.Addr确定的TCP地址,并且会调用Serve方法处理接收到的连接。必须提供证书文件和对应的私钥文件。如果证书是由权威机构签发的,certFile参数必须是顺序串联的服务端证书和CA证书。如果srv.Addr为空字符串,会使用”:https”。