暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:cayley(10)

下面分析一下以mysql作为后端存储时,数据是如何存储的。它的核心源码位于graph/sql/mysql/mysql.go,首先定义了存储类型为mysql,然后进行注册。

    // Type is the backend name passed to csql.Register for the MySQL flavor.
    const Type = "mysql"
      func init() {
      csql.Register(Type, csql.Registration{
      Driver: "mysql",
      HashType: fmt.Sprintf(`BINARY(%d)`, quad.HashSize),

        func runTxMysql(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error {
        for _, n := range nodes {
        if n.RefInc >= 0 {
        nodeKey, values, err := csql.NodeValues(csql.NodeHash{n.Hash}, n.Val)
        if err != nil {
        return err
        }
        values = append([]interface{}{n.RefInc}, values...)
        values = append(values, n.RefInc) // one more time for UPDATE
        stmt, ok := insertValue[nodeKey]
        if !ok {
        var ph = make([]string, len(values)-1) // excluding last increment
        for i := range ph {
        ph[i] = "?"
        }
        stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` +
        strings.Join(nodeKey.Columns(), ", ") +
        `) VALUES (` + strings.Join(ph, ", ") +
        `) ON DUPLICATE KEY UPDATE refs = refs + ?;`)
        _, err = stmt.Exec(values...)
        for _, d := range quads {
        dirs := make([]interface{}, 0, len(quad.Directions))
        for _, h := range d.Quad.Dirs() {
        dirs = append(dirs, csql.NodeHash{h}.SQLValue())
        }
        if !d.Del {
        if insertQuad == nil {
        insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)
        if err != nil {
        _, err := insertQuad.Exec(dirs...)

        上面的代码实现了把顶点和四元组插入mysql。

          func convInsertError(err error) error {

          graph/sql/database.go

            // types maps a backend name to the Registration stored for it by Register.
            var types = make(map[string]Registration)
              func Register(name string, f Registration) {
              if f.Driver == "" {
              panic("no sql driver in type definition")
              }
              types[name] = f




              registerQuadStore(name, name)
                // Registration describes one SQL backend flavor: the driver to dial with,
                // the column types used when generating the schema, capability flags that
                // shape the generated DDL, and backend-specific hooks.
                type Registration struct {
                // Register panics when Driver is empty.
                Driver string // sql driver to use on dial
                // nodesTable and quadsTable fall back to "BYTEA" when HashType is empty.
                HashType string // type for hash fields
                // nodesTable falls back to "BYTEA" when BytesType is empty.
                BytesType string // type for binary fields
                // nodesTable falls back to "timestamp with time zone" when TimeType is empty.
                TimeType string // type for datetime fields
                // quadsTable falls back to "SERIAL" when HorizonType is empty.
                HorizonType string // type for horizon counter
                NodesTableExtra string // extra SQL to append to nodes table definition
                // When set, quadIndexes emits unique indexes with WHERE label_hash IS [NOT] NULL.
                ConditionalIndexes bool // database supports conditional indexes
                // When set, quadIndexes appends WITH (FILLFACTOR = n) to each CREATE INDEX.
                FillFactor bool // database supports fill percent on indexes
                // When set, quadIndexes skips the foreign key constraints on quads.
                NoForeignKeys bool // database has no support for FKs




                // Embedded dialect settings; the type is declared outside this excerpt.
                QueryDialect
                NoOffsetWithoutLimit bool // SELECT ... OFFSET can be used only with LIMIT




                Error func(error) error // error conversion function
                Estimated func(table string) string // query string that returns an estimated number of rows in table
                // NOTE(review): presumably applies the node and quad updates inside tx — confirm against callers.
                RunTx func(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error
                // NOTE(review): presumably wraps stmts with backend-specific retry logic — confirm against callers.
                TxRetry func(tx *sql.Tx, stmts func() error) error
                // NOTE(review): name suggests schema DDL must run outside a transaction — confirm where it is read.
                NoSchemaChangesInTx bool
                }

                创建节点表

                  // nodesTable returns the CREATE TABLE statement for the nodes table.
                  //
                  // Empty type names fall back to defaults: "BYTEA" for HashType and BytesType,
                  // "timestamp with time zone" for TimeType. A non-empty NodesTableExtra is
                  // appended after the last column, just before the closing parenthesis.
                  func (r Registration) nodesTable() string {
                      // orDefault returns def when the configured type name is empty.
                      orDefault := func(configured, def string) string {
                          if configured != "" {
                              return configured
                          }
                          return def
                      }
                      hashType := orDefault(r.HashType, "BYTEA")
                      bytesType := orDefault(r.BytesType, "BYTEA")
                      timeType := orDefault(r.TimeType, "timestamp with time zone")

                      closing := "\n);"
                      if extra := r.NodesTableExtra; extra != "" {
                          closing = ",\n" + extra + closing
                      }
                      // The continuation lines of the raw string keep their original leading
                      // whitespace so the generated SQL text is unchanged.
                      return `CREATE TABLE nodes (
                  hash ` + hashType + ` PRIMARY KEY,
                  refs INT NOT NULL,
                  value ` + bytesType + `,
                  value_string TEXT,
                  datatype TEXT,
                  language TEXT,
                  iri BOOLEAN,
                  bnode BOOLEAN,
                  value_int BIGINT,
                  value_bool BOOLEAN,
                  value_float double precision,
                  value_time ` + timeType +
                          closing
                  }

                  创建四元组表

                    // quadsTable returns the CREATE TABLE statement for the quads table.
                    //
                    // An empty HashType falls back to "BYTEA" and an empty HorizonType to "SERIAL".
                    // The ts column is always declared as plain "timestamp".
                    func (r Registration) quadsTable() string {
                        hashType, horizonType := r.HashType, r.HorizonType
                        if hashType == "" {
                            hashType = "BYTEA"
                        }
                        if horizonType == "" {
                            horizonType = "SERIAL"
                        }
                        // The continuation lines of the raw string keep their original leading
                        // whitespace so the generated SQL text is unchanged.
                        return `CREATE TABLE quads (
                    horizon ` + horizonType + ` PRIMARY KEY,
                    subject_hash ` + hashType + ` NOT NULL,
                    predicate_hash ` + hashType + ` NOT NULL,
                    object_hash ` + hashType + ` NOT NULL,
                    label_hash ` + hashType + `,
                    ts timestamp
                    );`
                    }

                    创建相关索引

                      // quadIndexes returns the DDL statements that add uniqueness constraints,
                      // foreign keys and lookup indexes to the quads table, in that order.
                      //
                      //   - Uniqueness: two unique indexes over (subject, predicate, object[, label]);
                      //     with ConditionalIndexes they carry a WHERE clause on label_hash.
                      //   - Foreign keys: one per hash column, referencing nodes(hash), unless
                      //     NoForeignKeys is set.
                      //   - Lookup indexes: one per direction ordering listed below; with FillFactor
                      //     each gets WITH (FILLFACTOR = n), n read from the "db_fill_factor" option
                      //     (default 50).
                      func (r Registration) quadIndexes(options graph.Options) []string {
                          stmts := make([]string, 0, 10)

                          if !r.ConditionalIndexes {
                              stmts = append(stmts,
                                  `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash);`,
                                  `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash);`,
                              )
                          } else {
                              stmts = append(stmts,
                                  `CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash) WHERE label_hash IS NULL;`,
                                  `CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash) WHERE label_hash IS NOT NULL;`,
                              )
                          }

                          if !r.NoForeignKeys {
                              stmts = append(stmts,
                                  `ALTER TABLE quads ADD CONSTRAINT subject_hash_fk FOREIGN KEY (subject_hash) REFERENCES nodes (hash);`,
                                  `ALTER TABLE quads ADD CONSTRAINT predicate_hash_fk FOREIGN KEY (predicate_hash) REFERENCES nodes (hash);`,
                                  `ALTER TABLE quads ADD CONSTRAINT object_hash_fk FOREIGN KEY (object_hash) REFERENCES nodes (hash);`,
                                  `ALTER TABLE quads ADD CONSTRAINT label_hash_fk FOREIGN KEY (label_hash) REFERENCES nodes (hash);`,
                              )
                          }

                          // Column orderings that each get a non-unique lookup index.
                          orders := [][3]quad.Direction{
                              {quad.Subject, quad.Predicate, quad.Object},
                              {quad.Object, quad.Predicate, quad.Subject},
                              {quad.Predicate, quad.Object, quad.Subject},
                              {quad.Object, quad.Subject, quad.Predicate},
                          }
                          // The second result of IntKey is deliberately discarded, as in the original.
                          factor, _ := options.IntKey("db_fill_factor", 50)

                          for _, order := range orders {
                              var prefix string
                              columns := make([]string, 0, len(order))
                              for _, dir := range order {
                                  prefix += string(dir.Prefix())
                                  columns = append(columns, dir.String()+"_hash")
                              }
                              stmt := "CREATE INDEX " + prefix + "_index ON quads (" + strings.Join(columns, ", ") + ")"
                              if r.FillFactor {
                                  stmt += fmt.Sprintf(" WITH (FILLFACTOR = %d)", factor)
                              }
                              stmts = append(stmts, stmt+";")
                          }
                          return stmts
                      }

                      graph/sql/iterator.go

                        var _ shape.Optimizer = (*QuadStore)(nil)
                          func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) {
                          return qs.opt.OptimizeShape(s)

                          根据查询,组装sql语句:

                            func (qs *QuadStore) prepareQuery(s Shape) (string, []interface{}) {
                            args := s.Args()
                            vals := make([]interface{}, 0, len(args))
                            for _, a := range args {
                            vals = append(vals, a.SQLValue())
                            }
                            b := NewBuilder(qs.flavor.QueryDialect)
                            qu := s.SQL(b)
                            return qu, vals

                            执行查询

                              func (qs *QuadStore) QueryRow(ctx context.Context, s Shape) *sql.Row {
                              qu, vals := qs.prepareQuery(s)
                              return qs.db.QueryRowContext(ctx, qu, vals...)
                                var _ graph.IteratorFuture = (*Iterator)(nil)
                                  func (qs *QuadStore) NewIterator(s Select) *Iterator {
                                  it := &Iterator{
                                  it: qs.newIterator(s),
                                  }
                                  it.Iterator = graph.NewLegacy(it.it, it)
                                  return it
                                    // Iterator pairs the SQL-backed iterator2 with an embedded graph.Iterator.
                                    // NewIterator fills the embedded field with graph.NewLegacy(it.it, it).
                                    type Iterator struct {
                                    it *iterator2 // underlying SQL iterator built by qs.newIterator
                                    graph.Iterator
                                    }

                                    迭代器有两个

                                      var _ graph.IteratorShapeCompat = (*iterator2)(nil)
                                        // iterator2 holds a QuadStore and the Select query to run against it.
                                        // Iterate builds a scanner from both; String renders the query as SQL text.
                                        type iterator2 struct {
                                        qs *QuadStore // store the query is executed against
                                        query Select // query this iterator represents
                                        err error // not read or written by the methods visible in this excerpt
                                        }
                                          // Iterate returns a scanner created by newIteratorNext for this iterator's
                                          // store and query.
                                          func (it *iterator2) Iterate() graph.Scanner {
                                              store, sel := it.qs, it.query
                                              return newIteratorNext(store, sel)
                                          }
                                            func (it *iterator2) Stats(ctx context.Context) (graph.IteratorCosts, error) {
                                            sz, err := it.getSize(ctx)
                                            return graph.IteratorCosts{
                                            NextCost: 1,
                                            ContainsCost: 10,
                                            Size: sz,
                                            }, err
                                              // String renders the iterator's query as SQL text using a builder for the
                                              // store's query dialect.
                                              func (it *iterator2) String() string {
                                                  sel := it.query
                                                  builder := NewBuilder(it.qs.flavor.QueryDialect)
                                                  return sel.SQL(builder)
                                              }

                                                      基础迭代器

                                                // iteratorBase is the state shared by iteratorNext and iteratorContains,
                                                // both of which embed it.
                                                type iteratorBase struct {
                                                qs *QuadStore // store the query is executed against
                                                query Select // query being iterated




                                                // cols caches query.Columns(); ensureColumns fills it on first use.
                                                cols []string
                                                // cind maps a quad direction to its column index in cols;
                                                // the quad.Any key holds the index of the tagNode column.
                                                cind map[quad.Direction]int




                                                err error // last error recorded, e.g. by Contains on a failed query
                                                // NOTE(review): presumably the current result and its tag bindings,
                                                // filled by scanValue — confirm against the full source.
                                                res graph.Ref
                                                tags map[string]graph.Ref
                                                }

                                                扫描行结果

                                                  // ensureColumns caches the query's column names and builds the
                                                  // direction-to-column-index map on first use. Later calls return
                                                  // immediately once it.cols is non-nil.
                                                  //
                                                  // Only columns carrying the tagPref prefix are indexed: the tagNode column
                                                  // is stored under quad.Any, and a column whose suffix equals a direction's
                                                  // String() is stored under that direction.
                                                  func (it *iteratorBase) ensureColumns() {
                                                      if it.cols != nil {
                                                          return
                                                      }
                                                      it.cols = it.query.Columns()
                                                      it.cind = make(map[quad.Direction]int, len(quad.Directions)+1)
                                                      for idx, col := range it.cols {
                                                          switch {
                                                          case !strings.HasPrefix(col, tagPref):
                                                              // Not a reserved column; nothing to index.
                                                          case col == tagNode:
                                                              it.cind[quad.Any] = idx
                                                          default:
                                                              dirName := strings.TrimPrefix(col, tagPref)
                                                              for _, dir := range quad.Directions {
                                                                  if dir.String() == dirName {
                                                                      it.cind[dir] = idx
                                                                      break
                                                                  }
                                                              }
                                                          }
                                                      }
                                                  }

                                                  扫描值

                                                    func (it *iteratorBase) scanValue(r *sql.Rows) bool {
                                                    it.ensureColumns()
                                                    nodes := make([]NodeHash, len(it.cols))
                                                    pointers := make([]interface{}, len(nodes))
                                                    for i := range pointers {
                                                    pointers[i] = &nodes[i]
                                                    }
                                                    if err := r.Scan(pointers...); err != nil {
                                                    for i, name := range it.cols {
                                                    if !strings.Contains(name, tagPref) {
                                                    it.tags[name] = nodes[i].ValueHash
                                                    }
                                                      // iteratorNext extends iteratorBase with a *sql.Rows cursor.
                                                      // NOTE(review): presumably the forward-scanning iterator returned by
                                                      // newIteratorNext — its methods are not part of this excerpt.
                                                      type iteratorNext struct {
                                                      iteratorBase
                                                      cursor *sql.Rows // result set being read
                                                      // TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
                                                      nextPathRes graph.Ref
                                                      nextPathTags map[string]graph.Ref
                                                      }
                                                        // iteratorContains extends iteratorBase with the membership check
                                                        // implemented by its Contains method.
                                                        type iteratorContains struct {
                                                        iteratorBase
                                                        // TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in general
                                                        // Contains keeps the result set open here when query.nextPath is set,
                                                        // closing any previously stored one first.
                                                        nextPathRows *sql.Rows
                                                        }

                                                        进行层序遍历,先遍历next,结束后调用nextpath

                                                          // Contains reports whether v is produced by the iterator's query.
                                                          //
                                                          // It copies the query, adds equality filters derived from v, runs it, and
                                                          // scans the first row:
                                                          //   - NodeHash: filters the column indexed under quad.Any.
                                                          //   - QuadHashes: filters every direction whose hash is valid.
                                                          //   - any other type: returns false without querying.
                                                          //
                                                          // It also returns false when a required column is missing from the query,
                                                          // when the query fails (error stored in it.err), or when no row comes back
                                                          // (rows.Err() stored in it.err).
                                                          func (it *iteratorContains) Contains(ctx context.Context, v graph.Ref) bool {
                                                              it.ensureColumns()

                                                              // Filter a private copy so the added conditions never leak into it.query.
                                                              filtered := it.query
                                                              filtered.Where = append([]Where{}, filtered.Where...)

                                                              switch ref := v.(type) {
                                                              case NodeHash:
                                                                  col, found := it.cind[quad.Any]
                                                                  if !found {
                                                                      return false
                                                                  }
                                                                  field := it.query.Fields[col]
                                                                  filtered.WhereEq(field.Table, field.Name, ref)
                                                              case QuadHashes:
                                                                  for _, dir := range quad.Directions {
                                                                      col, found := it.cind[dir]
                                                                      if !found {
                                                                          return false
                                                                      }
                                                                      hash := ref.Get(dir)
                                                                      if !hash.Valid() {
                                                                          continue
                                                                      }
                                                                      field := it.query.Fields[col]
                                                                      filtered.WhereEq(field.Table, field.Name, NodeHash{hash})
                                                                  }
                                                              default:
                                                                  return false
                                                              }

                                                              rows, err := it.qs.Query(ctx, filtered)
                                                              if err != nil {
                                                                  it.err = err
                                                                  return false
                                                              }
                                                              if !it.query.nextPath {
                                                                  defer rows.Close()
                                                              } else {
                                                                  // Keep the rows open in nextPathRows, replacing any earlier set.
                                                                  if prev := it.nextPathRows; prev != nil {
                                                                      _ = prev.Close()
                                                                  }
                                                                  it.nextPathRows = rows
                                                              }

                                                              if rows.Next() {
                                                                  return it.scanValue(rows)
                                                              }
                                                              it.err = rows.Err()
                                                              return false
                                                          }

                                                          文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                          评论