在分析完如何使用cayley之后,我们总结一下使用步骤:
1. 初始化后端存储,比如memory、kv、sql等
2. 加入四元组
3. 指定起始节点
4. 绑定查询条件
5. 使用迭代器获取结果
如何初始化后端存储呢?我们来分析一下它的源码。以memstore为例,其初始化逻辑位于imports.go:它调用NewGraph,第一个参数是存储类型,第二个参数是存储的磁盘路径,第三个参数是可选项:
// NewMemoryGraph builds a Handle on top of the "memstore" backend.
// It forwards to NewGraph with an empty storage path and nil options and
// returns NewGraph's results unchanged.
func NewMemoryGraph() (*Handle, error) {
	const backend = "memstore"
	return NewGraph(backend, "", nil)
}
初始化后端存储的同时,也会初始化四元组的writer:
func NewGraph(name, dbpath string, opts graph.Options) (*Handle, error) {qs, err := graph.NewQuadStore(name, dbpath, opts)qw, err := graph.NewQuadWriter("single", qs, nil)return &Handle{qs, qw}, nil
type Handle struct {graph.QuadStoregraph.QuadWriter}
graph/quadwriter.go
func Unwrap(qs QuadStore) QuadStore {if h, ok := qs.(*Handle); ok {return h.QuadStore}return qs}
四元组writer的定义如下:
type QuadWriter interface {// AddQuad adds a quad to the store.AddQuad(quad.Quad) error// TODO(barakmich): Deprecate in favor of transaction.// AddQuadSet adds a set of quads to the store, atomically if possible.AddQuadSet([]quad.Quad) error// RemoveQuad removes a quad matching the given one from the database,// if it exists. Does nothing otherwise.RemoveQuad(quad.Quad) error// ApplyTransaction applies a set of quad changes.ApplyTransaction(*Transaction) error// RemoveNode removes all quads which have the given node as subject, predicate, object, or label.//// It returns ErrNodeNotExists if node is missing.RemoveNode(quad.Value) error// Close cleans up replication and closes the writing aspect of the database.Close() error}
imports.go还通过别名(alias)的方式重新导出了其它包中的对象:
StartMorphism = path.StartMorphismStartPath = path.StartPathNewTransaction = graph.NewTransaction
绑定起始节点的逻辑位于graph/path/path.go
// StartPath begins a new Path on qs. The path's first step is the morphism
// that isMorphism builds from the given nodes.
func StartPath(qs graph.QuadStore, nodes ...quad.Value) *Path {
	first := isMorphism(nodes...)
	return newPath(qs, first)
}
newPath的第二个参数是isMorphism的返回值。morphism表示路径上的一次映射,内部同时包含了它的反向映射(Reversal),其定义如下:
graph/path/morphism_apply_functions.go
func isMorphism(nodes ...quad.Value) morphism {return morphism{Reversal: func(ctx *pathContext) (morphism, *pathContext) { return isMorphism(nodes...), ctx },Apply: func(in shape.Shape, ctx *pathContext) (shape.Shape, *pathContext) {
type morphism struct {IsTag boolReversal func(*pathContext) (morphism, *pathContext)Apply applyMorphismtags []string}
func newPath(qs graph.QuadStore, m ...morphism) *Path {qs = graph.Unwrap(qs)return &Path{stack: m,qs: qs,}
path是我们进行链式绑定查询条件的基础对象,其定义如下:
type Path struct {stack []morphismqs graph.QuadStore // Optionally. A nil qs is equivalent to a morphism.baseContext pathContext}
比如绑定出边(Out)查询条件时,它会先复制一份路径信息,然后把新的查询条件追加到stack的末尾。
func (p *Path) Out(via ...interface{}) *Path {np := p.clone()np.stack = append(np.stack, outMorphism(nil, via...))if len(tags) != 0 {via = Save{From: via, Tags: tags}}
具体如何绑定扇出条件的代码位于graph/shape/path.go
// Out builds the outgoing-direction shape for from, via, labels and tags by
// calling buildOut with its "in" flag set to false.
func Out(from, via, labels Shape, tags ...string) Shape {
	const inbound = false
	return buildOut(from, via, labels, tags, inbound)
}
func buildOut(from, via, labels Shape, tags []string, in bool) Shape {start, goal := quad.Subject, quad.Objectif in {start, goal = goal, start}if _, ok := from.(AllNodes); !ok {quads = append(quads, QuadFilter{Dir: start, Values: from,})if _, ok := via.(AllNodes); !ok {quads = append(quads, QuadFilter{Dir: quad.Predicate, Values: via,})if labels != nil {if _, ok := labels.(AllNodes); !ok {quads = append(quads, QuadFilter{Dir: quad.Label, Values: labels,})return NodesFrom{Quads: quads, Dir: goal}
本质上是构造了用于查询的四元组过滤条件。
func Iterate(ctx context.Context, qs graph.QuadStore, s Shape) *graph.IterateChain {it := BuildIterator(qs, s)return graph.Iterate(ctx, it).On(qs)
四元组过滤逻辑位于graph/shape/shape.go
type Quads []QuadFiltertype QuadFilter struct {Dir quad.DirectionValues Shape}
type Shape interface {// BuildIterator constructs an iterator tree from a given shapes and binds it to QuadStore.BuildIterator(qs graph.QuadStore) graph.Iterator// Optimize runs an optimization pass over a query shape.//// It returns a bool that indicates if shape was replaced and should always return a copy of shape in this case.// In case no optimizations were made, it returns the same unmodified shape.//// If Optimizer is specified, it will be used instead of default optimizations.Optimize(r Optimizer) (Shape, bool)}
func BuildIterator(qs graph.QuadStore, s Shape) graph.Iterator {qs = graph.Unwrap(qs)return s.BuildIterator(qs)
绑定完条件,我们就通过迭代器来获取查询结果:
func (p *Path) Iterate(ctx context.Context) *graph.IterateChain {return shape.Iterate(ctx, p.qs, p.Shape())
graph/iterate.go,多个迭代器会组成一个迭代器链
type IterateChain struct {ctx context.Contexts IteratorShapeit Scannerqs QuadStorepaths booloptimize boollimit intn int}
func Iterate(ctx context.Context, it Iterator) *IterateChain {return &IterateChain{ctx: ctx, s: AsShape(it),limit: -1, paths: true,optimize: true,}
func (c *IterateChain) On(qs QuadStore) *IterateChain {c.qs = qsreturn c
设置好迭代器后通过EachValue来遍历结果集:
func (c *IterateChain) EachValue(qs QuadStore, fnc func(quad.Value)) error {return c.Each(func(v Ref) {if nv := c.qs.NameOf(v); nv != nil {fnc(nv)}})
func (c *IterateChain) Each(fnc func(Ref)) error {c.start()defer c.end()for c.next() {select {case <-done:return c.ctx.Err()default:}fnc(c.it.Result())for c.nextPath() {select {case <-done:fnc(c.it.Result())
在each函数内部进行了广度优先遍历。
func (c *IterateChain) start() {c.it = c.s.Iterate()
func (c *IterateChain) end() {
其中next函数和nextPath函数是逻辑的核心所在:
func (c *IterateChain) next() bool {ok := (c.limit < 0 || c.n < c.limit) && c.it.Next(c.ctx)
func (c *IterateChain) nextPath() bool {ok := c.paths && (c.limit < 0 || c.n < c.limit) && c.it.NextPath(c.ctx)
遍历完成后,通过Ref获取结果,其定义位于graph/values.go:
// Ref is implemented by any value that can report a comparable key for itself.
type Ref interface {
	// Key returns a dynamic type that is comparable according to the Go language specification.
	// The returned value must be unique for each receiver value.
	Key() interface{}
}






