暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

流图计算之增量match原理与应用

TuGraph 2025-06-03
106
点击蓝字,关注我们


问题背景

在流式计算中,数据往往不是全部一批到来,而会源源不断地进行输入和计算,在图计算/图查询领域,也存在类似的场景,图的点边不断地从数据源读取,进行构图,从而形成增量图。在增量图查询中,图随时发生着变化,在不同的图版本中,进行图查询的结果也会有所不同。对于某一次新增的点边,构成了一个新的版本的图,如果重新对全图(即当前所有点边)进行图遍历,开销较大,并且也会和历史数据有重复。由于历史的数据已经计算过一遍,理想情况下,只需要对增量所影响的部分进行计算/查询,而不需要对全图重新进行查询。

GQL(Graph Query Language)是国际标准化组织(ISO)为标准化图查询语言所制定的一个标准,用于在图上执行查询的语言。TuGraph Analytics (Geaflow)是蚂蚁图计算团队开源的流图计算引擎,专注于处理动态变化的图数据,支持大规模、高并发的实时图计算场景。本文将介绍在Geaflow引擎中,对增量图使用GQL进行增量Match的方法,目的尽可能地只对增量的数据进行查询,避免冗余的全量计算。


当前问题

Geaflow引擎基于点中心框架(vertex center),通过迭代的方式,每一轮迭代中,每个点向其他点发送消息,并在下一轮收到消息时进行处理、分析。在Geaflow的框架中,GQL的查询需要从前往后进行Traversal遍历走图,即从起始节点开始出发,进行扩散,依次进行点边匹配,直到match所需要的查询pattern。在动态图里,如果只使用新增的点边触发计算,结果会有缺失,例如下面例子所示。



方案步骤

整体流程示例图如下:



1、首先得到query的计划的迭代次数N,需向外扩充N-1度(maxEvolveIteration=N-1),即可覆盖当前query。框架的最大迭代数将设置为N + maxEvolveIteration(N>2)

    例如
    match(a)迭代数为1,此时不需要Evolve逻辑
    match(a)-[e]->(b)迭代数为2,此时不需要Evolve逻辑
    match(a)-[e]->(b)->[e2]->(c)迭代数为3 最大迭代数5

    2、由于当迭代数较大时,扩充子图可能可能扩充到全图,设置一个阈值T, 当N<=T 才执行这个增量逻辑。

    3、在每个window数据加入图中后,对于新增的点边,每个点会向邻居发送EvolveVertexMessage,执行N-1次迭代,将N-1度子图扩充进来。即当前迭代小于maxEvolveIteration(N-1)时,发送EvolveVertexMessage。

    4、每个点在向邻居点发送EvolveMessage时,需要将自己的id放在消息中,收到消息的点记录其发送点的id, 添加到targetIdList,在后续traversal阶段中使用。此步骤作用是下游节点将增量信息反向传递给上游,上游点在进行遍历时可以得知下游的增量影响部分,从而只遍历这些含有动态信息的下游点,而不需要再遍历所有邻居点。

    反向扩展的主要逻辑在GeaFlowDynamicVCTraversalFunction中,GeaFlowDynamicVCTraversalFunction继承自IncVertexCentricFunction,在Geaflow中IncVertexCentricFunction是一个表示增量VC方法(点中心)的接口,在每次迭代中,都会对当前收到消息的点进行触发,执行compute方法中的逻辑。


      @Override
      public void compute(Object vertexId, Iterator<MessageBox> messageIterator) {
          TraversalRuntimeContext context = commonFunction.getContext();
          if (needIncrTraversal()) {
              long iterationId = context.getIterationId();
              // sendEvolveMessage to evolve subGraphs when iterationId is less than the plan iteration
              if (iterationId < queryMaxIteration - 1) {
                  evolveIds.add(vertexId);
                  sendEvolveMessage(vertexId, context);
                  return;
              }


              if (iterationId == queryMaxIteration - 1) {
                  // the current iteration is the end of evolve phase.
                  evolveIds.add(vertexId);
                  return;
              }
              // traversal
              commonFunction.compute(vertexId, messageIterator);


          } else {
              commonFunction.compute(vertexId, messageIterator);
          }
      }


      具体示例如下:



      总结进行Evolve扩展的条件:


      1、query的迭代次数>2:当match小于两跳时不需要Evolve。
      2、query的迭代次数<=Threshold:如果迭代数太多可能扩展到全图。
      3、windowId>1:第一次构图不需要进行Evolve阶段。
      4、GQL语句中没有起始点:如果有起始点,则只需使用起始点计算,不需要扩展子图,例如查询语句Match(a:person where a.id = 1))return a.name。

      Demo示例

      在Geaflow中,通过设置点表或边表的windowSize来默认实现增量逻辑,即每一批读入windowSize大小的点边数据,来构建增量图。


        CREATE GRAPH modern (
          Vertex person (
          id bigint ID,
          name varchar,
          age int
        ),
        Edge knows (
          srcId bigint SOURCE ID,
          targetId bigint DESTINATION ID,
          weight double
        ),
        WITH (
            storeType='rocksdb',
          shardCount = 1
        );


        CREATE TABLE modern_vertex (
          id varchar,
          type varchar,
          name varchar,
          other varchar
        WITH (
          type='file',
          geaflow.dsl.file.path = 'resource:///data/incr_modern_vertex.txt',
          geaflow.dsl.window.size = 20
        );


        CREATE TABLE modern_edge (
          srcId bigint,
          targetId bigint,
          type varchar,
          weight double
        WITH (
          type='file',
        geaflow.dsl.file.path = 'resource:///data/incr_modern_edge.txt',
        geaflow.dsl.window.size = 3
        );


        INSERT INTO modern.person
          SELECT cast(id as bigint), name, cast(other as intas age
          FROM modern_vertex WHERE type = 'person'
        ;




        INSERT INTO modern.knows
          SELECT srcId, targetId, weight
          FROM modern_edge WHERE type = 'knows'
        ;


        CREATE TABLE tbl_result (
          a_id BIGINT,
          b_id BIGINT,
          c_id BIGINT,
          d_id BIGINT
        WITH (
            type='file',
        geaflow.dsl.file.path='${target}'
        );


        USE GRAPH modern;


        INSERT INTO tbl_result
          SELECT
          a_id, b_id, c_id,d_id
          FROM (
          MATCH (a:person) -[e:knows]->(b:person)<-[e2:knows]-(c:person)<-[e3:knows]-(d:person) where a.id!=c.id
          RETURN a.id as a_id,b.id as b_id,c.id as c_id , d.id as d_id
          )
        ;


        在Demo中,设置点windowSize为20,边windowSize为3,即构图时每个window导入20个点,3条边。并执行3跳的查询语句。示例Demo在IncrMatchTest.java中, 可直接运行执行Demo。


        总结和展望

        在动态图/流图的场景中,图的点边是在实时变化的,在进行图查询时,对于不同窗口数据的图,我们往往可以根据一些历史信息,只对增量的部分触发计算,来进行增量地计算,避免触发全图的遍历。Geaflow使用了一种基于子图扩展的增量match方法,应用于点中心分布式图计算框架,在动态图场景下进行增量的查询,未来期望实现更多更复杂场景下的增量匹配逻辑。



        ·END·



        欢迎关注TuGraph代码仓库✨

        TuGraph-DB 图数据库

        https://github.com/tugraph-family/tugraph-db

        GeaFlow 流式图计算引擎

        https://github.com/tugraph-family/tugraph-analytics

        Chat2Graph 图原生智能体系统
        https://github.com/tugraph-family/chat2graph




        文章转载自TuGraph,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论