io包提供了所有需要交互的输入输出模式的基础。

基本概念

stream

我们先介绍一下stream的概念。stream就是数据流,数据流的概念其实非常基础,最早是在通讯领域使用的概念,这个概念最初在 1998 年由 Henzinger 在文献中提出,他将数据流定义为 “只能以事先规定好的顺序被读取一次的数据的一个序列”

数据流就是由数据形成的流,就像由水形成的水流,非常形象,现代语言中,基本上都会有流的支持,比如 C++ 的 iostream,Node.js 的 stream 模块,以及 golang 的 io 包。

Stream in Golang与流密切相关的就是 bufio io io/ioutil 这几个包:

1、io 为 IO 原语(I/O primitives)提供基本的接口
2、io/ioutil 封装一些实用的 I/O 函数
3、fmt 实现格式化 I/O,类似 C 语言中的 printf 和 scanf
4、bufio 实现带缓冲I/O

io

io 包为 I/O 原语提供了基本的接口。在 io 包中最重要的是两个接口:Reader 和 Writer 接口。本章所提到的各种 IO 包,都跟这两个接口有关,也就是说,只要满足这两个接口,它就可以使用 IO 包的功能。

接口

读取器

type Reader interface {
    //Read() 方法有两个返回值,一个是读取到的字节数,一个是发生错误时的错误。如果资源内容已全部读取完毕,应该返回 io.EOF 错误。
    Read(p []byte) (n int, err error)
}

io.Reader 表示一个读取器,它将数据从某个资源读取到传输缓冲区p。在缓冲区中,数据可以被流式传输和使用。

实现这个接口需要实现如下功能

  • Read 将 len(p) 个字节读取到 p 中。它返回读取的字节数 n(0 <= n <= len(p)) 以及任何遇到的错误。
    • 即使 Read 返回的 n < len(p),它也会在调用过程中占用 len(p) 个字节作为暂存空间。
    • 若可读取的数据不到 len(p) 个字节,Read 会返回可用数据,而不是等待更多数据。
    • 当读取的时候没有数据也没有EOF的时候,会阻塞在这边等待。
  • 当 Read 在成功读取 n > 0 个字节后遇到一个错误或 EOF (end-of-file),它会返回读取的字节数。
    • 它可能会同时在本次的调用中返回一个non-nil错误,或在下一次的调用中返回这个错误(且 n 为 0)。
    • 一般情况下, Reader会返回一个非0字节数n, 若 n = len(p) 个字节从输入源的结尾处由 Read 返回,Read可能返回 err == EOF 或者 err == nil。并且之后的 Read() 都应该返回 (n:0, err:EOF)。
  • 调用者在考虑错误之前应当首先处理返回的数据。这样做可以正确地处理在读取一些字节后产生的 I/O 错误,同时允许EOF的出现。

对于要用作读取器的类型,它必须实现 io.Reader 接口的唯一一个方法 Read(p []byte)。换句话说,只要实现了 Read(p []byte) ,那它就是一个读取器,使用标准库中已经实现的读写器,来举例

func main() {
    reader := strings.NewReader("Clear is better than clever")
    p := make([]byte, 4)

    for {
        n, err := reader.Read(p)
        if err != nil{
            if err == io.EOF {
                fmt.Println("EOF:", n)
                break
            }
            fmt.Println(err)
            os.Exit(1)
        }
        fmt.Println(n, string(p[:n]))
    }
}

输出打印的内容:

4 Clea
4 r is
4  bet
4 ter 
4 than
4  cle
3 ver
EOF: 0 

自定义Reader

现在,让我们看看如何自己实现一个。它的功能是从流中过滤掉非字母字符。

type alphaReader struct {
    // 资源
    src string
    // 当前读取到的位置 
    cur int
}

// 创建一个实例
func newAlphaReader(src string) *alphaReader {
    return &alphaReader{src: src}
}

// 过滤函数
func alpha(r byte) byte {
    if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
        return r
    }
    return 0
}

// Read 方法,read函数是阻塞的
func (a *alphaReader) Read(p []byte) (int, error) {
    // 当前位置 >= 字符串长度 说明已经读取到结尾 返回 EOF
    if a.cur >= len(a.src) {
        return 0, io.EOF
    }

    // x 是剩余未读取的长度
    x := len(a.src) - a.cur
    n, bound := 0, 0
    if x >= len(p) {
        // 剩余长度超过缓冲区大小,说明本次可完全填满缓冲区
        bound = len(p)
    } else if x < len(p) {
        // 剩余长度小于缓冲区大小,使用剩余长度输出,缓冲区不补满
        bound = x
    }

    buf := make([]byte, bound)
    for n < bound {
        // 每次读取一个字节,执行过滤函数
        if char := alpha(a.src[a.cur]); char != 0 {
            buf[n] = char
        }
        n++
        a.cur++
    }
    // 将处理后得到的 buf 内容复制到 p 中
    copy(p, buf)
    return n, nil
}

func main() {
    reader := newAlphaReader("Hello! It's 9am, where is the sun?")
    p := make([]byte, 4)
    for {
        n, err := reader.Read(p)
        if err == io.EOF {
            break
        }
        fmt.Print(string(p[:n]))
    }
    fmt.Println()
}
输出打印的内容:
HelloItsamwhereisthesun

TCP粘包拆包

这边讲解一下TCP粘包拆包问题,先看下面这个实例

服务端代码 server/main.go

func main() {
    l, err := net.Listen("tcp", ":4044")
    if err != nil {
        panic(err)
    }
    fmt.Println("listen to 4044")
    for {
        // 监听到新的连接,创建新的 goroutine 交给 handleConn函数 处理
        conn, err := l.Accept()
        if err != nil {
            fmt.Println("conn err:", err)
        } else {
            go handleConn(conn)
        }
    }
}

func handleConn(conn net.Conn) {
    defer conn.Close()
    defer fmt.Println("关闭")
    fmt.Println("新连接:", conn.RemoteAddr())

    result := bytes.NewBuffer(nil)
    var buf [1024]byte
    for {
        n, err := conn.Read(buf[0:])
        result.Write(buf[0:n])
        if err != nil {
            if err == io.EOF {
                continue
            } else {
                fmt.Println("read err:", err)
                break
            }
        } else {
            fmt.Println("recv:", result.String())
        }
        result.Reset()
    }
}

客户端代码 client/main.go

func main() {
    data := []byte("[这里才是一个完整的数据包]")
    conn, err := net.DialTimeout("tcp", "localhost:4044", time.Second*30)
    if err != nil {
        fmt.Printf("connect failed, err : %v\n", err.Error())
        return
    }
    for i := 0; i <1000; i++ {
        _, err = conn.Write(data)
        if err != nil {
            fmt.Printf("write failed , err : %v\n", err)
            break
        }
    }
}

运行结果

listen to 4044
新连接: [::1]:53079
recv: [这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据�
recv: �][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包][这里才是一个完整的数据包][这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
...省略其它的...

从服务端的控制台输出可以看出,存在三种类型的输出:

  • 一种是正常的一个数据包输出。
  • 一种是多个数据包“粘”在了一起,我们定义这种读到的包为粘包。
  • 一种是一个数据包被“拆”开,形成一个破碎的包,我们定义这种包为半包。

为什么会出现半包和粘包?

  • 客户端一段时间内发送包的速度太多,服务端没有全部处理完。于是数据就会积压起来,产生粘包。
  • 定义的读的buffer不够大,而数据包太大或者由于粘包产生,服务端不能一次全部读完,产生半包。

什么时候需要考虑处理半包和粘包?

  • TCP连接是长连接,即一次连接多次发送数据。
  • 每次发送的数据是结构的,比如 JSON格式的数据 或者 数据包的协议是由我们自己定义的(包头部包含实际数据长度、协议魔数等)。

解决思路

  • 定长分隔(每个数据包最大为该长度,不足时使用特殊字符填充) ,但是数据不足时会浪费传输资源
  • 使用特定字符来分割数据包,但是若数据中含有分割字符则会出现Bug
  • 在数据包中添加长度字段,弥补了以上两种思路的不足,推荐使用

通过上述分析,我们最好通过第三种思路来解决拆包粘包问题。

Golang的bufio库中有为我们提供了Scanner,来解决这类分割数据的问题。

type Scanner
Scanner provides a convenient interface for reading data such as a file of newline-delimited lines of text. Successive calls to the Scan method will step through the 'tokens' of a file, skipping the bytes between the tokens. The specification of a token is defined by a split function of type SplitFunc; the default split function breaks the input into lines with line termination stripped. Split functions are defined in this package for scanning a file into lines, bytes, UTF-8-encoded runes, and space-delimited words. The client may instead provide a custom split function.

简单来讲即是:

Scanner为 读取数据 提供了方便的 接口。连续调用Scan方法会逐个得到文件的“tokens”,跳过 tokens 之间的字节。token 的规范由 SplitFunc 类型的函数定义。我们可以改为提供自定义拆分功能。

接下来看看 SplitFunc 类型的函数是什么样子的:

type SplitFunc func(data []byte, atEOF bool) (advance int, token []byte, err error)

例子

func main() {
    // An artificial input source.
    const input = "1234 5678 1234567901234567890"
    scanner := bufio.NewScanner(strings.NewReader(input))
    // Create a custom split function by wrapping the existing ScanWords function.
    split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
        advance, token, err = bufio.ScanWords(data, atEOF)
        if err == nil && token != nil {
            _, err = strconv.ParseInt(string(token), 10, 32)
        }
        return
    }
    // Set the split function for the scanning operation.
    scanner.Split(split)
    // Validate the input
    for scanner.Scan() {
        fmt.Printf("%s\n", scanner.Text())
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Invalid input: %s", err)
    }
}

于是,我们可以这样改写我们的程序:

服务端代码 server/main.go

func main() {
    l, err := net.Listen("tcp", ":4044")
    if err != nil {
        panic(err)
    }
    fmt.Println("listen to 4044")
    for {
        conn, err := l.Accept()
        if err != nil {
            fmt.Println("conn err:", err)
        } else {
            go handleConn2(conn)
        }
    }
}

func packetSlitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
        // 检查 atEOF 参数 和 数据包头部的四个字节是否 为 0x123456(我们定义的协议的魔数)
    if !atEOF && len(data) > 6 && binary.BigEndian.Uint32(data[:4]) == 0x123456 {
        var l int16
                // 读出 数据包中 实际数据 的长度(大小为 0 ~ 2^16)
        binary.Read(bytes.NewReader(data[4:6]), binary.BigEndian, &l)
        pl := int(l) + 6
        if pl <= len(data) {
            return pl, data[:pl], nil
        }
    }
    return
}

func handleConn2(conn net.Conn) {
    defer conn.Close()
    defer fmt.Println("关闭")
    fmt.Println("新连接:", conn.RemoteAddr())
    result := bytes.NewBuffer(nil)
        var buf [65542]byte // 由于 标识数据包长度 的只有两个字节 故数据包最大为 2^16+4(魔数)+2(长度标识)
    for {
        n, err := conn.Read(buf[0:])
        result.Write(buf[0:n])
        if err != nil {
            if err == io.EOF {
                continue
            } else {
                fmt.Println("read err:", err)
                break
            }
        } else {
            scanner := bufio.NewScanner(result)
            scanner.Split(packetSlitFunc)
            for scanner.Scan() {
                fmt.Println("recv:", string(scanner.Bytes()[6:]))
            }
        }
        result.Reset()
    }
}

复制代码客户端代码 client/main.go

func main() {
    data := []byte("[这里才是一个完整的数据包]")
    l := len(data)
    fmt.Println(l)
    magicNum := make([]byte, 4)
    binary.BigEndian.PutUint32(magicNum, 0x123456)
    lenNum := make([]byte, 2)
    binary.BigEndian.PutUint16(lenNum, uint16(l))
    packetBuf := bytes.NewBuffer(magicNum)
    packetBuf.Write(lenNum)
    packetBuf.Write(data)
    conn, err := net.DialTimeout("tcp", "localhost:4044", time.Second*30)
    if err != nil {
        fmt.Printf("connect failed, err : %v\n", err.Error())
                return
    }
    for i := 0; i <1000; i++ {
        _, err = conn.Write(packetBuf.Bytes())
        if err != nil {
            fmt.Printf("write failed , err : %v\n", err)
            break
        }
    }
}

复制代码运行结果

listen to 4044
新连接: [::1]:55738
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
recv: [这里才是一个完整的数据包]
...省略其它的...

编写器

type Writer
type Writer interface {
    //Write() 方法有两个返回值,一个是写入到目标资源的字节数,一个是发生错误时的错误。
    Write(p []byte) (n int, err error)
}

io.Writer 表示一个编写器,它从缓冲区读取数据,并将数据写入目标资源。

实现这个接口就需要实现如下的功能

  • Write 将 len(p) 个字节从 p 中写入到基本数据流中。它返回从 p 中被写入的字节数 n(0 <= n <= len(p))以及任何遇到的引起写入提前停止的错误。若 Write 返回的 n < len(p),它就必须返回一个 非nil 的错误。

对于要用作编写器的类型,必须实现 io.Writer 接口的唯一一个方法 Write(p []byte),同样,只要实现了 Write(p []byte) ,那它就是一个编写器。举例,标准库

func main() {
    proverbs := []string{
        "Channels orchestrate mutexes serialize",
        "Cgo is not Go",
        "Errors are values",
        "Don't panic",
    }
    var writer bytes.Buffer

    for _, p := range proverbs {
        n, err := writer.Write([]byte(p))
        if err != nil {
            fmt.Println(err)
            os.Exit(1)
        }
        if n != len(p) {
            fmt.Println("failed to write data")
            os.Exit(1)
        }
    }

    fmt.Println(writer.String())
}
输出打印的内容:
Channels orchestrate mutexes serializeCgo is not GoErrors are valuesDon't panic

自定义Writer

下面我们来实现一个名为 chanWriter 的自定义 io.Writer ,它将其内容作为字节序列写入 channel 。

type chanWriter struct {
    // ch 实际上就是目标资源
    ch chan byte
}

func newChanWriter() *chanWriter {
    return &chanWriter{make(chan byte, 1024)}
}

func (w *chanWriter) Chan() <-chan byte {
    return w.ch
}

func (w *chanWriter) Write(p []byte) (int, error) {
    n := 0
    // 遍历输入数据,按字节写入目标资源
    for _, b := range p {
        w.ch <- b
        n++
    }
    return n, nil
}

func (w *chanWriter) Close() error {
    close(w.ch)
    return nil
}

func main() {
    writer := newChanWriter()
    go func() {
        defer writer.Close()
        writer.Write([]byte("Stream "))
        writer.Write([]byte("me!"))
    }()
    for c := range writer.Chan() {
        fmt.Printf("%c", c)
    }
    fmt.Println()
}

要使用这个 Writer,只需在函数 main() 中调用 writer.Write()(在单独的goroutine中)。

因为 chanWriter 还实现了接口 io.Closer ,所以调用方法 writer.Close() 来正确地关闭channel,以避免发生泄漏和死锁。

closer

Closer 接口包装了基本的 Close 方法,用于关闭数据读写。Close 一般用于关闭文件,关闭通道,关闭连接,关闭数据库等,在不同的标准库实现中实现。

type Closer interface {
    Close() error
}

seeker

Seeker 接口包装了基本的 Seek 方法,用于移动数据的读写指针。

type Seeker interface {
    Seek(offset int64, whence int) (ret int64, err error)
}

Seek 设置下一次读写操作的指针位置,每次的读写操作都是从指针位置开始的。

  • whence 的含义:
    • 如果 whence 为 0:表示从数据的开头开始移动指针。
    • 如果 whence 为 1:表示从数据的当前指针位置开始移动指针。
    • 如果 whence 为 2:表示从数据的尾部开始移动指针。
  • offset 是指针移动的偏移量。
  • 返回新指针位置和遇到的错误。

whence 的值,在 io 包中定义了相应的常量,应该使用这些常量

const (
  SeekStart   = 0 // seek relative to the origin of the file
  SeekCurrent = 1 // seek relative to the current offset
  SeekEnd     = 2 // seek relative to the end
)

而原先 os 包中的常量已经被标注为Deprecated

// Deprecated: Use io.SeekStart, io.SeekCurrent, and io.SeekEnd.
const (
  SEEK_SET int = 0 // seek relative to the origin of the file
  SEEK_CUR int = 1 // seek relative to the current offset
  SEEK_END int = 2 // seek relative to the end
)

组合接口

type ReadWriter interface {
    Reader
    Writer
}

type ReadSeeker interface {
    Reader
    Seeker
}

type WriteSeeker interface {
    Writer
    Seeker
}

type ReadWriteSeeker interface {
    Reader
    Writer
    Seeker
}

type ReadCloser interface {
    Reader
    Closer
}

type WriteCloser interface {
    Writer
    Closer
}

type ReadWriteCloser interface {
    Reader
    Writer
    Closer
}

这些接口的作用是:有些时候同时需要某两个接口的所有功能,即必须同时实现了某两个接口的类型才能够被传入使用。可见,io 包中有大量的“小接口”,这样方便组合为“大接口”。

其他接口

ReaderFrom

ReaderFrom 接口包装了基本的 ReadFrom 方法,用于从 r 中读取数据存入自身。直到遇到 EOF 或读取出错为止,返回读取的字节数和遇到的错误。

type ReaderFrom interface {
    ReadFrom(r Reader) (n int64, err error)
}

需要实现接口的功能

  • ReadFrom 从 r 中读取数据,直到 EOF 或发生错误。其返回值 n 为读取的字节数。除 io.EOF 之外,在读取过程中遇到的任何错误也将被返回。
  • 如果 ReaderFrom 可用,Copy 函数就会使用它。

实例:将文件中的数据全部读取(显示在标准输出):

file, err := os.Open("writeAt.txt")
if err != nil {
    panic(err)
}
defer file.Close()
writer := bufio.NewWriter(os.Stdout)
writer.ReadFrom(file)
writer.Flush()

当然,我们可以通过 ioutil 包的 ReadFile 函数获取文件全部内容。其实,跟踪一下 ioutil.ReadFile 的源码,会发现其实也是通过 ReadFrom 方法实现(用的是 bytes.Buffer,它实现了 ReaderFrom 接口)。

如果不通过 ReadFrom 接口来做这件事,而是使用 io.Reader 接口,我们有两种思路:

  • 先获取文件的大小(File 的 Stat 方法),之后定义一个该大小的 []byte,通过 Read 一次性读取
  • 定义一个小的 []byte,不断的调用 Read 方法直到遇到 EOF,将所有读取到的 []byte 连接到一起

WriterTo

WriterTo 接口包装了基本的 WriteTo 方法,用于将自身的数据写入 w 中。直到数据全部写入完毕或遇到错误为止,返回写入的字节数和遇到的错误。

type WriterTo interface {
    WriteTo(w Writer) (n int64, err error)
}

需要实现接口的功能

  • WriteTo 将数据写入 w 中,直到没有数据可写或发生错误。其返回值 n 为写入的字节数。 在写入过程中遇到的任何错误也将被返回。
  • 如果 WriterTo 可用,Copy 函数就会使用它。

读者是否发现,其实 ReaderFrom 和 WriterTo 接口的方法接收的参数是 io.Reader 和 io.Writer 类型。根据 io.Reader 和 io.Writer 接口的讲解,对该接口的使用应该可以很好的掌握。

ReaderAt

ReaderAt 接口包装了基本的 ReadAt 方法,用于将自身的数据写入 p 中。ReadAt 忽略之前的读写位置,从起始位置的 off 偏移处开始读取。

type ReaderAt interface {
    ReadAt(p []byte, off int64) (n int, err error)
}

需要实现接口的功能

  • ReadAt 从基本输入源的偏移量 off 处开始,将 len(p) 个字节读取到 p 中。它返回读取的字节数 n(0 <= n <= len(p))以及任何遇到的错误。
  • 当 ReadAt 返回的 n < len(p) 时,它就会返回一个 非nil 的错误来解释 为什么没有返回更多的字节。在这一点上,ReadAt 比 Read 更严格。
  • 即使 ReadAt 返回的 n < len(p),它也会在调用过程中使用 p 的全部作为暂存空间。若可读取的数据不到 len(p) 字节,ReadAt 就会阻塞,直到所有数据都可用或一个错误发生。 在这一点上 ReadAt 不同于 Read。
  • 若 n = len(p) 个字节从输入源的结尾处由 ReadAt 返回,Read可能返回 err == EOF 或者 err == nil
  • 若 ReadAt 携带一个偏移量从输入源读取,ReadAt 应当既不影响偏移量也不被它所影响。
  • 可对相同的输入源并行执行 ReadAt 调用。

标准库上面说的很多都是实现了这个接口,简单示例代码如下:

reader := strings.NewReader("Go语言中文网")
p := make([]byte, 6)
n, err := reader.ReadAt(p, 2)
if err != nil {
    panic(err)
}
fmt.Printf("%s, %d\n", p, n)

输出:

语言, 6

WriterAt

WriterAt 接口包装了基本的 WriteAt 方法,用于将 p 中的数据写入自身。ReadAt 忽略之前的读写位置,从起始位置的 off 偏移处开始写入。

type WriterAt interface {
    WriteAt(p []byte, off int64) (n int, err error)
}

需要实现接口的功能

  • WriteAt 从 p 中将 len(p) 个字节写入到偏移量 off 处的基本数据流中。它返回从 p 中被写入的字节数 n(0 <= n <= len(p))以及任何遇到的引起写入提前停止的错误。若 WriteAt 返回的 n < len(p),它就必须返回一个 非nil 的错误。
  • 若 WriteAt 携带一个偏移量写入到目标中,WriteAt 应当既不影响偏移量也不被它所影响。
  • 若被写区域没有重叠,可对相同的目标并行执行 WriteAt 调用。

os.File 实现了 WriterAt 接口,实例如下

file, err := os.Create("writeAt.txt")
if err != nil {
    panic(err)
}
defer file.Close()
file.WriteString("Golang中文社区——这里是多余")
n, err := file.WriteAt([]byte("Go语言中文网"), 24)
if err != nil {
    panic(err)
}
fmt.Println(n)

输出

Golang中文社区——Go语言中文网。

分析:file.WriteString(“Golang中文社区——这里是多余”) 往文件中写入 Golang中文社区——这里是多余,之后 file.WriteAt([]byte(“Go语言中文网”), 24) 在文件流的 offset=24 处写入 Go语言中文网(会覆盖该位置的内容)。

ByteReader和ByteWriter

ByteReader 接口包装了基本的 ReadByte 方法,用于从自身读出一个字节。返回读出的字节和遇到的错误。

type ByteReader interface {
    ReadByte() (c byte, err error)
}

ByteWriter 接口包装了基本的 WriteByte 方法,用于将一个字节写入自身返回遇到的错误

type ByteWriter interface {
    WriteByte(c byte) error
}

这组接口在标准库中也有实现

bufio.Reader/Writer 分别实现了io.ByteReader 和 io.ByteWriter
bytes.Buffer 同时实现了 io.ByteReader 和 io.ByteWriter
bytes.Reader 实现了 io.ByteReader
strings.Reader 实现了 io.ByteReader

实例

var ch byte
fmt.Scanf("%c\n", &ch)

buffer := new(bytes.Buffer)
err := buffer.WriteByte(ch)
if err == nil {
    fmt.Println("写入一个字节成功!准备读取该字节……")
    newCh, _ := buffer.ReadByte()
    fmt.Printf("读取的字节:%c\n", newCh)
} else {
    fmt.Println("写入错误")
}

ByteScanner

ByteScanner 在 ByteReader 的基础上增加了一个 UnreadByte 方法,用于撤消最后一次的 ReadByte 操作,以便下次的 ReadByte 操作可以读出与前一次一样的数据。UnreadByte 之前必须是 ReadByte 才能撤消成功,否则可能会返回一个错误信息(根据不同的需求,UnreadByte 也可能返回 nil,允许随意调用 UnreadByte,但只有最后一次的 ReadByte 可以被撤销,其它 UnreadByte 不执行任何操作)。

type ByteScanner interface {
    ByteReader
    UnreadByte() error
}

RuneReader

RuneReader 接口包装了基本的 ReadRune 方法,用于从自身读取一个 UTF-8 编码的字符到 r 中。返回读取的字符、字符的编码长度和遇到的错误。

type RuneReader interface {
    ReadRune() (r rune, size int, err error)
}

RuneScanner

RuneScanner 在 RuneReader 的基础上增加了一个 UnreadRune 方法,用于撤消最后一次的 ReadRune 操作,以便下次的 ReadRune 操作可以读出与前一次一样的数据。UnreadRune 之前必须是 ReadRune 才能撤消成功,否则可能会返回一个错误信息(根据不同的需求,UnreadRune 也可能返回 nil,允许随意调用 UnreadRune,但只有最后一次的 ReadRune 可以被撤销,其它 UnreadRune 不执行任何操作)。

type RuneScanner interface {
    RuneReader
    UnreadRune() error
}

实例

bytes.NewBuffer 实现了很多基本的接口,可以通过 bytes 包学习接口的实现

func main() {
    buf := bytes.NewBuffer([]byte("Hello World!"))
    b := make([]byte, buf.Len())

    n, err := buf.Read(b)
    fmt.Printf("%s   %v\n", b[:n], err)
    // Hello World!   <nil>

    buf.WriteString("ABCDEFG\n")
    buf.WriteTo(os.Stdout)
    // ABCDEFG

    n, err = buf.Write(b)
    fmt.Printf("%d   %s   %v\n", n, buf.String(), err)
    // 12   Hello World!   <nil>

    c, err := buf.ReadByte()
    fmt.Printf("%c   %s   %v\n", c, buf.String(), err)
    // H   ello World!   <nil>

    c, err = buf.ReadByte()
    fmt.Printf("%c   %s   %v\n", c, buf.String(), err)
    // e   llo World!   <nil>

    err = buf.UnreadByte()
    fmt.Printf("%s   %v\n", buf.String(), err)
    // ello World!   <nil>

    err = buf.UnreadByte()
    fmt.Printf("%s   %v\n", buf.String(), err)
    // ello World!   bytes.Buffer: UnreadByte: previous operation was not a read
}

类型

io包中定义了很多原生的类型。都是实现了上面的接口,可以直接创建使用的类型。

SectionReader 类型

SectionReader 是一个 struct(没有任何导出的字段),实现了 Read, Seek 和 ReadAt,同时,内嵌了 ReaderAt 接口。结构定义如下:

type SectionReader struct {
    r     ReaderAt    // 该类型最终的 Read/ReadAt 最终都是通过 r 的 ReadAt 实现
    base  int64        // NewSectionReader 会将 base 设置为 off
    off   int64        // 从 r 中的 off 偏移处开始读取数据
    limit int64        // limit - off = SectionReader 流的长度
}

从名称我们可以猜到,该类型读取数据流中部分数据。看一下常见的创建函数

func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader

NewSectionReader 返回一个 SectionReader,它从 r 中的偏移量 off 处读取 n 个字节后以 EOF 停止。也就是说,SectionReader 只是内部(内嵌)ReaderAt 表示的数据流的一部分:从 off 开始后的 n 个字节。这个类型的作用是:方便重复操作某一段 (section) 数据流;或者同时需要 ReadAt 和 Seek 的功能。

LimitedReader 类型

LimitedReader 结构定义如下:

type LimitedReader struct {
    R Reader // underlying reader,最终的读取操作通过 R.Read 完成
    N int64  // max bytes remaining
}

从 R 读取但将返回的数据量限制为 N 字节。每调用一次 Read 都将更新 N 来反应新的剩余数量。也就是说,最多只能返回 N 字节数据。LimitedReader 只实现了 Read 方法(Reader 接口)。

使用示例如下:

content := "This Is LimitReader Example"
reader := strings.NewReader(content)
limitReader := &io.LimitedReader{R: reader, N: 8}
for limitReader.N > 0 {
    tmp := make([]byte, 2)
    limitReader.Read(tmp)
    fmt.Printf("%s", tmp)
}

输出:

This Is

可见,通过该类型可以达到 只允许读取一定长度数据 的目的。

在 io 包中,LimitReader 函数的实现其实就是调用 LimitedReader:

func LimitReader(r Reader, n int64) Reader { return &LimitedReader{r, n} }

PipeReader 和 PipeWriter 类型

PipeReader

PipeReader(一个没有任何导出字段的 struct)是管道的读取端。它实现了 io.Reader 和 io.Closer 接口。结构定义如下:

type PipeReader struct {
    p *pipe
}

关于 PipeReader.Read 方法的说明:从管道中读取数据。该方法会堵塞,直到管道写入端开始写入数据或写入端被关闭。如果写入端关闭时带有 error(即调用 CloseWithError 关闭),该Read返回的 err 就是写入端传递的error;否则 err 为 EOF。

PipeWriter

PipeWriter(一个没有任何导出字段的 struct)是管道的写入端。它实现了 io.Writer 和 io.Closer 接口。结构定义如下:

type PipeWriter struct {
    p *pipe
}

关于 PipeWriter.Write 方法的说明:写数据到管道中。该方法会堵塞,直到管道读取端读完所有数据或读取端被关闭。如果读取端关闭时带有 error(即调用 CloseWithError 关闭),该Write返回的 err 就是读取端传递的error;否则 err 为 ErrClosedPipe。

Pipe

io.Pipe() 用于创建一个同步的内存管道 (synchronous in-memory pipe),函数签名:

func Pipe() (*PipeReader, *PipeWriter)

它将 io.Reader 连接到 io.Writer。一端的读取匹配另一端的写入,直接在这两端之间复制数据;它没有内部缓存。它对于并行调用 Read 和 Write 以及其它函数或 Close 来说都是安全的。一旦等待的 I/O 结束,Close 就会完成。并行调用 Read 或并行调用 Write 也同样安全:同种类的调用将按顺序进行控制。

正因为是同步的,因此不能在一个 goroutine 中进行读和写。

读关闭管道

func (r *PipeReader) Close() error

读关闭管道并传入错误信息。

func (r *PipeReader) CloseWithError(err error) error

从管道中读取数据,如果管道被关闭,则会返会一个错误信息:

  • 1、如果写入端通过 CloseWithError 方法关闭了管道,则返回关闭时传入的错误信息。
  • 2、如果写入端通过 Close 方法关闭了管道,则返回 io.EOF。
  • 3、如果是读取端关闭了管道,则返回 io.ErrClosedPipe。

示例:管道(读取端关闭)

func main() {
    r, w := io.Pipe()
    // 启用一个例程进行读取
    go func() {
        buf := make([]byte, 5)
        for n, err := 0, error(nil); err == nil; {
            n, err = r.Read(buf)
            r.CloseWithError(errors.New("管道被读取端关闭"))
            fmt.Printf("读取:%d, %v, %s\n", n, err, buf[:n])
        }
    }()
    // 主例程进行写入
    n, err := w.Write([]byte("Hello World !"))
    fmt.Printf("写入:%d, %v\n", n, err)
}

写关闭管道

func (w *PipeWriter) Close() error

写关闭管道并传入错误信息。

func (w *PipeWriter) CloseWithError(err error) error

向管道中写入数据,如果管道被关闭,则会返会一个错误信息:

  • 1、如果读取端通过 CloseWithError 方法关闭了管道,则返回关闭时传入的错误信息。
  • 2、如果读取端通过 Close 方法关闭了管道,则返回 io.ErrClosedPipe。
  • 3、如果是写入端关闭了管道,则返回 io.ErrClosedPipe。

示例:管道(写入端关闭)

func main() {
    r, w := io.Pipe()
    // 启用一个例程进行读取
    go func() {
        buf := make([]byte, 5)
        for n, err := 0, error(nil); err == nil; {
            n, err = r.Read(buf)
            fmt.Printf("读取:%d, %v, %s\n", n, err, buf[:n])
        }
    }()
    // 主例程进行写入
    n, err := w.Write([]byte("Hello World !"))
    fmt.Printf("写入:%d, %v\n", n, err)

    w.CloseWithError(errors.New("管道被写入端关闭"))
    n, err = w.Write([]byte("Hello World !"))
    fmt.Printf("写入:%d, %v\n", n, err)
    time.Sleep(time.Second * 1)
}

综合使用实例:

func main() {
    pipeReader, pipeWriter := io.Pipe()
    go PipeWrite(pipeWriter)
    go PipeRead(pipeReader)
    time.Sleep(30 * time.Second)
}

func PipeWrite(writer *io.PipeWriter){
    data := []byte("Go语言中文网")
    for i := 0; i < 3; i++{
        n, err := writer.Write(data)
        if err != nil{
            fmt.Println(err)
            return
        }
        fmt.Printf("写入字节 %d\n",n)
    }
    writer.CloseWithError(errors.New("写入段已关闭"))
}

func PipeRead(reader *io.PipeReader){
    buf := make([]byte, 128)
    for{
        fmt.Println("接口端开始阻塞5秒钟...")
        time.Sleep(5 * time.Second)
        fmt.Println("接收端开始接受")
        n, err := reader.Read(buf)
        if err != nil{
            fmt.Println(err)
            return
        }
        fmt.Printf("收到字节: %d\n buf内容: %s\n",n,buf)
    }
}

函数

io包中也有一下原生实现可以使用的函数。其实都是直接操作结构体的函数。

WriteString

WriteString 将字符串 s 写入到 w 中,返回写入的字节数和遇到的错误。如果 w 实现了 WriteString 方法,则优先使用该方法将 s 写入 w 中。否则,将 s 转换为 []byte,然后调用 w.Write 方法将数据写入 w 中。

func WriteString(w Writer, s string) (n int, err error)

ReadAtLeast

ReadAtLeast

ReadAtLeast 从 r 中读取数据到 buf 中,要求至少读取 min 个字节。返回读取的字节数和遇到的错误。

func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error)

如果 min 超出了 buf 的容量,则 err 返回 io.ErrShortBuffer,否则:

  • 1、读出的数据长度 == 0 ,则 err 返回 EOF。
  • 2、读出的数据长度 < min,则 err 返回 io.ErrUnexpectedEOF。
  • 3、读出的数据长度 >= min,则 err 返回 nil。

ReadFull

ReadFull 的功能和 ReadAtLeast 一样,只不过 min = len(buf)

func ReadFull(r Reader, buf []byte) (n int, err error)

示例:WriteString、ReadAtLeast、ReadFull

func main() {
    io.WriteString(os.Stdout, "Hello World!\n")
    // Hello World!

    r := strings.NewReader("Hello World!")
    b := make([]byte, 15)

    n, err := io.ReadAtLeast(r, b, 20)
    fmt.Printf("%q   %d   %v\n", b[:n], n, err)
    // ""   0   short buffer

    r.Seek(0, 0)
    b = make([]byte, 15)

    n, err = io.ReadFull(r, b)
    fmt.Printf("%q   %d   %v\n", b[:n], n, err)
    // "Hello World!"   12   unexpected EOF
}

LimitReader

LimitReader 对 r 进行封装,使其最多只能读取 n 个字节的数据。相当于对 r 做了一个切片 r[:n] 返回。底层实现是一个 *LimitedReader(只有一个 Read 方法)。

func LimitReader(r Reader, n int64) Reader

MultiReader

MultiReader 将多个 Reader 封装成一个单独的 Reader,多个 Reader 会按顺序读取,当多个 Reader 都返回 EOF 之后,单独的 Reader 才返回 EOF,否则返回读取过程中遇到的任何错误。

func MultiReader(readers ...Reader) Reader

MultiWriter

MultiWriter 将向自身写入的数据同步写入到所有 writers 中。

func MultiWriter(writers ...Writer) Writer

TeeReader

TeeReader 对 r 进行封装,使 r 在读取数据的同时,自动向 w 中写入数据。它是一个无缓冲的 Reader,所以对 w 的写入操作必须在 r 的 Read 操作结束之前完成。所有写入时遇到的错误都会被作为 Read 方法的 err 返回。

func TeeReader(r Reader, w Writer) Reader

示例 LimitReader

func main() {
    r := strings.NewReader("Hello World!")
    lr := io.LimitReader(r, 5)

    n, err := io.Copy(os.Stdout, lr)  // Hello
    fmt.Printf("\n%d   %v\n", n, err) // 5   <nil>
}

示例 MultiReader

func main() {
    r1 := strings.NewReader("Hello World!")
    r2 := strings.NewReader("ABCDEFG")
    r3 := strings.NewReader("abcdefg")
    b := make([]byte, 15)
    mr := io.MultiReader(r1, r2, r3)

    for n, err := 0, error(nil); err == nil; {
        n, err = mr.Read(b)
        fmt.Printf("%q\n", b[:n])
    }
    // "Hello World!"
    // "ABCDEFG"
    // "abcdefg"
    // ""

    r1.Seek(0, 0)
    r2.Seek(0, 0)
    r3.Seek(0, 0)
    mr = io.MultiReader(r1, r2, r3)
    io.Copy(os.Stdout, mr)
    // Hello World!ABCDEFGabcdefg
}

示例 MultiWriter

func main() {
    r := strings.NewReader("Hello World!\n")
    mw := io.MultiWriter(os.Stdout, os.Stdout, os.Stdout)

    r.WriteTo(mw)
    // Hello World!
    // Hello World!
    // Hello World!
}

示例 TeeReader

func main() {
    r := strings.NewReader("Hello World!")
    b := make([]byte, 15)
    tr := io.TeeReader(r, os.Stdout)

    n, err := tr.Read(b)                  // Hello World!
    fmt.Printf("\n%s   %v\n", b[:n], err) // Hello World!   <nil>
}

Copy

CopyN

CopyN 从 src 中复制 n 个字节的数据到 dst 中,返回复制的字节数和遇到的错误。只有当 written = n 时,err 才返回 nil。如果 dst 实现了 ReadFrom 方法,则优先调用该方法执行复制操作。

func CopyN(dst Writer, src Reader, n int64) (written int64, err error)

Copy

Copy 从 src 中复制数据到 dst 中,直到所有数据都复制完毕,返回复制的字节数和遇到的错误。如果复制过程成功结束,则 err 返回 nil,而不是 EOF,因为 Copy 的定义为“直到所有数据都复制完毕”,所以不会将 EOF 视为错误返回。如果 src 实现了 WriteTo 方法,则调用 src.WriteTo(dst) 复制数据,否则如果 dst 实现了 ReadeFrom 方法,则调用 dst.ReadeFrom(src) 复制数据

func Copy(dst Writer, src Reader) (written int64, err error)

CopyBuffer

CopyBuffer 相当于 Copy,只不 Copy 在执行的过程中会创建一个临时的缓冲区来中转数据,而 CopyBuffer 则可以单独提供一个缓冲区,让多个复制操作共用同一个缓冲区,避免每次复制操作都创建新的缓冲区。如果 buf == nil,则 CopyBuffer 会自动创建缓冲区。

func CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)

示例:CopyN、Copy、CopyBuffer

func main() {
    r := strings.NewReader("Hello World!")
    buf := make([]byte, 32)

    n, err := io.CopyN(os.Stdout, r, 5) // Hello
    fmt.Printf("\n%d   %v\n\n", n, err) // 5   <nil>

    r.Seek(0, 0)
    n, err = io.Copy(os.Stdout, r)      // Hello World!
    fmt.Printf("\n%d   %v\n\n", n, err) // 12   <nil>

    r.Seek(0, 0)
    r2 := strings.NewReader("ABCDEFG")
    r3 := strings.NewReader("abcdefg")

    n, err = io.CopyBuffer(os.Stdout, r, buf) // Hello World!
    fmt.Printf("\n%d   %v\n", n, err)         // 12   <nil>

    n, err = io.CopyBuffer(os.Stdout, r2, buf) // ABCDEFG
    fmt.Printf("\n%d   %v\n", n, err)          // 7   <nil>

    n, err = io.CopyBuffer(os.Stdout, r3, buf) // abcdefg
    fmt.Printf("\n%d   %v\n", n, err)          // 7   <nil>
}

这个函数还是我们在网络消息流量转发的时候还是经常使用的。

场景举例

base64编码成字符串

encoding/base64包中:

func NewEncoder(enc *Encoding, w io.Writer) io.WriteCloser

这个用来做base64编码,但是仔细观察发现,它需要一个io.Writer作为输出目标,并用返回的WriteCloser的Write方法将结果写入目标,下面是Go官方文档的例子

input := []byte("foo\x00bar")
encoder := base64.NewEncoder(base64.StdEncoding, os.Stdout)
encoder.Write(input)

这个例子是将结果写入到Stdout,如果我们希望得到一个字符串呢?可以用bytes.Buffer作为目标io.Writer:

input := []byte("foo\x00bar")
buffer := new(bytes.Buffer)
encoder := base64.NewEncoder(base64.StdEncoding, buffer)
encoder.Write(input)
fmt.Println(string(buffer.Bytes())

[]byte和struct之间正反序列化

这种场景经常用在基于字节的协议上,比如有一个具有固定长度的结构:

type Protocol struct {
    Version     uint8
    BodyLen     uint16
    Reserved    [2]byte
    Unit        uint8
    Value       uint32
}

通过一个[]byte来反序列化得到这个Protocol,一种思路是遍历这个[]byte,然后逐一赋值。其实在encoding/binary包中有个方便的方法:

func Read(r io.Reader, order ByteOrder, data interface{}) error

这个方法从一个io.Reader中读取字节,并已order指定的端模式,来给填充data(data需要是fixed-sized的结构或者类型)。要用到这个方法首先要有一个io.Reader,从上面的图中不难发现,我们可以这么写:

var p Protocol
var bin []byte
//...
binary.Read(bytes.NewReader(bin), binary.LittleEndian, &p)

换句话说,我们将一个[]byte转成了一个io.Reader。

反过来,我们需要将Protocol序列化得到[]byte,使用encoding/binary包中有个对应的Write方法:

func Write(w io.Writer, order ByteOrder, data interface{}) error

通过将[]byte转成一个io.Writer即可:

var p Protocol
buffer := new(bytes.Buffer)
//...
binary.Writer(buffer, binary.LittleEndian, p)
bin := buffer.Bytes()

从流中按行读取

比如对于常见的基于文本行的HTTP协议的读取,我们需要将一个流按照行来读取。本质上,我们需要一个基于缓冲的读写机制(读一些到缓冲,然后遍历缓冲中我们关心的字节或字符)。在Go中有一个bufio的包可以实现带缓冲的读写:

func NewReader(rd io.Reader) *Reader
func (b *Reader) ReadString(delim byte) (string, error)

这个ReadString方法从io.Reader中读取字符串,直到delim,就返回delim和之前的字符串。如果将delim设置为\n,相当于按行来读取了:

var conn net.Conn
//...
reader := NewReader(conn)
for {
    line, err := reader.ReadString([]byte('\n'))
    //...
}

string to byte

花式技(zuo)巧(si)

string转[]byte
a := "Hello, playground"
fmt.Println([]byte(a))

等价于

a := "Hello, playground"
buf := new(bytes.Buffer)
buf.ReadFrom(strings.NewReader(a))
fmt.Println(buf.Bytes())

标准库中实现的读取器和编写器的实例

目前,Go 文档中还没有直接列出实现了某个接口的所有类型。不过,我们可以通过查看标准库文档,列出实现了 io.Reader 或 io.Writer 接口的类型(导出的类型):(注:godoc 命令支持额外参数 -analysis ,能列出都有哪些类型实现了某个接口,相关参考 godoc -h 或 Static analysis features of godoc。另外,还有一个地址:http://docs.studygolang.com。

  • os.File 同时实现了 io.Reader 和 io.Writer
  • strings.Reader 实现了 io.Reader
  • bufio.Reader/Writer 分别实现了 io.Reader 和 io.Writer
  • bytes.Buffer 同时实现了 io.Reader 和 io.Writer
  • bytes.Reader 实现了 io.Reader
  • compress/gzip.Reader/Writer 分别实现了 io.Reader 和 io.Writer
  • crypto/cipher.StreamReader/StreamWriter 分别实现了 io.Reader 和 io.Writer
  • crypto/tls.Conn 同时实现了 io.Reader 和 io.Writer
  • encoding/csv.Reader/Writer 分别实现了 io.Reader 和 io.Writer
  • mime/multipart.Part 实现了 io.Reader
  • net.Conn 分别实现了 io.Reader 和 io.Writer(Conn接口定义了Read/Write)

除此之外,io 包本身也有这两个接口的实现类型。如:

以上类型中,常用的类型有,文件IO,缓冲IO,网络IO,在标准库中都有实现

网络io/文件io/标准io–其实就是操作网络数据和文件中的数据 - net.Conn, os.Stdin, os.File: 网络、标准输入输出、文件的流读取,对应—frp就是基于这个基础上实现的

缓存io–其实就是操作缓存中的string,[]byte - strings.Readerstrings.Builder: 把字符串抽象成Reader - bytes.Reader: 把[]byte抽象成Reader - bytes.Buffer: 把[]byte抽象成Reader和Writer

缓存io–还是使用缓存,但是主要是对io.reader实例进行读写 - bufio.Reader/Writer: 抽象成带缓冲的流读取(比如按行读写)