下面分析下mysql作为后端存储的时候,是如何存储的,它的核心源码位graph/sql/mysql/mysql.go,首先定义了存储类型是mysql,然后注册。
const Type = "mysql"
func init() {csql.Register(Type, csql.Registration{Driver: "mysql",HashType: fmt.Sprintf(`BINARY(%d)`, quad.HashSize),
func runTxMysql(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) error {for _, n := range nodes {if n.RefInc >= 0 {nodeKey, values, err := csql.NodeValues(csql.NodeHash{n.Hash}, n.Val)if err != nil {return err}values = append([]interface{}{n.RefInc}, values...)values = append(values, n.RefInc) // one more time for UPDATEstmt, ok := insertValue[nodeKey]if !ok {var ph = make([]string, len(values)-1) // excluding last incrementfor i := range ph {ph[i] = "?"}stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` +strings.Join(nodeKey.Columns(), ", ") +`) VALUES (` + strings.Join(ph, ", ") +`) ON DUPLICATE KEY UPDATE refs = refs + ?;`)_, err = stmt.Exec(values...)for _, d := range quads {dirs := make([]interface{}, 0, len(quad.Directions))for _, h := range d.Quad.Dirs() {dirs = append(dirs, csql.NodeHash{h}.SQLValue())}if !d.Del {if insertQuad == nil {insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)if err != nil {_, err := insertQuad.Exec(dirs...)
实现了把顶点和四元祖插入了mysql。
func convInsertError(err error) error {
graph/sql/database.go
var types = make(map[string]Registration)
func Register(name string, f Registration) {if f.Driver == "" {panic("no sql driver in type definition")}types[name] = fregisterQuadStore(name, name)
type Registration struct {Driver string // sql driver to use on dialHashType string // type for hash fieldsBytesType string // type for binary fieldsTimeType string // type for datetime fieldsHorizonType string // type for horizon counterNodesTableExtra string // extra SQL to append to nodes table definitionConditionalIndexes bool // database supports conditional indexesFillFactor bool // database supports fill percent on indexesNoForeignKeys bool // database has no support for FKsQueryDialectNoOffsetWithoutLimit bool // SELECT ... OFFSET can be used only with LIMITError func(error) error // error conversion functionEstimated func(table string) string // query that string that returns an estimated number of rows in tableRunTx func(tx *sql.Tx, nodes []graphlog.NodeUpdate, quads []graphlog.QuadUpdate, opts graph.IgnoreOpts) errorTxRetry func(tx *sql.Tx, stmts func() error) errorNoSchemaChangesInTx bool}
创建节点表
func (r Registration) nodesTable() string {htyp := r.HashTypeif htyp == "" {htyp = "BYTEA"}btyp := r.BytesTypeif btyp == "" {btyp = "BYTEA"}ttyp := r.TimeTypeif ttyp == "" {ttyp = "timestamp with time zone"}end := "\n);"if r.NodesTableExtra != "" {end = ",\n" + r.NodesTableExtra + end}return `CREATE TABLE nodes (hash ` + htyp + ` PRIMARY KEY,refs INT NOT NULL,value ` + btyp + `,value_string TEXT,datatype TEXT,language TEXT,iri BOOLEAN,bnode BOOLEAN,value_int BIGINT,value_bool BOOLEAN,value_float double precision,value_time ` + ttyp +end}
创建四元祖表
func (r Registration) quadsTable() string {htyp := r.HashTypeif htyp == "" {htyp = "BYTEA"}hztyp := r.HorizonTypeif hztyp == "" {hztyp = "SERIAL"}return `CREATE TABLE quads (horizon ` + hztyp + ` PRIMARY KEY,subject_hash ` + htyp + ` NOT NULL,predicate_hash ` + htyp + ` NOT NULL,object_hash ` + htyp + ` NOT NULL,label_hash ` + htyp + `,ts timestamp);`}
创建相关索引
func (r Registration) quadIndexes(options graph.Options) []string {indexes := make([]string, 0, 10)if r.ConditionalIndexes {indexes = append(indexes,`CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash) WHERE label_hash IS NULL;`,`CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash) WHERE label_hash IS NOT NULL;`,)} else {indexes = append(indexes,`CREATE UNIQUE INDEX spo_unique ON quads (subject_hash, predicate_hash, object_hash);`,`CREATE UNIQUE INDEX spol_unique ON quads (subject_hash, predicate_hash, object_hash, label_hash);`,)}if !r.NoForeignKeys {indexes = append(indexes,`ALTER TABLE quads ADD CONSTRAINT subject_hash_fk FOREIGN KEY (subject_hash) REFERENCES nodes (hash);`,`ALTER TABLE quads ADD CONSTRAINT predicate_hash_fk FOREIGN KEY (predicate_hash) REFERENCES nodes (hash);`,`ALTER TABLE quads ADD CONSTRAINT object_hash_fk FOREIGN KEY (object_hash) REFERENCES nodes (hash);`,`ALTER TABLE quads ADD CONSTRAINT label_hash_fk FOREIGN KEY (label_hash) REFERENCES nodes (hash);`,)}quadIndexes := [][3]quad.Direction{{quad.Subject, quad.Predicate, quad.Object},{quad.Object, quad.Predicate, quad.Subject},{quad.Predicate, quad.Object, quad.Subject},{quad.Object, quad.Subject, quad.Predicate},}factor, _ := options.IntKey("db_fill_factor", 50)for _, ind := range quadIndexes {var (name stringcols []string)for _, d := range ind {name += string(d.Prefix())cols = append(cols, d.String()+"_hash")}q := fmt.Sprintf(`CREATE INDEX %s_index ON quads (%s)`,name, strings.Join(cols, ", "))if r.FillFactor {q += fmt.Sprintf(" WITH (FILLFACTOR = %d)", factor)}indexes = append(indexes, q+";")}return indexes}
graph/sql/iterator.go
var _ shape.Optimizer = (*QuadStore)(nil)
func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) {return qs.opt.OptimizeShape(s)
根据查询,组装sql语句:
func (qs *QuadStore) prepareQuery(s Shape) (string, []interface{}) {args := s.Args()vals := make([]interface{}, 0, len(args))for _, a := range args {vals = append(vals, a.SQLValue())}b := NewBuilder(qs.flavor.QueryDialect)qu := s.SQL(b)return qu, vals
执行查询
func (qs *QuadStore) QueryRow(ctx context.Context, s Shape) *sql.Row {qu, vals := qs.prepareQuery(s)return qs.db.QueryRowContext(ctx, qu, vals...)
var _ graph.IteratorFuture = (*Iterator)(nil)
func (qs *QuadStore) NewIterator(s Select) *Iterator {it := &Iterator{it: qs.newIterator(s),}it.Iterator = graph.NewLegacy(it.it, it)return it
type Iterator struct {it *iterator2graph.Iterator}
迭代器有两个
var _ graph.IteratorShapeCompat = (*iterator2)(nil)
type iterator2 struct {qs *QuadStorequery Selecterr error}
func (it *iterator2) Iterate() graph.Scanner {return newIteratorNext(it.qs, it.query)}
func (it *iterator2) Stats(ctx context.Context) (graph.IteratorCosts, error) {sz, err := it.getSize(ctx)return graph.IteratorCosts{NextCost: 1,ContainsCost: 10,Size: sz,}, err
func (it *iterator2) String() string {return it.query.SQL(NewBuilder(it.qs.flavor.QueryDialect))}
基础迭代器
type iteratorBase struct {qs *QuadStorequery Selectcols []stringcind map[quad.Direction]interr errorres graph.Reftags map[string]graph.Ref}
扫描行结果
func (it *iteratorBase) ensureColumns() {if it.cols != nil {return}it.cols = it.query.Columns()it.cind = make(map[quad.Direction]int, len(quad.Directions)+1)for i, name := range it.cols {if !strings.HasPrefix(name, tagPref) {continue}if name == tagNode {it.cind[quad.Any] = icontinue}name = name[len(tagPref):]for _, d := range quad.Directions {if name == d.String() {it.cind[d] = ibreak}}}}
扫描值
func (it *iteratorBase) scanValue(r *sql.Rows) bool {it.ensureColumns()nodes := make([]NodeHash, len(it.cols))pointers := make([]interface{}, len(nodes))for i := range pointers {pointers[i] = &nodes[i]}if err := r.Scan(pointers...); err != nil {for i, name := range it.cols {if !strings.Contains(name, tagPref) {it.tags[name] = nodes[i].ValueHash}
type iteratorNext struct {iteratorBasecursor *sql.Rows// TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in generalnextPathRes graph.RefnextPathTags map[string]graph.Ref}
type iteratorContains struct {iteratorBase// TODO(dennwc): nextPath workaround; remove when we get rid of NextPath in generalnextPathRows *sql.Rows}
进行层序遍历,先遍历next,结束后调用nextpath
func (it *iteratorContains) Contains(ctx context.Context, v graph.Ref) bool {it.ensureColumns()sel := it.querysel.Where = append([]Where{}, sel.Where...)switch v := v.(type) {case NodeHash:i, ok := it.cind[quad.Any]if !ok {return false}f := it.query.Fields[i]sel.WhereEq(f.Table, f.Name, v)case QuadHashes:for _, d := range quad.Directions {i, ok := it.cind[d]if !ok {return false}h := v.Get(d)if !h.Valid() {continue}f := it.query.Fields[i]sel.WhereEq(f.Table, f.Name, NodeHash{h})}default:return false}rows, err := it.qs.Query(ctx, sel)if err != nil {it.err = errreturn false}if it.query.nextPath {if it.nextPathRows != nil {_ = it.nextPathRows.Close()}it.nextPathRows = rows} else {defer rows.Close()}if !rows.Next() {it.err = rows.Err()return false}return it.scanValue(rows)}


文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




