蕪湖做網(wǎng)站的客戶百度官網(wǎng)平臺(tái)
前言
redis的核心是數(shù)據(jù)的快速存儲(chǔ),下面就來(lái)分析一下godis的底層存儲(chǔ)是如何實(shí)現(xiàn),先分析單機(jī)服務(wù)。
此文采用抓大放小原則,先大的流程方向,再抓細(xì)節(jié)。
流程圖
源碼分析
現(xiàn)在以客戶端連接,并發(fā)起set key val命令為例子
在單機(jī)部署的時(shí)候,服務(wù)啟動(dòng),會(huì)創(chuàng)建一個(gè)處理實(shí)例,并創(chuàng)建一個(gè)單機(jī)的db
// redis/server.go
// 創(chuàng)建一個(gè)處理實(shí)例
// MakeHandler creates a Handler instance
func MakeHandler() *Handler {// redis的一個(gè)存儲(chǔ)引擎var db database.DB// 創(chuàng)建是集群還是單例if config.Properties.ClusterEnable {db = cluster.MakeCluster()} else {db = database2.NewStandaloneServer()}return &Handler{db: db,}
}
有客戶端連接,會(huì)生成一個(gè)異步方法處理每個(gè)客戶端,一旦有客戶端的消息,都會(huì)進(jìn)入Handle方法。
// redis/server/server.go
// 處理接收到客戶端的命令
// Handle receives and executes redis commands
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {if h.closing.Get() {// closing handler refuse new connection_ = conn.Close()return}client := connection.NewConn(conn)// 存儲(chǔ)一個(gè)客戶端h.activeConn.Store(client, struct{}{})// 獲取字符串ch := parser.ParseStream(conn)// 接收客戶端數(shù)據(jù)for payload := range ch {// 遍歷消息體// ......... 經(jīng)過(guò)各種校驗(yàn)// 獲取到客戶端信息r, ok := payload.Data.(*protocol.MultiBulkReply)if !ok {logger.Error("require multi bulk protocol")continue}// 執(zhí)行結(jié)果result := h.db.Exec(client, r.Args)// 結(jié)果回復(fù)if result != nil {_, _ = client.Write(result.ToBytes())} else {_, _ = client.Write(unknownErrReplyBytes)}}
}
客戶端的各種命令進(jìn)行判斷,set是屬于正常的數(shù)據(jù)操作命令,直接通過(guò)判斷,獲取數(shù)據(jù)庫(kù),并在當(dāng)前數(shù)據(jù)庫(kù)中執(zhí)行
// database/server.go
func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {defer func() {if err := recover(); err != nil {logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))result = &protocol.UnknownErrReply{}}}()cmdName := strings.ToLower(string(cmdLine[0]))// pingif cmdName == "ping" {return Ping(c, cmdLine[1:])}// authenticateif cmdName == "auth" {return Auth(c, cmdLine[1:])}// ........// 各種各樣的判斷,暫時(shí)不管// 獲取當(dāng)前的數(shù)據(jù)索引// normal commandsdbIndex := c.GetDBIndex()// 獲取當(dāng)前數(shù)據(jù)庫(kù)selectedDB, errReply := server.selectDB(dbIndex)if errReply != nil {return errReply}// 以當(dāng)前數(shù)據(jù)庫(kù),執(zhí)行命令return selectedDB.Exec(c, cmdLine)
}
命令名稱解析出來(lái)后,從cmdTable獲取對(duì)應(yīng)的執(zhí)行方法,如prepare、executor
// Exec executes command within one database
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {// transaction control commands and other commands which cannot execute within transactioncmdName := strings.ToLower(string(cmdLine[0]))// ...return db.execNormalCommand(cmdLine)
}func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {// 獲取到正常的執(zhí)行命令cmdName := strings.ToLower(string(cmdLine[0]))// 獲取到commondcmd, ok := cmdTable[cmdName]if !ok {return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'")}if !validateArity(cmd.arity, cmdLine) {return protocol.MakeArgNumErrReply(cmdName)}prepare := cmd.preparewrite, read := prepare(cmdLine[1:])db.addVersion(write...)// 數(shù)據(jù)庫(kù)上鎖db.RWLocks(write, read)// 命令執(zhí)行完后解鎖defer db.RWUnLocks(write, read)// 執(zhí)行命令方法fun := cmd.executorreturn fun(db, cmdLine[1:])
}
set命令對(duì)應(yīng)的方法,從代碼可以發(fā)現(xiàn),其實(shí)數(shù)據(jù)是存儲(chǔ)在定義的map結(jié)構(gòu)的集合中,自此,命令已經(jīng)執(zhí)行完畢,返回執(zhí)行結(jié)果。
func execSet(db *DB, args [][]byte) redis.Reply {// 提取keykey := string(args[0])// 提取valvalue := args[1]// 提取策略policy := upsertPolicy// 提取過(guò)期時(shí)間ttl := unlimitedTTL// parse options// 如何參數(shù)大于2個(gè),說(shuō)明有其他參數(shù),需要做其他處理// .....entity := &database.DataEntity{Data: value,}var result int// 更新策略switch policy {case upsertPolicy:// 默認(rèn)策略db.PutEntity(key, entity)result = 1case insertPolicy:result = db.PutIfAbsent(key, entity)case updatePolicy:result = db.PutIfExists(key, entity)}if result > 0 {if ttl != unlimitedTTL {expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)// 設(shè)置過(guò)期時(shí)間db.Expire(key, expireTime)db.addAof(CmdLine{[]byte("SET"),args[0],args[1],})db.addAof(aof.MakeExpireCmd(key, expireTime).Args)} else {db.Persist(key) // override ttldb.addAof(utils.ToCmdLine3("set", args...))}}if result > 0 {return &protocol.OkReply{}}return &protocol.NullBulkReply{}
}// database.go
// 將數(shù)據(jù)放入DB
// PutEntity a DataEntity into DB
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {// 當(dāng)前數(shù)據(jù)庫(kù)的數(shù)據(jù)字段ret := db.data.PutWithLock(key, entity)// db.insertCallback may be set as nil, during `if` and actually callback// so introduce a local variable `cb`if cb := db.insertCallback; ret > 0 && cb != nil {cb(db.index, key, entity)}return ret
}// datastruct/dict/concurrent.go
// ConcurrentDict is thread safe map using sharding lock
// 這里可以看出,數(shù)據(jù)其實(shí)就是存在map集合里面
type ConcurrentDict struct {table []*shardcount int32shardCount int
}type shard struct {m map[string]interface{}mutex sync.RWMutex
}// datastruct/dict/concurrent.go
func (dict *ConcurrentDict) PutWithLock(key string, val interface{}) (result int) {if dict == nil {panic("dict is nil")}hashCode := fnv32(key)index := dict.spread(hashCode)s := dict.getShard(index)// 將數(shù)據(jù)放入map中if _, ok := s.m[key]; ok {s.m[key] = valreturn 0}dict.addCount()// 存儲(chǔ)kv結(jié)構(gòu)數(shù)據(jù),完成s.m[key] = valreturn 1
}
其實(shí)還有一個(gè)問(wèn)題,就是cmdTable怎么來(lái)的,為什么fun(db, cmdLine[1:])就完成了?
在router.go這個(gè)代碼中,是生成一個(gè)新的cmdTable的map集合;registerCommand這個(gè)函數(shù)是將各種命令塞入cmdTable里面。每個(gè)數(shù)據(jù)結(jié)構(gòu)如string等都有定義的方法。
main啟動(dòng)前都會(huì)調(diào)用init(),這個(gè)是golang特殊的函數(shù),順序按照文件的順序執(zhí)行。
這里就是在服務(wù)啟動(dòng)前,將所有命令注冊(cè)到cmdTable集合。
// database/router.go
// 命令集
var cmdTable = make(map[string]*command)
// ....
// 注冊(cè)命令,將命令存放在cmdTable集合里面
// registerCommand registers a normal command, which only read or modify a limited number of keys
func registerCommand(name string, executor ExecFunc, prepare PreFunc, rollback UndoFunc, arity int, flags int) *command {name = strings.ToLower(name)cmd := &command{name: name,executor: executor,prepare: prepare,undo: rollback,arity: arity,flags: flags,}cmdTable[name] = cmdreturn cmd
}//========================================// database/string.gofunc execSet(db *DB, args [][]byte) redis.Reply {
//....
}// execSetNX sets string if not exists
func execSetNX(db *DB, args [][]byte) redis.Reply {// .....
}// execSetEX sets string and its ttl
func execSetEX(db *DB, args [][]byte) redis.Reply {// ...
}func init() {// 調(diào)用注冊(cè)命令函數(shù),注冊(cè)方法,如Set則是執(zhí)行execSet方法registerCommand("Set", execSet, writeFirstKey, rollbackFirstKey, -3, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)registerCommand("SetNx", execSetNX, writeFirstKey, rollbackFirstKey, 3, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM, redisFlagFast}, 1, 1, 1)registerCommand("SetEX", execSetEX, writeFirstKey, rollbackFirstKey, 4, flagWrite).attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)// .....
}
ExecFunc是規(guī)范方法,每個(gè)命令對(duì)應(yīng)的執(zhí)行都按照規(guī)范定義。
// database/router.gotype command struct {// 命令名稱name string// 執(zhí)行方法executor ExecFunc// prepare returns related keys commandprepare PreFunc// undo generates undo-log before command actually executed, in case the command needs to be rolled backundo UndoFunc// arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity.// for example: the arity of `get` is 2, `mget` is -2arity intflags intextra *commandExtra
}// ========================================// database/database.go
// 執(zhí)行方法接口
// ExecFunc is interface for command executor
// args don't include cmd line
type ExecFunc func(db *DB, args [][]byte) redis.Reply