go源码解读-io包

io包中的接口定义

  • io包中提供了IO操作相关的一系列接口
  • io包可以理解对os包中底层操作的封装
  • 由于是对底层进行封装,除非特殊提及,否则io包下的函数是线程不安全的

Reader接口

  • Reader接口只包含一个Read方法
  • Read方法最多读取len(p)个字节到p中,返回读取到p中的字节数及过程中的错误
  • Read方法读取过程中出错,会返回已经读取到的字节数
  • Read出错的时候可以返回已经读取到的字节数和nil错误或者返回遇到的错误,而再下次调用Read方法的时候返回0, err即可
  • Read方法的调用者需要先处理n>0的情况下的数据,而后考虑err的值,防止读取的数据丢失
  • Read方法不鼓励返回0,nil样式的返回值,除非len(p) == 0
  • p不能作为实现Reader接口的结构体的成员变量
1
2
3
type Reader interface {
Read(p []byte) (n int, err error)
}

Writer接口

  • Writer接口只包含Write方法
  • Write方法完成从p中写入len(p)个字节到数据流中,返回的是写入的字节数及遇到的错误
  • Write方法如果返回的n<len(p),则必须返回个非空的错误
  • Write方法不允许调整p中的数据,临时调整也不可以
  • p不能作为实现Writer接口的结构体的成员变量
1
2
3
type Writer interface {
Write(p []byte) (n int, err error)
}

Closer接口

  • Closer接口只包含Close方法
  • Closer接口返回关闭对象中遇到的错误
1
2
3
type Closer interface {
Close() error
}

Seeker接口

  • Seeker接口只含有一个Seek方法
  • Seek方法根据whence的值计算出下一个读取或者写入的偏移量
    • SeekStart对起始文件的偏移量
    • SeekCurrent对当前索引的偏移量
    • SeekEnd对结尾的偏移量
  • Seek方法返回相对于起始位置的偏移量,和遇到的错误
  • 实际上返回的偏移量小于文件的起始索引会产生错误
1
2
3
4
5
6
7
8
9
const (
SeekStart = 0 // seek relative to the origin of the file
SeekCurrent = 1 // seek relative to the current offset
SeekEnd = 2 // seek relative to the end
)
type Seeker interface {
Seek(offset int64, whence int) (int64, error)
}

组合接口

  • 对于上述的接口进行组合会产生一系列新的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ReadWriter is the interface that groups the basic Read and Write methods.
type ReadWriter interface {
Reader
Writer
}
// ReadCloser is the interface that groups the basic Read and Close methods.
type ReadCloser interface {
Reader
Closer
}
// WriteCloser is the interface that groups the basic Write and Close methods.
type WriteCloser interface {
Writer
Closer
}
// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods.
type ReadWriteCloser interface {
Reader
Writer
Closer
}
// ReadSeeker is the interface that groups the basic Read and Seek methods.
type ReadSeeker interface {
Reader
Seeker
}
// WriteSeeker is the interface that groups the basic Write and Seek methods.
type WriteSeeker interface {
Writer
Seeker
}
// ReadWriteSeeker is the interface that groups the basic Read, Write and Seek methods.
type ReadWriteSeeker interface {
Reader
Writer
Seeker
}

ReaderFrom接口

  • ReaderFrom接口只包含ReadFrom方法
  • ReadFrom方法从r中读取数据直到遇到EOF或者error
  • ReadFrom方法返回读取到的字节数和遇到的错误
  • io.Copy函数会优先使用
1
2
3
type ReaderFrom interface {
ReadFrom(r Reader) (n int64, err error)
}

WriteTo接口

  • WriteTo接口只包含WriteTo方法
  • WriteTo方法向w中写入数据,直到没有数据可以写入或者遇到了错误
  • WriteTo方法返回写入的字节数和遇到的问题
  • io.Copy函数会优先使用
1
2
3
type WriterTo interface {
WriteTo(w Writer) (n int64, err error)
}

ReaderAt接口

  • ReaderAt接口只包含ReadAt方法
  • ReadAt方法在实现的结构体的输入流中,在偏移量offset的位置读取len(p)个字节数据导p中
  • ReadAt方法在读取到的字节小于len(p)时会返回一个非空错误来解释为什么没有读取完全,这个地方的要求比Reader接口更加严格
  • ReadAt方法在调用过程中会占据p的全部内存空间,即使读取到的n小于p的长度
  • ReadAt如果读取到的数据小于len(p),则ReadAt方法会等待更多的数据到来或者返回一个错误,这个地方要求也是比Reader更加严格
  • ReadAt如果读取完最后一个数据之后,刚好读取了len(p)个字节,则可以返回的err为nil或者EOF
  • ReadAt方法如果读取的是一个实现了Seek接口的结构体,那么ReadAt的offset和seek的offset不应该相互影响
  • 读取的数据流可以支持多个ReadAt方法并发读取,也就是ReadAt是线程安全的
1
2
3
type ReaderAt interface {
ReadAt(p []byte, off int64) (n int, err error)
}

WriteAt接口

  • WriterAt接口只包含WriteAt方法
  • WriteAt方法完成向数据流的写入,写入的起点为输入流的offset位置
  • WriteAt方法返回向数据流写入的数据和遇到的错误
  • WriteAt方法当写入的数据小于len(p)时,返回的错误不能为nil
  • WriteAt方法写入的数据流实现了Seek方法,则Seek方法的offset和WriteAt方法的offset互不影响
  • 在数据流不溢出的情况下,WriteAt方法可以并发写入,是线程安全的
    -
1
2
3
type WriterAt interface {
WriteAt(p []byte, off int64) (n int, err error)
}

ByteReader接口

  • ByteReader接口只包含ReadByte方法
  • ReadByte方法读取一个字节,返回从input数据流中读取到的下一个字节或者遇到的错误
  • 如果返回了一个错误,则input数据流没有字节被读取消费掉,并且返回的字节值为undefined
1
2
3
type ByteReader interface {
ReadByte() (byte, error)
}

ByteScanner接口

  • ByteScanner接口封装了ByteReader接口和UnreadByte方法
  • UnreadByte方法在ReadByte方法之后调用,会重置ReadByte方法的操作
  • 如果在没调用ReadByte方法时,调用UnreadByte方法会报错
1
2
3
4
type ByteScanner interface {
ByteReader
UnreadByte() error
}

ByteWriter接口

  • ByteWriter接口只包含WriteByte方法
  • WriteByte方法完成对数据流的字节写入
1
2
3
type ByteWriter interface {
WriteByte(c byte) error
}

RuneReader接口

  • RuneReader接口包含ReadRune方法
  • Readrune方法从数据流中读取一个UTF-8编码的Unicode字符,返回读取到的字符,字符的大小和遇到的错误
1
2
3
type RuneReader interface {
ReadRune() (r rune, size int, err error)
}

RuneScanner接口

  • RuneScanner接口封装了RuneReader接口和UnreadRune方法
  • UnreadRune方法实际上是重置ReadRune方法的操作,需要在ReadRune方法之后调用
1
2
3
4
type RuneScanner interface {
RuneReader
UnreadRune() error
}

StringWriter接口

1
2
3
type StringWriter interface {
WriteString(s string) (n int, err error)
}

io包中的错误定义

1
2
3
4
5
6
7
8
9
var ErrShortWrite = errors.New("short write")
var ErrShortBuffer = errors.New("short buffer")
var EOF = errors.New("EOF")
var ErrUnexpectedEOF = errors.New("unexpected EOF")
var ErrNoProgress = errors.New("multiple Read calls return no data or error")

io包中定义的结构体

LimitedReader

  • LimitedReader读入数据到R中,限制读入的字节数为N
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type LimitedReader struct {
R Reader // underlying reader
N int64 // max bytes remaining
}
// 初始化方法
func LimitReader(r Reader, n int64) Reader { return &LimitedReader{r, n} }
// 实现Reader接口
func (l *LimitedReader) Read(p []byte) (n int, err error) {
if l.N <= 0 {
return 0, EOF
}
if int64(len(p)) > l.N {
p = p[0:l.N]
}
n, err = l.R.Read(p)
l.N -= int64(n)
return
}

SectionReader

  • SectionReader完成数据的部分读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
type SectionReader struct {
r ReaderAt
base int64
off int64
limit int64
}
// 初始化
func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader {
return &SectionReader{r, off, off, off + n}
}
// 实现Reader接口
func (s *SectionReader) Read(p []byte) (n int, err error) {
if s.off >= s.limit {
return 0, EOF
}
if max := s.limit - s.off; int64(len(p)) > max {
p = p[0:max]
}
n, err = s.r.ReadAt(p, s.off)
s.off += int64(n)
return
}
// 实现Seeker接口
func (s *SectionReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
default:
return 0, errWhence
case SeekStart:
offset += s.base
case SeekCurrent:
offset += s.off
case SeekEnd:
offset += s.limit
}
if offset < s.base {
return 0, errOffset
}
s.off = offset
return offset - s.base, nil
}
// 实现ReaderAt接口
func (s *SectionReader) ReadAt(p []byte, off int64) (n int, err error) {
if off < 0 || off >= s.limit-s.base {
return 0, EOF
}
off += s.base
if max := s.limit - off; int64(len(p)) > max {
p = p[0:max]
n, err = s.r.ReadAt(p, off)
if err == nil {
err = EOF
}
return n, err
}
return s.r.ReadAt(p, off)
}
// 返回section的字节尺寸
func (s *SectionReader) Size() int64 { return s.limit - s.base }

常用工具类方法

  • WriteString函数将字符串s写入到w中
1
2
3
4
5
6
func WriteString(w Writer, s string) (n int, err error) {
if sw, ok := w.(StringWriter); ok {
return sw.WriteString(s)
}
return w.Write([]byte(s))
}
  • ReadAtLeast函数从r中读取数据导buf中,同时规定了最少读取的字节数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == EOF {
err = ErrUnexpectedEOF
}
return
}
  • ReadFull一次性读取len(buf)个数到buf中,实际上是对ReadAtLeast的封装
1
2
3
func ReadFull(r Reader, buf []byte) (n int, err error) {
return ReadAtLeast(r, buf, len(buf))
}
  • Copy方法完成将src的数据复制到dst中,直到遇到EOF或者错误
  • Copy成功的话会返回nil,而不是EOF
  • 如果src实现了WriteTo方法,则会优先调用,否则会调用dst的ReadFrom方法
1
2
3
func Copy(dst Writer, src Reader) (written int64, err error) {
return copyBuffer(dst, src, nil)
}
  • CopyBuffer和Copy是一致的,只是增加了缓冲区
1
2
3
4
5
6
func CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
if buf != nil && len(buf) == 0 {
panic("empty buffer in io.CopyBuffer")
}
return copyBuffer(dst, src, buf)
}
  • copyBuffer是实际上的Copy和CopyBuffer的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
// 如果src实现了WriteTo接口,则调用其WroteTo方法完成写入
if wt, ok := src.(WriterTo); ok {
return wt.WriteTo(dst)
}
// 如果dst实现了ReaderFrom接口,则调用ReadFrom方法完成写入
if rt, ok := dst.(ReaderFrom); ok {
return rt.ReadFrom(src)
}
// 缓冲区大小确定
if buf == nil {
size := 32 * 1024
if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
if l.N < 1 {
size = 1
} else {
size = int(l.N)
}
}
buf = make([]byte, size)
}
for {
// 读取到buf中
nr, er := src.Read(buf)
if nr > 0 {
// 写入操作
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = ErrShortWrite
break
}
}
if er != nil {
if er != EOF {
err = er
}
break
}
}
return written, err
}
  • CopyN完成从src复制N个字节数据到dst
1
2
3
4
5
6
7
8
9
10
func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
written, err = Copy(dst, LimitReader(src, n))
if written == n {
return n, nil
}
if written < n && err == nil {
err = EOF
}
return
}

ioutil中的工具类

ReadAll函数

  • ReadAll函数将r中的数据读取出来并返回
  • 成功读取返回的err为nil
  • readAll函数实际上是利用bytes.Buffer完成数据的读取的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func ReadAll(r io.Reader) ([]byte, error) {
return readAll(r, bytes.MinRead)
}
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
var buf bytes.Buffer
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
// 防止capacity溢出
if int64(int(capacity)) == capacity {
buf.Grow(int(capacity))
}
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}

ReadFile函数

  • ReadFile完成对指定文件的读取并且返回其数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func ReadFile(filename string) ([]byte, error) {
// 打开文件
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
var n int64 = bytes.MinRead
if fi, err := f.Stat(); err == nil {
// 确定读取的size,比文件的大小稍微多一些
if size := fi.Size() + bytes.MinRead; size > n {
n = size
}
}
// 调用readAll方法来完成数据的读取
return readAll(f, n)
}

WriteFile函数

  • WriteFile将数据写入到名称为filename的文件中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func WriteFile(filename string, data []byte, perm os.FileMode) error {
// 打开文件,如果不存在,则创建
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
// 数据的追加写入
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
// 关闭文件
if err1 := f.Close(); err == nil {
err = err1
}
return err
}

ReadDir函数

  • ReadDir函数读取dirname下的文件或者路径并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
func ReadDir(dirname string) ([]os.FileInfo, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
list, err := f.Readdir(-1)
f.Close()
if err != nil {
return nil, err
}
sort.Slice(list, func(i, j int) bool { return list[i].Name() < list[j].Name() })
return list, nil
}

NopCloser

  • NopCloser返回带Close方法的Reader
1
2
3
4
5
6
7
8
9
func NopCloser(r io.Reader) io.ReadCloser {
return nopCloser{r}
}
type nopCloser struct {
io.Reader
}
func (nopCloser) Close() error { return nil }

Discard

  • Discard的写入操作都会被忽略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Discard实际上是通过devNull来构建的
var Discard io.Writer = devNull(0)
// devNull结构体本身是个int数据
type devNull int
// 实现了Writer接口
func (devNull) Write(p []byte) (int, error) {
return len(p), nil
}
// 实现了StringWriter接口
func (devNull) WriteString(s string) (int, error) {
return len(s), nil
}
// 实现了ReaderFrom接口
func (devNull) ReadFrom(r io.Reader) (n int64, err error) {
bufp := blackHolePool.Get().(*[]byte)
readSize := 0
for {
readSize, err = r.Read(*bufp)
n += int64(readSize)
if err != nil {
blackHolePool.Put(bufp)
if err == io.EOF {
return n, nil
}
return
}
}
}
var blackHolePool = sync.Pool{
New: func() interface{} {
b := make([]byte, 8192)
return &b
},
}

Pipe函数

  • Pipe在内存中创建了个管道,可以用于连接需要io.Reader和io.Writer的代码
  • 读写会相互阻塞,除非一个写入被多次读取的场景存在
  • Pipe是并发安全的
1
2
3
4
5
6
7
8
func Pipe() (*PipeReader, *PipeWriter) {
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}

PipeReader和PipeWriter

  • PipeReader是管道读的半边
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type PipeReader struct {
p *pipe
}
// 实现了io.Reader接口
func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.Read(data)
}
// 实现了io.Closer接口
func (r *PipeReader) Close() error {
return r.CloseWithError(nil)
}
func (r *PipeReader) CloseWithError(err error) error {
return r.p.CloseRead(err)
}
  • PipeWriter是管道写的半边
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type PipeWriter struct {
p *pipe
}
// 实现了io.Writer接口
func (w *PipeWriter) Write(data []byte) (n int, err error) {
return w.p.Write(data)
}
// 实现了io。Close方法
func (w *PipeWriter) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeWriter) CloseWithError(err error) error {
return w.p.CloseWrite(err)
}

pipe结构体

  • 实际上Pipe是对pipe的封装
1
2
3
4
5
6
7
8
9
10
type pipe struct {
wrMu sync.Mutex // 保障写入的安全
wrCh chan []byte // 写入的数据
rdCh chan int // 读取到的数量的byte数
once sync.Once // 保证pipe只被关闭一次
done chan struct{} // pipe关闭通道信息
rerr atomicError // 读错误
werr atomicError // 写错误
}
  • pipe实现了io.Reader接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (p *pipe) Read(b []byte) (n int, err error) {
// 判断要读取的通道是否已经关闭
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
// 将数据写入b中,并将写入的字节数写入rdCh通道中
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
  • pipe实现了io.Writer接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (p *pipe) Write(b []byte) (n int, err error) {
// 保证写入的时候安全,且pipe没有关闭
select {
case <-p.done:
return 0, p.writeCloseError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()
}
// 写入数据
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
  • pipe的Close方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *pipe) CloseRead(err error) error {
if err == nil {
err = ErrClosedPipe
}
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
func (p *pipe) CloseWrite(err error) error {
if err == nil {
err = EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}

multi

MultiReader 将多个Reader的内容写入p

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func MultiReader(readers ...Reader) Reader {
r := make([]Reader, len(readers))
copy(r, readers)
return &multiReader{r}
}
type multiReader struct {
readers []Reader
}
func (mr *multiReader) Read(p []byte) (n int, err error) {
for len(mr.readers) > 0 {
// flat嵌套
if len(mr.readers) == 1 {
if r, ok := mr.readers[0].(*multiReader); ok {
mr.readers = r.readers
continue
}
}
n, err = mr.readers[0].Read(p)
if err == EOF {
// 避免nl panic
mr.readers[0] = eofReader{} // permit earlier GC
mr.readers = mr.readers[1:]
}
if n > 0 || err != EOF {
if err == EOF && len(mr.readers) > 0 {
// Don't return EOF yet. More readers remain.
err = nil
}
return
}
}
return 0, EOF
}
// 避免nil panic
type eofReader struct{}
func (eofReader) Read([]byte) (int, error) {
return 0, EOF
}

MultiWriter 将p的内容写入多个数据流中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func MultiWriter(writers ...Writer) Writer {
allWriters := make([]Writer, 0, len(writers))
for _, w := range writers {
if mw, ok := w.(*multiWriter); ok {
allWriters = append(allWriters, mw.writers...)
} else {
allWriters = append(allWriters, w)
}
}
return &multiWriter{allWriters}
}
type multiWriter struct {
writers []Writer
}
func (t *multiWriter) Write(p []byte) (n int, err error) {
for _, w := range t.writers {
n, err = w.Write(p)
if err != nil {
return
}
if n != len(p) {
err = ErrShortWrite
return
}
}
return len(p), nil
}
func (t *multiWriter) WriteString(s string) (n int, err error) {
var p []byte // lazily initialized if/when needed
for _, w := range t.writers {
if sw, ok := w.(StringWriter); ok {
n, err = sw.WriteString(s)
} else {
if p == nil {
p = []byte(s)
}
n, err = w.Write(p)
}
if err != nil {
return
}
if n != len(s) {
err = ErrShortWrite
return
}
}
return len(s), nil
}
Donate comment here