小黄

黄小黄的幸福生活!


  • 首页

  • 标签

  • 分类

  • 归档

  • Java

go源码解读-database/driver包

发表于 2020-10-28 | 分类于 go , 源码解读 , 1.13.11
  • driver包定义了一系列需要由不同驱动实现的接口,供sql包调用

driver中定义的类型和接口

必选接口

  • Value的值需要能被具体的数据库驱动实现所处理
  • Value的值包括:
    • nil
    • NamedValueChecker可以处理的类型
    • int64
    • float64
    • bool
    • []byte
    • string
    • time.Time
  • 如果数据库驱动实现cursor,返回的Value类型对象需要实现Rows接口
  • NamedValue包含了Value类型的值和其名称
1
2
3
4
5
6
7
type Value interface{}
type NamedValue struct {
Name string // Value的名称
Ordinal int // 参数次序
Value Value // 参数值
}
  • Driver接口是数据库驱动实现必须实现的接口之一
  • 数据库实际上可以实现DriverContext接口来将连接放到连接池中,避免每次操作建立一次连接
  • Driver接口的Open方法返回一个新的数据库连接
  • Open方法可能返回一个之前关闭的缓存连接,但是sql包中实际上是有连接池来处理空闲连接并提高连接的使用率,因此没太必要
  • 返回的Conn连接在同一时间只能被一个线程使用
1
2
3
type Driver interface {
Open(name string) (Conn, error) // 创建连接
}
  • Conn表示一个数据库连接,不能被多个线程同时使用
    • Conn是有状态的
    • Prepare方法返回一个prepared statement,并和当前的连接绑定
    • Close方法会关闭ps语句和事物,释放当前连接
    • sql包中对于连接的管理十分完备,因此不推荐驱动自己管理连接缓存,连接的管理交由sql包完成
    • Conn接口包含Begin方法来开启一个事物,返回Tx对象
  • Stmt接口和一个具体的数据库连接相绑定,不能同时被多个线程使用,preparedStatement
    • Close方法完成statement语句的关闭,基本不调用,语句的关闭由sql包管理
    • NumInput方法返回stmt语句中的占位符的个数
    • Exec方法执行不返回rows数据的数据库操作,比如insert,update操作等,返回Result,目前已经被弃用,driver通过实现StmtExecContext来实现该操作
    • Query方法执行返回rows数据的数据库操作,比如select操作,返回Rows,被弃用,通过实现StmtQueryContext来实现该操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 连接
type Conn interface {
Prepare(query string) (Stmt, error)
Close() error
Begin() (Tx, error)
}
// preparedStatement语句
type Stmt interface {
Close() error
NumInput() int
Exec(args []Value) (Result, error)
Query(args []Value) (Rows, error)
}
// 执行结果,insert,updated
type Result interface {
// 返回上次插入的数据的自增ID
LastInsertId() (int64, error)
// 返回数据库语句被影响的行数
RowsAffected() (int64, error)
}
// Rows是个可迭代的返回结果
type Rows interface {
// 返回列的名字信息
Columns() []string
Close() error
// 获取Rows的下一列数据
Next(dest []Value) error
}
// Tx表示一个数据库事务
type Tx interface {
Commit() error // 数据库提交
Rollback() error // 回滚
}
  • 如果数据库实现了DriverContext接口,则sql.DB会通过调用OpenConnector方法获取一个Connector对象,之后调用Connector的Connect方法来获取数据库连接
  • 实现DriverContext的好处在于可以解析一次连接名称并可以复用之前创建的连接,效率更高
  • OpenConnector解析的数据库连接格式和Driver.Open方法是一致的
  • Connector持有固定的连接,可以创建多个等价的数据库连接支持多线程来使用
1
2
3
4
5
6
7
8
9
10
11
type DriverContext interface {
// OpenConnector解析的数据库连接格式和Driver.Open方法是一致的
OpenConnector(name string) (Connector, error)
}
type Connector interface {
// 创建数据库连接,创建出的连接同时支持一个线程使用
Connect(context.Context) (Conn, error)
// 返回Connector底层的Driver对象,供sql.DB使用
Driver() Driver
}
  • ConnPrepareContext将Conn接口和context结合
1
2
3
type ConnPrepareContext interface {
PrepareContext(ctx context.Context, query string) (Stmt, error)
}
  • IsolationLevel表示事务隔离级别,和sql.IsolationLevel相关联
  • TxOptions结构体持有事务相关选项,取值参考sql.TxOptions
  • ConnBeginTx将Conn和context关联起来
1
2
3
4
5
6
7
8
type IsolationLevel int
type TxOptions struct {
Isolation IsolationLevel
ReadOnly bool
}
type ConnBeginTx interface {
BeginTx(ctx context.Context, opts TxOptions) (Tx, error)
}
  • SessionResetter来实现一个连接的重置
1
2
3
type SessionResetter interface {
ResetSession(ctx context.Context) error
}
  • StmtExecContext,StmtQueryContext接口也是与Context相关联起来
1
2
3
4
5
6
7
type StmtExecContext interface {
ExecContext(ctx context.Context, args []NamedValue) (Result, error)
}
type StmtQueryContext interface {
QueryContext(ctx context.Context, args []NamedValue) (Rows, error)
}
  • NamedValueChecker来实现对NamedValue的检查
1
2
3
type NamedValueChecker interface {
CheckNamedValue(*NamedValue) error
}
  • RowsNextResultSet扩展Rows对象,提供了HasNextResultSet和NextResultSet来实现对结果的多种操作
  • RowsColumnTypeScanType返回Column的Type类型
  • RowsColumnTypeDatabaseTypeName返回Column对应的数据库类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type RowsNextResultSet interface {
Rows
HasNextResultSet() bool
NextResultSet() error
}
type RowsColumnTypeScanType interface {
Rows
ColumnTypeScanType(index int) reflect.Type
}
type RowsColumnTypeDatabaseTypeName interface {
Rows
ColumnTypeDatabaseTypeName(index int) string
}
type RowsColumnTypeLength interface {
Rows
ColumnTypeLength(index int) (length int64, ok bool)
}
type RowsColumnTypeNullable interface {
Rows
ColumnTypeNullable(index int) (nullable, ok bool)
}
type RowsColumnTypePrecisionScale interface {
Rows
ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool)
}

可选接口

  • Pinger是一个可选接口,一般由Conn来实现
  • Pinger由sql.DB来调用,根据其返回值来判定是否连接仍然可用,否则将其移除出连接池
1
2
3
type Pinger interface {
Ping(ctx context.Context) error
}
  • ExecerContext由sql.DB.Exec来调用
  • 未实现该接口,会调用Execer的Exec方法,如果仍然没有实现,则会调用stmt,exce, close
  • QueryerContext 和ExecerContext类似
1
2
3
4
5
6
type ExecerContext interface {
ExecContext(ctx context.Context, query string, args []NamedValue) (Result, error)
}
type QueryerContext interface {
QueryContext(ctx context.Context, query string, args []NamedValue) (Rows, error)
}

go源码解读-database包

发表于 2020-10-28 | 分类于 go , 源码解读 , 1.13.11

database包简介

  • database包设定了一些目标
    • database包旨在为多种SQL语言或者类SQL语言提供统一的API调用接口
    • 对sql的操作要像go语言的风格
    • 专注于通用的情况,极端情况不是十分关心
    • 将database驱动的实现进行分离,只提供必要的接口和类型作为基础
    • 类型转换要使用与所有的数据库语言,因此类型转换多发生在sql包内,而不是在驱动的实现中发生类型转换
    • 类型转换需要灵活,但是也要追求类型的准确性
    • 处理好多线程,提供线程池给用户使用,并发问题由sql包解决
    • 对底层驱动实现做好封装
    • 为底层驱动提供可选的接口,来实习那一些特殊的例子和快捷方法

DB结构体

  • DB结构提供了连接池,可以支持多个协程共同使用,是线程安全的结构体
  • sql包会自动的创建和释放数据库连接,连接池会持有一些空闲的连接
  • DB支持事物,当db.Begin调用的时候,tx会和一个连接进行绑定,当执行提交或者回滚之后,连接会回到连接池中待用
  • db中连接池的大小由SetMaxIdleConns方法来控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type DB struct {
waitDuration int64 // 等待新连接到达的时间
connector driver.Connector // 驱动的连接接口,实现了Driver方法和Connect方法
numClosed uint64 // 记录关闭的连接数量
mu sync.Mutex // 互斥量,支持多线程操作
freeConn []*driverConn // idel连接列表
connRequests map[uint64]chan connRequest // 连接请求
nextRequest uint64 // Next key to use in connRequests.
numOpen int // 已经建立的连接数量,包含使用中的和空闲的
openerCh chan struct{} // 是否开启新的连接
resetterCh chan *driverConn
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdle int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
maxIdleClosed int64 // Total number of connections closed due to idle.
maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
stop func() // stop cancels the connection opener and the session resetter.
}

go源码解读-log包

发表于 2020-10-28 | 分类于 go , 源码解读 , 1.13.11

log包概述

  • go源码提供了一个简单的日志包,包含Logger类型及其相关方法
  • log包提供了一系列便捷方法来打印简单日子,如Println,Fatalln,Panicln等
  • Fatal方法打印完日志后会调用os.Exit(1)进行退出
  • Panic方法打印完日志后会调用panic方法退出

Logger结构体

  • Logger结构体可以理解为一个对象实例,可以逐行向io.Writer对象写入数据
  • Logger结构体是线程安全的,sync.Mutex互斥量保证原子写入
1
2
3
4
5
6
7
8
9
10
11
12
type Logger struct {
mu sync.Mutex // 互斥量保证原子写入
prefix string // 日志前缀
flag int // 日志格式flag
out io.Writer // 写入的目标
buf []byte // 缓存要写入的内容
}
// 有参数构造方法
func New(out io.Writer, prefix string, flag int) *Logger {
return &Logger{out: out, prefix: prefix, flag: flag}
}
  • flag定义来决定日志打印的格式化,采用比特位来节省空间
1
2
3
4
5
6
7
8
9
const (
Ldate = 1 << iota // 日期 00000001 2009/01/23
Ltime // 时间 00000010 01:23:23
Lmicroseconds // 显示毫秒 00000100
Llongfile // 显示打印文件的全路径 00001000 /a/b/c/d.go:23
Lshortfile // 显示打印文件的短路径,00010000 d.go, 会覆盖Llongfile
LUTC // 使用UTC时区
LstdFlags = Ldate | Ltime // 默认flag
)

Logger的相关方法

  • SetOutput指定日志的输出,参数为实现io.Writer接口的方法
    -
1
2
3
4
5
6
// SetOutput sets the output destination for the logger.
func (l *Logger) SetOutput(w io.Writer) {
l.mu.Lock()
defer l.mu.Unlock()
l.out = w
}
  • formatHeader方法为内部方法
  • formatHeader方法将日志头部写入缓存,依次为prefix信息,日期信息,打印日志文件信息
  • itoa完成输入的数字转换成指定长度的字符串并写入buf中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (l *Logger) formatHeader(buf *[]byte, t time.Time, file string, line int) {
// 写入prefix信息
*buf = append(*buf, l.prefix...)
// 判定flag的日期标志位不为0 => 00000111 => 也就是flag的二进制的后三位不都为0,写入日期信息到缓存
if l.flag&(Ldate|Ltime|Lmicroseconds) != 0 {
// 时区判定
if l.flag&LUTC != 0 {
t = t.UTC()
}
// 写入日期信息 2020/11/11
if l.flag&Ldate != 0 {
year, month, day := t.Date()
itoa(buf, year, 4)
*buf = append(*buf, '/')
itoa(buf, int(month), 2)
*buf = append(*buf, '/')
itoa(buf, day, 2)
*buf = append(*buf, ' ')
}
// 写入时间信息 11:11:11
if l.flag&(Ltime|Lmicroseconds) != 0 {
hour, min, sec := t.Clock()
itoa(buf, hour, 2)
*buf = append(*buf, ':')
itoa(buf, min, 2)
*buf = append(*buf, ':')
itoa(buf, sec, 2)
if l.flag&Lmicroseconds != 0 {
*buf = append(*buf, '.')
itoa(buf, t.Nanosecond()/1e3, 6)
}
*buf = append(*buf, ' ')
}
}
// 写入文件信息
if l.flag&(Lshortfile|Llongfile) != 0 {
if l.flag&Lshortfile != 0 {
short := file
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
short = file[i+1:]
break
}
}
file = short
}
*buf = append(*buf, file...)
*buf = append(*buf, ':')
itoa(buf, line, -1)
*buf = append(*buf, ": "...)
}
}
// itoa 写入指定位数到buf中
func itoa(buf *[]byte, i int, wid int) {
var b [20]byte
// 从最后一个字节开始写入
bp := len(b) - 1
for i >= 10 || wid > 1 {
wid--
q := i / 10
// 写入i的最后一位数字
b[bp] = byte('0' + i - q*10)
bp--
i = q
}
// i < 10
b[bp] = byte('0' + i)
// 将wid位字符写入buf缓存中
*buf = append(*buf, b[bp:]...)
}
  • Output方法完成日志的写入实现
  • s包含要输出的内容,calldepth记录调用深度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (l *Logger) Output(calldepth int, s string) error {
now := time.Now() // get this early.
var file string
var line int
l.mu.Lock()
defer l.mu.Unlock()
// flag包含文件的bit位不全是0,则获取文件信息
if l.flag&(Lshortfile|Llongfile) != 0 {
// Release lock while getting caller info - it's expensive.
l.mu.Unlock()
var ok bool
// runtime.Caller方法获取函数调用的行数和文件名
_, file, line, ok = runtime.Caller(calldepth)
if !ok {
file = "???"
line = 0
}
l.mu.Lock()
}
// 缓存置空
l.buf = l.buf[:0]
// 写入前缀,日期,文件信息到buf中
l.formatHeader(&l.buf, now, file, line)
// 写入打印的日志到buf中
l.buf = append(l.buf, s...)
// 自动换行
if len(s) == 0 || s[len(s)-1] != '\n' {
l.buf = append(l.buf, '\n')
}
// 将缓存写入到out对象中
_, err := l.out.Write(l.buf)
return err
}
  • Logger结构体的常见输出日志的方法
  • Logger的getter和setter方法,不列出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//格式化输出和普通输出
func (l *Logger) Printf(format string, v ...interface{}) {
l.Output(2, fmt.Sprintf(format, v...))
}
func (l *Logger) Print(v ...interface{}) { l.Output(2, fmt.Sprint(v...)) }
func (l *Logger) Println(v ...interface{}) { l.Output(2, fmt.Sprintln(v...)) }
// Fatal 方法,会调用os.Exit(1)退出,多用于脚本中
func (l *Logger) Fatal(v ...interface{}) {
l.Output(2, fmt.Sprint(v...))
os.Exit(1)
}
func (l *Logger) Fatalf(format string, v ...interface{}) {
l.Output(2, fmt.Sprintf(format, v...))
os.Exit(1)
}
func (l *Logger) Fatalln(v ...interface{}) {
l.Output(2, fmt.Sprintln(v...))
os.Exit(1)
}
// Panic方法会调用panic进行退出,多用于错误处理,和recover联合使用
func (l *Logger) Panic(v ...interface{}) {
s := fmt.Sprint(v...)
l.Output(2, s)
panic(s)
}
func (l *Logger) Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
l.Output(2, s)
panic(s)
}
func (l *Logger) Panicln(v ...interface{}) {
s := fmt.Sprintln(v...)
l.Output(2, s)
panic(s)
}
  • 为了方便快捷的使用log包,提供了一些公有方法采用打印输出到标准输出中,log.Println()即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 标准错误,flag为 Ldate | Ltime => 00000011
var std = New(os.Stderr, "", LstdFlags)
func Print(v ...interface{}) {
std.Output(2, fmt.Sprint(v...))
}
func Fatal(v ...interface{}) {
std.Output(2, fmt.Sprint(v...))
os.Exit(1)
}
func Panic(v ...interface{}) {
s := fmt.Sprint(v...)
std.Output(2, s)
panic(s)
}

未命名

发表于 2020-08-06

go源码解读-unsafe包

发表于 2020-05-29 | 分类于 go , 源码解读

unsafe包

  • unsafe包中的操作是不安全的操作,会绕过go原因的内存保护机制
  • 引用有风险,使用须谨慎
  • unsafe包提供了直接操作内存的能力

unsafe源码

  • unsafe包中只包含两个结构体和三个函数
1
2
3
4
5
type ArbitraryType int
type Pointer *ArbitraryType
func Sizeof(x ArbitraryType) uintptr
func Offsetof(x ArbitraryType) uintptr
func Alignof(x ArbitraryType) uintptr

两个变量

1
2
type ArbitraryType int
type Pointer *ArbitraryType
  • ArbitraryType实际上并不是unsafe包的一部分,只是为了文档展示
  • ArbitraryType虽然类型时int,但是其实质上代表的是任意类型的go表达式
    -
  • Pointer变量代表一个指向ArbitraryType对象的指针,实际上就是一个指向任意类型的指针
  • Pointer结构体有四个其他类型的结构体不具有的操作,这些能力使得go可以无视类型的限制来读写任意的内存,使用的时候需要谨慎
    • 一个任意结构体类型的指针可以转换为Pointer类型
    • 一个Pointer类型的结构体可以转换成一个指针
    • 一个uintptr可以转换成Pointer类型
    • 一个Pointer类型的结构体可以转换成一个uintptr指针,uintptr是一个能存储指针的整形值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Human struct {
sex bool
age uint8
min int
name string
}
func main() {
human := Human{
true,
30,
1,
"hello",
}
// 结构体转换
fmt.Printf("address os h is %p \n", &human)
pointer := unsafe.Pointer(&human)
fmt.Printf("convert pointer human to Pointer type: %p \n", pointer)
// 直接操作对象的内存
u1 := (*int16)(unsafe.Pointer(uintptr(unsafe.Pointer(&human)) + unsafe.Offsetof(human.age)))
fmt.Println(human.age)
*u1 = 12
fmt.Println(human.age)
}
/*
address os h is 0xc00000c060
convert pointer human to Pointer type: 0xc00000c060
0xc00000c061
30
12
*/
  • unsafe包规定了一些Pointer合理的使用模式,不符合的可能已经不可用或者即将不可用
    • 将*T1类型的指针转换成Pointer类型,然后再讲Pointer类型转换成T2类型指针
      • 这要求T1占用的内存要大于T2占用的内存
      • 如math.Float64bits的实现
    • 将一个Pointer类型转换成一个uintptr类型,也就是将指向的值的地址作为一个uintptr变量的值,
      • 将uintptr转换成Pointer通常来说会产生问题
      • uintptr是个值,不是指针,因此其没有任何语义
      • uintptr即使有某个对象的地址,但是在gc的时候是不会影响该对象的gc操作的
    • Pointer -> uintptr -> Pointer有时候是合法的,如进行算数运算的
      • p = unsafe.Pointer(uintptr(p)+offset)
      • 最常用的地方是获取结构体中的元素指针或者数组中的元素
      • f := unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + unsafe.Offsetof(s.f)) 等价于 f := unsafe.Pointer(&s.f)
      • e := unsafe.Pointer(uintptr(unsafe.Pointer(&x[0])) + i*unsafe.Sizeof(x[0])) 等价于 e := unsafe.Pointer(&x[i])
      • 但是如果计算完指向了未分配的内存,则是不合法的
    • 调用syscall.Syscall的时候将Pointer类型转换成uintptr
    • 将reflect.Value.Pointer或者reflect.Value.UnsafeAddr的结果从uintptr转换成Pointer类型时合法的
      • 这是因为reflect返回uintptr可以让上层调用不必引入unsafe包
      • p :+ (*int)(unsafe.Pointer(reflect.ValueOf(new)))
    • Pointer可以与reflect.SliceHeader或者reflect.StringHeader的Data字段进行互相转换
      • hdr.Data = uintptr(unsafe.Pointer(p))
1
2
3
4
// Float64bits returns the IEEE 754 binary representation of f,
// with the sign bit of f and the result in the same bit position,
// and Float64bits(Float64frombits(x)) == x.
func Float64bits(f float64) uint64 { return *(*uint64)(unsafe.Pointer(&f)) }

三个函数

1
2
3
func Sizeof(x ArbitraryType) uintptr
func Offsetof(x ArbitraryType) uintptr
func Alignof(x ArbitraryType) uintptr
  • 三个函数传入的参数为ArbitraryType类型变量,也就是可以穿任意类型的数据的地址进来
  • SizeOf函数返回x类型占据的字节数的
1
2
3
4
5
6
func main() {
slice := []int{2, 3, 4, 5, 6, 7, 8, 9}
i := int(6)
fmt.Println(int(unsafe.Sizeof(i)) * len(slice)) // 64
fmt.Println(unsafe.Sizeof(slice)) // 24
}
  • Offsetof实际上返回的是当前结构体开始的位置到当前字段的位置之间的偏移量
  • Alignof返回变量对其字节数量ß

go源码解读-sync.WaitGroup

发表于 2020-05-21 | 分类于 go , 源码解读

Pool

  • 一个Pool是一群对象的集合,集合可能是单独存储
  • Pool中存储的对象可以自动在池中移除而不用通知,当只有Pool持有对象的引用时,对象有可能会释放内存
  • Pool是线程安全的
  • Pool的创建用意是缓存已经分配内存且已经用完的对象,方便后续复用的时候不必申请内存,同时减轻gc的压力
  • Pool适用于管理一组对象,可以被多个独立的客户端复用,从而避免每个客户端都要申请内存。
  • 如果对象存在的声明周期很短,则不适合用Pool来管理

Pool结构体

1
2
3
4
5
6
7
8
type Pool struct {
noCopy noCopy // 确保pool在使用之后不会被复制
local unsafe.Pointer // 指向[]poolLocal的指针
localSize uintptr // poolLocal的长度
victim unsafe.Pointer // victim一次GC之后的幸存者,这个地方也是指向[]poolLocal的指针,local经过一次GC就变为了victim
victimSize uintptr // victim的长度
New func() interface{} // 当无法从Pool中获取到新的对象的时候,会调用New函数来创建个对象来返回
}
  • poolLocal是每个调度器P存储对象的结构体
  • pad防止伪共享
  • private为每个调度器的私有空间
  • shared空间当前调度器可以pushHead和popTail,所有调度器都可以popTail
1
2
3
4
5
6
7
8
9
10
11
12
13
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
  • poolChain为动态版的poolDequeue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type poolChain struct {
// push节点,由当前P来push,不需要加锁
head *poolChainElt
// get节点,所有调度器都可以操作,需要用原子方法写入和读取
tail *poolChainElt
}
// poolChain的一个节点
type poolChainElt struct {
poolDequeue // 为一个无锁,固定大小的单生产者多消费穿者的唤醒队列
next, prev *poolChainElt
}
type poolDequeue struct {
// headTail表示下标,高32位表示头下标,低32位表示尾下标,poolDequeue定义了,head tail的pack和unpack函数方便转化,实际用的时候都会mod ( len(vals) - 1 ) 来防止溢出
headTail uint64
// vals的大小必须是2的幂,因为go的内存管理策略是将内存分为2的幂大小的链表,申请2的幂大小的内存可以有效减小分配内存的开销
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}

Pool结构体的方法

  • Put方法将对象添加到Pool中
    • pin函数将当前goroutine绑定到指定的调度器P上,同时禁止抢占,返回poolLocal对象和绑定的Pid
    • 从Pool的local中获取调度器P的poolLocal,没有需要新建
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      func (p *Pool) Put(x interface{}) {
      if x == nil { // 放入的对象为空,直接返回
      return
      }
      if race.Enabled { // 关闭竞争检测
      if fastrand()%4 == 0 {
      // Randomly drop x on floor.
      return
      }
      race.ReleaseMerge(poolRaceAddr(x))
      race.Disable() // 关闭操作
      }
      l, _ := p.pin()
      // 私有空间为空,则将x放置到私有空间
      if l.private == nil {
      l.private = x
      x = nil
      }
      // 私有空间已满,则将x放到公有空间中
      if x != nil {
      l.shared.pushHead(x)
      }
      runtime_procUnpin()
      if race.Enabled {
      race.Enable()
      }
      }
      func (p *Pool) pin() (*poolLocal, int) {
      pid := runtime_procPin() // 关闭抢占,这个goroutine工作完才释放时间片
      s := atomic.LoadUintptr(&p.localSize) // load-acquire
      l := p.local // load-consume
      if uintptr(pid) < s { // 如果p.local的长度大于pid,则直接取数据即可
      return indexLocal(l, pid), pid
      }
      // pinSlow函数来完成p.local的新增
      return p.pinSlow()
      }
      func indexLocal(l unsafe.Pointer, i int) *poolLocal {
      lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
      return (*poolLocal)(lp)
      }
      func (p *Pool) pinSlow() (*poolLocal, int) {
      // 尝试重新将goroutine绑定到其他的goroutine,查看是否有poolLocal使用
      runtime_procUnpin()
      allPoolsMu.Lock()
      defer allPoolsMu.Unlock()
      pid := runtime_procPin()
      // poolCleanup won't be called while we are pinned.
      s := p.localSize
      l := p.local
      // 新绑定的P有poolLocal空间,则直接获取返回
      if uintptr(pid) < s {
      return indexLocal(l, pid), pid
      }
      // Pool为空,则将pool添加到allPools中
      if p.local == nil {
      allPools = append(allPools, p)
      }
      // 创建一个cpu数量大小的[]poolLocal并返回
      size := runtime.GOMAXPROCS(0)
      local := make([]poolLocal, size)
      atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
      atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
      return &local[pid], pid
      }

Get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (p *Pool) Get() interface{} {
// 关闭竞争检测
if race.Enabled {
race.Disable()
}
// 将goroutine固定到P上,并获取其localPool和pid
l, pid := p.pin()
x := l.private
l.private = nil
// localPool的私有空间为空,则从共享头空间中pop数据
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
// pop数据为空,则调用getSlow方法来获取
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果没有获取到,则调用pool的New函数来完成创建
if x == nil && p.New != nil {
x = p.New()
}
return x
}
// 懒获取
func (p *Pool) getSlow(pid int) interface{} {
// 获取Pool的localSize和local
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍历其他调度器的polLocal,看起poptail中是否可以取出对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 到pool的victim中查询
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// 私有空间有直接返回
if x := l.private; x != nil {
l.private = nil
return x
}
// 遍历victim中其他调度的poolLocal,看是否可以通过popTail获取到对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 没有查到,编辑victim为空,下次就不查找victim
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}

go源码解读-sync.WaitGroup

发表于 2020-05-18 | 分类于 go , 源码解读

WaitGroup

  • WaitGroup结构体会等待一组goroutines结束
  • Add方法会设置等待的goroutine的数量
  • goroutine结束之后调用done即可
  • WaitGroup可以用于所有线程都结束之后才执行的逻辑
  • 使用之后不可以再复制
  • state1为64位结构的值
    • 高位的32位为计数器
    • 低位的32位为等待线程数
1
2
3
4
type WaitGroup struct {
noCopy noCopy // 第一次使用之后,不可以用copy函数进行复制
state1 [3]uint32 // 计数器,高4个字节记录要等待的线程总数,低位4个字节记录还需要等待完成的线程数量
}

state方法

  • state方法返回计数和信号量的地址
1
2
3
4
5
6
7
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

Add方法

  • Add方法添加计数的goroutine数到wg中
  • 添加完计数器变为0,所有的阻塞的线程会被释放
  • 添加完计数器为负数,会panic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (wg *WaitGroup) Add(delta int) {
// 获取当前计数和信号量的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// 高32位来进行计数器的CAS加法
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 当前需要等待的线总数
v := int32(state >> 32)
// 获取需要等待结束的线程数
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v或者w为0直接返回即可
if v > 0 || w == 0 {
return
}
// Add和Wait同步发生,此时会报错
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 将等待完成的线程数设置为0,逐一释放之前等待的线程
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false)
}
}

Done方法

  • Done方法完成counter–
1
2
3
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait方法

  • Wait方法完成线程阻塞,直到counter为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (wg *WaitGroup) Wait() {
// 获取计数器和信号量的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 循环等待counter为0
for {
state := atomic.LoadUint64(statep)
// 获取counter和waiter的值
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// 如果counter为0,则直接返回,进行后续操作
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 否则waiter计数++
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
// 排队休眠,等待信号量唤醒
runtime_Semacquire(semap)
// 休眠过程中,wg被重用会导致state不一致,从而panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}

go源码解读-sync.RWMutex

发表于 2020-05-18 | 分类于 go , 源码解读

RWMutex

  • RWMutex是一个互斥读写锁
  • 可以被多个读线程持有或者被一个写线程持有
  • 零值为未上锁的mutex
  • 第一次使用之后不可复制
  • 多个线程的读取操作不会被阻塞,读写,写读,读读会被阻塞
1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 互斥量
writerSem uint32 // 写阻塞等待的信号量,最后一个读线程释放锁时会释放信号量
readerSem uint32 // 读阻塞等待信号量,持有写锁的线程完成释放锁后释放的信号量
readerCount int32 // 记录读者的数量
readerWait int32 // 记录写阻塞时读者的个数
}

RWMutex的方法

  • RLock方法完成读锁的加锁,如果是个写线程请求锁将会被阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 完成当前持有该锁的读者的个数的+1,如果没有持有该锁的读线程,则会唤醒等待中的写信号量
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
  • RUnlock方法完成读锁的解锁操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}
  • Lock完成写锁的加锁
  • 写锁实际上是加到w中的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
  • Unlock方法完成写锁的解锁
  • 逻辑与Mutex的解锁是一致的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
  • 封装了专门的读锁
1
2
3
4
5
6
7
8
9
10
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

go源码解读-sync.Cond

发表于 2020-05-18 | 分类于 go , 源码解读

Cond

  • Cond结构体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Cond struct {
noCopy noCopy // noCopy保证Cond第一次被使用之后不能被copy复制
L Locker // L为获取Cond的值或者改变Cond的值的时候需要获取的锁
notify notifyList // notifyList记录Cond将要通知的信号量列表
checker copyChecker // 用于检查copy复制的状态
}
// 初始化方法
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

Cond实现的方法

  • Wait方法会对c.L进行解锁,并挂起当前执行的线程,之后当前线程的执行
  • Wait操作实际上是会释放当前线程的锁,进入等待队列,等待被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 将当前线程加入到等待队列中
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 等待锁
runtime_notifyListWait(&c.notify, t)
// 加锁继续执行
c.L.Lock()
}
// 将调用线程加入到等待队列中
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
  • Signal将会唤醒一个等待c的线程
1
2
3
4
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
  • Broadcast将会唤醒所有等待c的线程
1
2
3
4
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func TestCondSignal(t *testing.T) {
var m sync.Mutex
c := sync.NewCond(&m)
n := 10
running := make(chan bool, n)
awake := make(chan bool, n)
for i := 0; i < n; i++ {
go func(i int) {
m.Lock()
running <- true
t.Logf("goroutine %d is waiting \n", i+1)
c.Wait()
awake <- true
t.Logf("goroutine %d is awake\n", i+1)
m.Unlock()
}(i)
}
for i := 0; i < n; i++ {
<-running // Wait for everyone to run.
}
for n > 5 {
select {
case <-awake:
t.Fatal("goroutine not asleep")
default:
}
m.Lock()
c.Signal()
m.Unlock()
<-awake // Will deadlock if no goroutine wakes up
select {
case <-awake:
t.Fatal("too many goroutines awake")
default:
}
n--
}
c.Signal()
}
/*
=== RUN TestCondSignal
--- PASS: TestCondSignal (0.00s)
cond_test.go:18: goroutine 10 is waiting
cond_test.go:18: goroutine 3 is waiting
cond_test.go:18: goroutine 1 is waiting
cond_test.go:18: goroutine 8 is waiting
cond_test.go:18: goroutine 2 is waiting
cond_test.go:18: goroutine 7 is waiting
cond_test.go:18: goroutine 9 is waiting
cond_test.go:18: goroutine 4 is waiting
cond_test.go:18: goroutine 5 is waiting
cond_test.go:18: goroutine 6 is waiting
cond_test.go:21: goroutine 10 is awake
cond_test.go:21: goroutine 3 is awake
cond_test.go:21: goroutine 1 is awake
cond_test.go:21: goroutine 8 is awake
cond_test.go:21: goroutine 2 is awake
cond_test.go:21: goroutine 7 is awake
PASS
*/

kubernates简介

发表于 2020-05-13 | 分类于 kubernates , go , docker

Kubernates

  • Kubernates需要使用k8s API对象来完成对集群的操作,包括但不限于运行程序,使用容器镜像,设定备份,设置网络和磁盘配置。完成这些操作需要使用k8s API来创建对象来实现,通常是通过命令行命令kubectl。同样也可以使用k8s API与集群直接交互。
  • 当你设定了你想要的集群状态之后,Kubernates Control Plane会通过Pod Lifecycle Event Generator来完成集群当前状态到预期状态的转变,
12…11
cdx

cdx

Be a better man!

110 日志
36 分类
31 标签
GitHub E-Mail
© 2020 cdx
由 Hexo 强力驱动
|
主题 — NexT.Mist v5.1.2