做完准备工作后就开始启动server,定义如下:包含listener、handler和sessionManager三部分
type Server struct {Listener *mysql.Listenerhandler mysql.HandlersessionMgr *SessionManager}
首先看下默认server的定义
s, err := server.NewDefaultServer(config, engine)
源码位于github.com/dolthub/go-mysql-server@v0.14.0/server/server.go
// NewDefaultServer builds a Server for the given config and engine by
// delegating to NewServer with DefaultSessionBuilder and a nil
// ServerEventListener.
func NewDefaultServer(cfg Config, e *sqle.Engine) (*Server, error) {
	return NewServer(cfg, e, DefaultSessionBuilder, nil)
}
核心内容就是初始化上述三个内容:
func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder, listener ServerEventListener) (*Server, error) {sm := NewSessionManager(sb, tracer, e.Analyzer.Catalog.HasDB, e.MemoryManager, e.ProcessList, cfg.Address)handler := NewHandler(e, sm, cfg.ConnReadTimeout, cfg.DisableClientMultiStatements, listener)return newServerFromHandler(cfg, e, sm, handler)
最后调用start函数启动服务,开始监听端口
func (s *Server) Start() error {s.Listener.Accept()
type Listener struct {// Construction parameters, set by NewListener.// authServer is the AuthServer object to use for authentication.authServer AuthServer// handler is the data handler.handler Handler// This is the main listener socket.listener net.Listener// Max limit for connectionsmaxConns uint64// The following parameters are read by multiple connection go// routines. They are not protected by a mutex, so they// should be set after NewListener, and not changed while// Accept is running.// ServerVersion is the version we will advertise.ServerVersion string// TLSConfig is the server TLS config. If set, we will advertise// that we support SSL.TLSConfig *tls.Config// AllowClearTextWithoutTLS needs to be set for the// mysql_clear_password authentication method to be accepted// by the server when TLS is not in use.AllowClearTextWithoutTLS sync2.AtomicBool// SlowConnectWarnThreshold if non-zero specifies an amount of time// beyond which a warning is logged to identify the slow connectionSlowConnectWarnThreshold sync2.AtomicDuration// The following parameters are changed by the Accept routine.// Incrementing ID for connection id.connectionID uint32// Read timeout on a given connectionconnReadTimeout time.Duration// Write timeout on a given connectionconnWriteTimeout time.Duration// connReadBufferSize is size of buffer for reads from underlying connection.// Reads are unbuffered if it's <=0.connReadBufferSize int// shutdown indicates that Shutdown method was called.shutdown sync2.AtomicBool// RequireSecureTransport configures the server to reject connections from insecure clientsRequireSecureTransport bool}
其中sessionManager定义位于github.com/dolthub/go-mysql-server@v0.14.0/server/context.go
type SessionManager struct {addr stringtracer trace.TracerhasDBFunc func(ctx *sql.Context, name string) boolmemory *sql.MemoryManagerprocesslist sql.ProcessListmu *sync.Mutexbuilder SessionBuildersessions map[uint32]*managedSessionpid uint64}
handler位于github.com/dolthub/go-mysql-server@v0.14.0/server/handler.go
func NewHandler(e *sqle.Engine, sm *SessionManager, rt time.Duration, disableMultiStmts bool, listener ServerEventListener) *Handler {
type Handler struct {e *sqle.Enginesm *SessionManagerreadTimeout time.DurationdisableMultiStmts boolsel ServerEventListener}
比如处理查询请求的handler函数:先进行sql语句解析,然后进行参数绑定,运行查询,最后将查询结果放入用于返回的容器(channel)里。
func (h *Handler) ComQuery(c *mysql.Conn,query string,callback func(*sqltypes.Result, bool) error,) error {_, err := h.errorWrappedDoQuery(c, query, MultiStmtModeOff, nil, callback)
func (h *Handler) errorWrappedDoQuery(c *mysql.Conn,query string,mode MultiStmtMode,bindings map[string]*query.BindVariable,callback func(*sqltypes.Result, bool) error,) (string, error) {remainder, err := h.doQuery(c, query, mode, bindings, callback)
func (h *Handler) doQuery(c *mysql.Conn,query string,mode MultiStmtMode,bindings map[string]*query.BindVariable,callback func(*sqltypes.Result, bool) error,) (string, error) {if parsed == nil {parsed, err = parse.Parse(ctx, query)sqlBindings, err = bindingsToExprs(bindings)ctx, err = ctx.ProcessList.AddProcess(ctx, query)schema, rowIter, err := h.e.QueryNodeWithBindings(ctx, query, parsed, sqlBindings)if ri2, ok := rowIter.(sql.RowIterTypeSelector); ok && ri2.IsNode2() {rowIter2 = rowIter.(sql.RowIter2)row2Chan = make(chan sql.Row2, 512)} else {rowChan = make(chan sql.Row, 512)}
其中sql语句的解析过程如下,源码位于github.com/dolthub/go-mysql-server@v0.14.0/sql/parse/parse.go
// ParseOne forwards to parse with multi set to true and returns its four
// results unchanged.
func ParseOne(ctx *sql.Context, query string) (sql.Node, string, string, error) {
	return parse(ctx, query, true)
}
func parse(ctx *sql.Context, query string, multi bool) (sql.Node, string, string, error) {stmt, ri, err = sqlparser.ParseOne(s)
github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/vt/sqlparser/ast.go
func ParseOne(sql string) (Statement, int, error) {tokenizer := NewStringTokenizer(sql)tokenizer.stopAfterFirstStmt = truetree, err := parseTokenizer(sql, tokenizer)
func parseTokenizer(sql string, tokenizer *Tokenizer) (Statement, error) {s.Start()
启动server后,开始监听端口,在循环中不断accept新的连接(源码位于github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/server.go)。每当有连接到来,就启动一个协程调用handle方法来处理该连接:
func (l *Listener) Accept() {for {conn, err := l.listener.Accept()connCount.Add(1)connAccept.Add(1)go l.handle(conn, connectionID, acceptTime)
func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Time) {c := newServerConn(conn, l)c.ConnectionID = connectionIDl.handler.NewConnection(c)salt, err := c.writeHandshakeV10(l.ServerVersion, l.authServer, l.TLSConfig != nil)response, err := c.readEphemeralPacketDirect()user, authMethod, authResponse, err := l.parseClientHandshakePacket(c, true, response)c.recycleReadPacket()authServerMethod, err := l.authServer.AuthMethod(user, conn.RemoteAddr().String())// Set db name.if err = l.handler.ComInitDB(c, c.schemaName); err != nil {if err := c.writeOKPacket(0, 0, c.StatusFlags, 0); err != nil {for {err := c.handleNextCommand(l.handler)
源码位于github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/conn.go。每一个连接在存续期间会发送很多命令,所以需要在一个循环中依次解析并处理每一个命令:
func (c *Conn) handleNextCommand(handler Handler) error {data, err := c.readEphemeralPacket()switch data[0] {case ComQuit:c.recycleReadPacket()return errors.New("ComQuit")case ComInitDB:db := c.parseComInitDB(data)case ComQuery:// flush is called at the end of this block.// To simplify error handling, we do not// encapsulate it with a defer'd func()c.startWriterBuffering()queryStart := time.Now()query := c.parseComQuery(data)c.recycleReadPacket()multiStatements := !c.DisableClientMultiStatements && c.Capabilities&CapabilityClientMultiStatements != 0var err error
比如上面提到的查询命令(ComQuery):一个查询字符串里可能包含多条sql语句,所以命令内部会在循环中反复调用execQuery,每次处理一条语句并返回剩余的部分,直到出错或剩余的查询字符串为空:
for query, err = c.execQuery(query, handler, multiStatements); err == nil && query != ""; {query, err = c.execQuery(query, handler, multiStatements)}if err != nil {return err}timings.Record(queryTimingKey, queryStart)if err := c.flush(); err != nil {log.Errorf("Conn %v: Flush() failed: %v", c.ID(), err)return err}case ComFieldList:case ComPing:case ComSetOption:case ComPrepare:case ComStmtExecute:case ComStmtSendLongData:case ComStmtClose:case ComStmtReset:case ComStmtFetch:case ComResetConnection:
func (c *Conn) execQuery(query string, handler Handler, multiStatements bool) (string, error) {resultsCB := func(qr *sqltypes.Result, more bool) error {return c.writeOKPacketWithInfo(qr.RowsAffected, qr.InsertID, flag, handler.WarningCount(c), qr.Info)return c.writeRows(qr)if multiStatements {remainder, err = handler.ComMultiQuery(c, query, resultsCB)} else {err = handler.ComQuery(c, query, resultsCB)}
其中handler的初始化发生在初始化listener的时候:
return &Listener{authServer: cfg.AuthServer,handler: cfg.Handler,
type Handler interface {NewConnection is called when a connection is created.It is not established yet. The handler can decide toset StatusFlags that will be returned by the handshake methods.In particular, ServerStatusAutocommit might be set.NewConnection(c *Conn)ConnectionClosed is called when a connection is closed.ConnectionClosed(c *Conn)InitDB is called once at the beginning to set db name,/ and subsequently for every ComInitDB event.ComInitDB(c *Conn, schemaName string) error// ComQuery is called when a connection receives a query.// Note the contents of the query slice may change after// the first call to callback. So the Handler should not// hang on to the byte slice.ComQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) error// ComMultiQuery is called when a connection receives a query and the// client supports MULTI_STATEMENT. It should process the first// statement in |query| and return the remainder. It will be called// multiple times until the remainder is |""|.ComMultiQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) (string, error)// ComPrepare is called when a connection receives a prepared// statement query.ComPrepare(c *Conn, query string) ([]*querypb.Field, error)// ComStmtExecute is called when a connection receives a statement// execute query.ComStmtExecute(c *Conn, prepare *PrepareData, callback func(*sqltypes.Result) error) error// WarningCount is called at the end of each query to obtain// the value to be returned to the client in the EOF packet.// Note that this will be called either in the context of the// ComQuery callback if the result does not contain any fields,// or after the last ComQuery call completes.WarningCount(c *Conn) uint16ComResetConnection(c *Conn)}
总的来说,整个server就是一个tcp服务器,分为认证、连接管理、命令处理等多个部分,每个部分根据mysql协议进行具体处理。






