暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:dtm分布式事务(4)

            我们继续上一篇golang源码分析:dtm分布式事务(3)分析api服务的源码,位置位于dtmsvr/svr.go:

      func StartSvr() *gin.Engine {
       dtmcli.GetRestyClient().SetTimeout
      app := dtmutil.GetGinApp()
    app = httpMetrics(app)
    addRoute(app)
    addJrpcRouter(app)

    go func() {
    err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
      dtmgpb.RegisterDtmServer(s, &dtmServer{})
      
      go func() {
    err := s.Serve(lis)
    for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i++ {
    go updateBranchAsync()
      updateTopicsMap()
    go CronUpdateTopicsMap()

    err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)

    首先启动一个gin服务器,然后注册一个grpcserver :dtmServer,然后通过一个协程启动grpc服务,然后启动协程,每200ms一次将分支信息同步到存储。然后检查下kv存储里"topics"的值的版本,存储到topicsMap,紧接着启动一个协程任务在后台执行上面的kv存储到内存的更新。最后把我们的server注册到服务发现,一般是通过环境变量控制的

      #   Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
      # EndPoint: 'localhost:36790'

      总结下就5件事

      1,启动http服务

      2,启动grpc服务

      3,将分支的更新同步到存储

      4,将kc里面存储的topics数据同步到内存。

      5,将服务注册到服务发现注册中心。

      其中,启动http服务包括三部分

      A,为监控注册路由

      B,为http服务注册路由

      C,为json-rpc注册路由

            1,  其中http路由就是简单的gin路由注册,位于dtmsvr/api_http.go

        func addRoute(engine *gin.Engine) {
        engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
        engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
        engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))

        提供了常见申请全局事务ID,prepare,abort,commit等事务执行动作。

          func prepare(c *gin.Context) interface{} {
          return svcPrepare(TransFromContext(c))
          }

          其中prepare需要获取全局事务的分支事务

            dbt := GetTransGlobal(t.Gid)
              trans := GetStore().FindTransGlobalStore(gid)

              默认这些信息存储在boltdb里面dtmsvr/storage/boltdb/boltdb.go,可以看下获取全局存储的过程

                func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
                     trans = tGetGlobal(t, gid)
                bs := t.Bucket(bucketGlobal).Get([]byte(gid))

                        全局存储的定义如下:

                  type TransGlobalStore struct {
                  dtmutil.ModelBase
                  Gid string `json:"gid,omitempty"`
                  TransType string `json:"trans_type,omitempty"`
                  Steps []map[string]string `json:"steps,omitempty" gorm:"-"`
                  Payloads []string `json:"payloads,omitempty" gorm:"-"`
                  BinPayloads [][]byte `json:"-" gorm:"-"`
                  Status string `json:"status,omitempty"`
                  QueryPrepared string `json:"query_prepared,omitempty"`
                  Protocol string `json:"protocol,omitempty"`
                  FinishTime *time.Time `json:"finish_time,omitempty"`
                  RollbackTime *time.Time `json:"rollback_time,omitempty"`
                  Result string `json:"result,omitempty"`
                  RollbackReason string `json:"rollback_reason,omitempty"`
                  Options string `json:"options,omitempty"`
                  CustomData string `json:"custom_data,omitempty"`
                  NextCronInterval int64 `json:"next_cron_interval,omitempty"`
                  NextCronTime *time.Time `json:"next_cron_time,omitempty"`
                  Owner string `json:"owner,omitempty"`
                  Ext TransGlobalExt `json:"-" gorm:"-"`
                  ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options
                  dtmcli.TransOptions
                  }

                  对于json_rpc也类似,就是通过gin的http路由,http协议的内容传输的是json数据,实现位于dtmsvr/api_json_rpc.go

                    func addJrpcRouter(engine *gin.Engine) {
                    engine.POST("/api/json-rpc", func(c *gin.Context) {
                    return handlers[req.Method](req.Params)

                    真实的路由是定义在一个map里面的

                      handlers := map[string]jrpcFunc{
                      "newGid": jrpcNewGid,
                      "prepare": jrpcPrepare,
                      "submit": jrpcSubmit,
                      "abort": jrpcAbort,
                      "registerBranch": jrpcRegisterBranch,
                      }

                      比如其中的获取全局事务id最终实现,和http协议是一样的

                        func jrpcNewGid(interface{}) interface{} {
                        return map[string]interface{}{"gid": GenGid()}
                        }

                               2, 然后我们看下grpc的实现dtmsvr/api_grpc.go

                          func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
                          r := svcAbort(TransFromDtmRequest(ctx, in))

                          中断事务的执行过程是通过全局事务id获取事务的元数据,修改全局事务的状态为中断,然后通过事务id获取所有的分支事务 ,最后处理分支事务:

                            func svcAbort(t *TransGlobal) interface{} {
                              dbt := GetTransGlobal(t.Gid)
                              dbt.changeStatus(dtmcli.StatusAborting, withRollbackReason(t.RollbackReason))
                            branches := GetStore().FindBranches(t.Gid)
                            return dbt.Process(branches)

                                    处理过程位于dtmsvr/trans_process.go

                              func (t *TransGlobal) process(branches []TransBranch) error {
                              rerr = t.getProcessor().ProcessOnce(branches)

                                  每一种分布式事务模型的处理逻辑都不一样,以saga为例dtmsvr/trans_type_saga.go

                                func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
                                    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

                                它修改了boltdb里面的状态,dtmsvr/storage/boltdb/boltdb.go

                                  func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
                                       if finished {
                                  tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
                                        tPutGlobal(t, global)
                                       err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)

                                  如果状态是完成状态,会删除相应的记录。

                                          3,然后,我们看下第二部分,同步分支状态到存储,dtmsvr/svr.go

                                    k := updateBranch.gid + updateBranch.branchID + "-" + updateBranch.op
                                    rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
                                    for { // flush branches every 200ms
                                    flushBranchs()
                                    }

                                    对于存储过的状态不用再次操作,所以本地用了一个k来去重,它是通过全局事务id,分支id以及分支的处理动作名字来做的唯一键。以存储介质为mysql,动作为创建分支的操作来说,它的代码实现位于

                                    dtmsvr/storage/sql/sql.go

                                      db := dbGet().Clauses(clause.OnConflict{
                                      OnConstraint: "gid_branch_uniq",
                                      DoUpdates: clause.AssignmentColumns(updates),
                                      }).Create(branches)

                                      4,第三部分加载topics到内存的操作流程如下: dtmsvr/cron.go

                                        cronUpdateTopicsMapOnce()

                                        dtmsvr/topics.go

                                          func updateTopicsMap() {
                                          kvs := GetStore().FindKV(topicsCat, "")
                                          topicsMap[kv.K] = newTopic
                                            func topic2urls(topic string) []string {
                                            for k, subscriber := range topicsMap[topic].Subscribers {
                                            urls[k] = subscriber.URL

                                            事务模型是一致性消息的时候dtmsvr/trans_type_msg.go

                                              func (t *transMsgProcessor) GenBranches() []TransBranch {
                                              for i, step := range t.Steps {
                                              urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)
                                              for j, url := range urls {
                                              b := TransBranch{

                                              通过将消息中的url组装成分支事务信息的。

                                                    5,最后一步服务发现的注册的实现单独提供了一个包。

                                                github.com/dtm-labs/dtmdriver@v0.0.6/driver-mgr.go

                                                文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                评论