go-mysql-server是基于内存的mysql server,使用方法分下面五步,创建engine,设置root账户,初始化配置,初始化server,开启服务。具体如下:
engine := sqle.NewDefault(sql.NewDatabaseProvider(createTestDatabase(),information_schema.NewInformationSchemaDatabase(),))engine.Analyzer.Catalog.MySQLDb.AddRootAccount()config := server.Config{Protocol: "tcp",Address: "localhost:3306",}s, err := server.NewDefaultServer(config, engine)if err != nil {panic(err)}s.Start()
下面我们开始依次分析:
1,创建engine:
engine := sqle.NewDefault(sql.NewDatabaseProvider(createTestDatabase(),information_schema.NewInformationSchemaDatabase(),))
源码位于github.com/dolthub/go-mysql-server@v0.14.0/engine.go
func NewDefault(pro sql.DatabaseProvider) *Engine {a := analyzer.NewDefault(pro)return New(a, nil)}
func New(a *analyzer.Analyzer, cfg *Config) *Engine {ls := sql.NewLockSubsystem()a.Catalog.RegisterFunction(emptyCtx, sql.FunctionN{Name: "version",Fn: function.NewVersion(cfg.VersionPostfix),})a.Catalog.RegisterFunction(emptyCtx, function.GetLockingFuncs(ls)...)return &Engine{Analyzer: a,MemoryManager: sql.NewMemoryManager(sql.ProcessMemory),ProcessList: NewProcessList(),LS: ls,BackgroundThreads: sql.NewBackgroundThreads(),IsReadOnly: cfg.IsReadOnly,IsServerLocked: cfg.IsServerLocked,PreparedData: make(map[uint32]PreparedData),mu: &sync.Mutex{},}
Engine的定义如下:
type Engine struct {Analyzer *analyzer.AnalyzerLS *sql.LockSubsystemProcessList sql.ProcessListMemoryManager *sql.MemoryManagerBackgroundThreads *sql.BackgroundThreadsIsReadOnly boolIsServerLocked boolPreparedData map[uint32]PreparedDatamu *sync.Mutex}
其中ProcessList处理逻辑位于github.com/dolthub/go-mysql-server@v0.14.0/processlist.go
func NewProcessList() *ProcessList {return &ProcessList{procs: make(map[uint64]*sql.Process),}}
github.com/dolthub/go-mysql-server@v0.14.0/sql/analyzer/analyzer.go
func NewDefault(provider sql.DatabaseProvider) *Analyzer {return NewBuilder(provider).Build()}
func NewBuilder(pro sql.DatabaseProvider) *Builder {return &Builder{provider: pro,
其中builder定义如下:
type Builder struct {preAnalyzeRules []RulepostAnalyzeRules []RulepreValidationRules []RulepostValidationRules []RuleonceBeforeRules []RuledefaultRules []RuleonceAfterRules []RulevalidationRules []RuleafterAllRules []Ruleprovider sql.DatabaseProviderdebug boolparallelism int}
func (ab *Builder) Build() *Analyzer {
type Analyzer struct {// Whether to log various debugging messagesDebug bool// Whether to output the query plan at each step of the analyzerVerbose bool// A stack of debugger context. See PushDebugContext, PopDebugContextcontextStack []stringParallelism int// Batches of Rules to apply.Batches []*Batch// Catalog of databases and registered functions.Catalog *Catalog}
github.com/dolthub/go-mysql-server@v0.14.0/sql/provider.go
func NewDatabaseProvider(dbs ...Database) DatabaseProvider {dbMap := make(map[string]Database, len(dbs))for _, db := range dbs {dbMap[strings.ToLower(db.Name())] = db}return databaseProvider{dbs: dbMap,mu: &sync.RWMutex{},}}
type databaseProvider struct {dbs map[string]Databasemu *sync.RWMutex}
github.com/dolthub/go-mysql-server@v0.14.0/sql/core.go
type DatabaseProvider interface {// Database gets a Database from the provider.Database(ctx *Context, name string) (Database, error)// HasDatabase checks if the Database exists in the provider.HasDatabase(ctx *Context, name string) bool// AllDatabases returns a slice of all Databases in the provider.AllDatabases(ctx *Context) []Database}
type RowInserter interface {TableEditor// Insert inserts the row given, returning an error if it cannot. Insert will be called once for each row to process// for the insert operation, which may involve many rows. After all rows in an operation have been processed, Close// is called.Insert(*Context, Row) error// Close finalizes the insert operation, persisting its result.Closer}
初始化provider的参数就是我们自定义创建数据库的函数,返回的一个内存数据库对象
func createTestDatabase() *memory.Database {
github.com/dolthub/go-mysql-server@v0.14.0/memory/database.go
type Database struct {*BaseDatabaseviews map[string]string}
type BaseDatabase struct {name stringtables map[string]sql.TablefkColl *ForeignKeyCollectiontriggers []sql.TriggerDefinitionstoredProcedures []sql.StoredProcedureDetailsprimaryKeyIndexes boolcollation sql.CollationID}
func NewDatabase(name string) *Database {return &Database{BaseDatabase: NewViewlessDatabase(name),views: make(map[string]string),}}
func NewViewlessDatabase(name string) *BaseDatabase {return &BaseDatabase{name: name,tables: map[string]sql.Table{},fkColl: newForeignKeyCollection(),}}
func (d *BaseDatabase) AddTable(name string, t sql.Table) {d.tables[name] = t}
通过map实现根据表名定位表的数据。下面看下如何新建一个数据库:
db := memory.NewDatabase(dbName)table := memory.NewTable(tableName, sql.NewPrimaryKeySchema(sql.Schema{}), &memory.ForeignKeyCollection{})
github.com/dolthub/go-mysql-server@v0.14.0/memory/table.go
func NewTable(name string, schema sql.PrimaryKeySchema, fkColl *ForeignKeyCollection) *Table {return NewPartitionedTableWithCollation(name, schema, fkColl, 0, sql.Collation_Default)}
func NewPartitionedTableWithCollation(name string, schema sql.PrimaryKeySchema, fkColl *ForeignKeyCollection, numPartitions int, collation sql.CollationID) *Table {for i := 0; i < numPartitions; i++ {key := strconv.Itoa(i)keys = append(keys, []byte(key))partitions[key] = []sql.Row{}}return &Table{name: name,schema: schema,fkColl: fkColl,collation: collation,partitions: partitions,partitionKeys: keys,autoIncVal: autoIncVal,autoColIdx: autoIncIdx,}
其中table的定义如下:
type Table struct {// Schema and related infoname stringschema sql.PrimaryKeySchemaindexes map[string]sql.IndexfkColl *ForeignKeyCollectionchecks []sql.CheckDefinitioncollation sql.CollationIDpkIndexesEnabled bool// pushdown infofilters []sql.Expression // currently unused, filter pushdown is significantly broken right nowprojection []stringprojectedSchema sql.Schemacolumns []int// Data storagepartitions map[string][]sql.RowpartitionKeys [][]byte// Insert bookkeepinginsertPartIdx int// Indexed lookupslookup sql.DriverIndexLookup// AUTO_INCREMENT bookkeepingautoIncVal uint64autoColIdx inttableStats *TableStatistics}
重点看下它的插入方法,看下如何插入数据。
func (t *Table) Insert(ctx *sql.Context, row sql.Row) error {inserter := t.Inserter(ctx)if err := inserter.Insert(ctx, row); err != nil {return err}return inserter.Close(ctx)}
func (t *Table) Inserter(*sql.Context) sql.RowInserter {return t.newTableEditor()}
func (t *Table) newTableEditor() *tableEditor {for _, idx := range t.indexes {if !idx.IsUnique() {continue}var colNames []stringexpressions := idx.(*Index).Exprsfor _, exp := range expressions {colNames = append(colNames, exp.(*expression.GetField).Name())}colIdxs, err := t.columnIndexes(colNames)if err != nil {panic("failed to get column indexes")}uniqIdxCols = append(uniqIdxCols, colIdxs)}return &tableEditor{
github.com/dolthub/go-mysql-server@v0.14.0/sql/schema.go
func NewPrimaryKeySchema(s Schema, pkOrds ...int) PrimaryKeySchema {if len(pkOrds) == 0 {pkOrds = make([]int, 0)for i, c := range s {if c.PrimaryKey {pkOrds = append(pkOrds, i)}}}return PrimaryKeySchema{Schema: s, PkOrdinals: pkOrds}}
定义完table后进行数据插入:
db.AddTable(tableName, table)ctx := sql.NewEmptyContext()table.Insert(ctx, sql.NewRow("John Doe", "john@doe.com", sql.JSONDocument{Val: []string{"555-555-555"}}, time.Now()))
github.com/dolthub/go-mysql-server@v0.14.0/memory/table_editor.go
type tableEditor struct {table *TableinitialAutoIncVal uint64initialPartitions map[string][]sql.Rowea tableEditAccumulatorinitialInsert intarray of key ordinals for each unique index defined on the tableuniqueIdxCols [][]intfkTable *Table}
func (t *tableEditor) Insert(ctx *sql.Context, row sql.Row) error {if err := checkRow(t.table.schema.Schema, row); err != nil {t.table.verifyRowTypes(row)partitionRow, added, err := t.ea.Get(row)if added {pkColIdxes := t.pkColumnIndexes()return sql.NewUniqueKeyErr(formatRow(row, pkColIdxes), true, partitionRow)for _, cols := range t.uniqueIdxCols {if hasNullForAnyCols(row, cols) {continue}existing, found, err := t.ea.GetByCols(row, cols)if err != nil {return err}if found {return sql.NewUniqueKeyErr(formatRow(row, cols), false, existing)}}err = t.ea.Insert(row)
具体存储每一行数据还是通过map结构,key是唯一键组成的key,value就是行数据。
func (pke *pkTableEditAccumulator) Insert(value sql.Row) error {rowKey := pke.getRowKey(value)delete(pke.deletes, rowKey)pke.adds[rowKey] = value
func (pke *pkTableEditAccumulator) getRowKey(r sql.Row) string {var rowKey strings.Builderfor _, i := range pke.table.schema.PkOrdinals {rowKey.WriteString(fmt.Sprintf("%v", r[i]))
type pkTableEditAccumulator struct {table *Tableadds map[string]sql.Rowdeletes map[string]sql.Row}
github.com/dolthub/go-mysql-server@v0.14.0/sql/row.go行数据定义如下:
func NewRow(values ...interface{}) Row {row := make([]interface{}, len(values))copy(row, values)return row}
接着看下,如何定义用户
engine.Analyzer.Catalog.MySQLDb.AddRootAccount()
github.com/dolthub/go-mysql-server@v0.14.0/sql/mysql_db/mysql_db.go
func (db *MySQLDb) AddRootAccount() {db.Enabled = trueaddSuperUser(db.user, "root", "localhost", "")db.clearCache()}
github.com/dolthub/go-mysql-server@v0.14.0/sql/mysql_db/user_table.go
func addSuperUser(userTable *mysqlTable, username string, host string, password string) {err := userTable.data.Put(sql.NewEmptyContext(), &User{User: username,Host: host,PrivilegeSet: newPrivilegeSetWithAllPrivileges(),Plugin: "mysql_native_password",
github.com/dolthub/go-mysql-server@v0.14.0/sql/analyzer/catalog.go
type Catalog struct {MySQLDb *mysql_db.MySQLDbprovider sql.DatabaseProviderbuiltInFunctions function.Registrymu sync.RWMutexlocks sessionLocks}
定义完用户后,开始定义数据库的配置:
config := server.Config{Protocol: "tcp",Address: "localhost:3306",}
github.com/dolthub/go-mysql-server@v0.14.0/server/server_config.go
type Config struct {// Protocol for the connection.Protocol string// Address of the server.Address string// Tracer to use in the server. By default, a noop tracer will be used if// no tracer is provided.Tracer trace.Tracer// Version string to advertise in running serverVersion string// ConnReadTimeout is the server's read timeoutConnReadTimeout time.Duration// ConnWriteTimeout is the server's write timeoutConnWriteTimeout time.Duration// MaxConnections is the maximum number of simultaneous connections that the server will allow.MaxConnections uint64// TLSConfig is the configuration for TLS on this server. If |nil|, TLS is not supported.TLSConfig *tls.Config// RequestSecureTransport will require incoming connections to be TLS. Requires non-|nil| TLSConfig.RequireSecureTransport bool// DisableClientMultiStatements will prevent processing of incoming// queries as if they contain more than one query. This processing// currently works in some simple cases, but breaks in the presence of// statements (such as in CREATE TRIGGER queries). Configuring the// server to disable processing these is one option for users to get// support back for single queries that contain statements, at the cost// of not supporting the CLIENT_MULTI_STATEMENTS option on the server.DisableClientMultiStatements bool// NoDefaults prevents using persisted configuration for new server sessionsNoDefaults bool// Socket is a path to unix socket fileSocket stringAllowClearTextWithoutTLS bool}type Server struct {Listener *mysql.Listenerhandler mysql.HandlersessionMgr *SessionManager}






