暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【TuGraph开发指南】TuGraph Procedure API

原创 小小亮 2023-07-07
446

Procedure API

此文档主要讲解 TuGraph 的存储过程使用说明

1.简介

当用户需要表达的查询/更新逻辑较为复杂(例如 Cypher 无法描述,或是对性能要求较高)时,相比调用多个 REST 请求并在客户端完成整个处理流程的方式,TuGraph 提供的存储过程是更简洁和高效的选择。

与传统数据库类似,TuGraph 的存储过程运行在服务器端,用户通过将处理逻辑(即多个操作)封装到一个过程中来减少客户端使用时需要的 REST 接口调用次数,并且可以在实现时通过并行处理的方式(例如使用相关的 C++ OLAP 接口以及基于其实现的内置算法)进行进一步的加速。

2.使用说明

在 TuGraph 中,用户可以动态的加载,更新和删除存储过程。TuGraph 支持 C 语言和 Python 语言编写的存储过程。C 语言编写的存储过程扩展名一般为.so(Linux/Unix 系统下)或者.dll(Windows 系统下)。Python 存储过程的扩展名为.py。
两种存储过程是分别管理的,但使用方式是相同的。用户可以通过 REST API 或者 RPC 来管理和调用存储过程。下面的说明以 REST API 为例,相应的 RPC 调用方式详见 RPC 文档。

2.1.编写存储过程

2.1.1.编写C++存储过程

用户可以通过使用 core API 或者 Traversal API 来编写 C 存储过程。一个简单的 C 存储过程举例如下:

##include <iostream> ##include "lgraph.h" using namespace lgraph_api; extern "C" LGAPI bool Process(GraphDB& db, const std::string& request, std::string& response) { auto txn = db.CreateReadTxn(); size_t n = 0; for (auto vit = txn.GetVertexIterator(); vit.IsValid(); vit.Next()) { if (vit.GetLabel() == "student") { auto age = vit.GetField("age"); if (!age.is_null() && age.integer() == 10) n++; ## 统计所有年龄为10的学生数量 } } output = std::to_string(n); return true; }

从代码中我们可以看到,存储过程的入口函数是Process函数,它的参数有三个,分别为:

  • db: 数据库实例
  • request: 输入请求数据,可以是二进制字节数组,或者 JSON 串等其它任意格式。
  • response: 输出数据,可以是字符串,也可以直接返回二进制数据。

Process函数的返回值是一个布尔值。当它返回true的时候,表示该请求顺利完成,反之表示这个存储过程在执行过程中发现了错误,此时用户可以通过response来返回错误信息以方便调试。

C++存储过程编写完毕后需要编译成动态链接库。TuGraph 提供了compile.sh脚本来帮助用户自动编译存储过程。compile.sh脚本只有一个参数,是该存储过程的名称,在上面的例子中就是age_10。编译调用命令行如下:

g++ -fno-gnu-unique -fPIC -g --std=c++14 -I/usr/local/include/lgraph -rdynamic -O3 -fopenmp -o age_10.so age_10.cpp /usr/local/lib64/liblgraph.so -shared

如果编译顺利,会生成 age_10.so,然后用户就可以将它加载到服务器中了。

2.1.2.编写Python存储过程

与 C++类似,Python 存储过程也可以调用 core API,一个简单的例子如下:

def Process(db, input): txn = db.CreateReadTxn() it = txn.GetVertexIterator() n = 0 while it.IsValid(): if it.GetLabel() == 'student' and it['age'] and it['age'] == 10: n = n + 1 it.Next() return (True, str(nv))

Python 存储过程返回的是一个 tuple,其中第一个元素是一个布尔值,表示该存储过程是否成功执行;第二个元素是一个str,里面是需要返回的结果。

Python 存储过程不需要编译,可以直接加载。

2.2.加载存储过程

用户可以通过 REST API 和 RPC 来加载存储过程。以 REST API 为例,加载age_10.so的 C++代码如下:

import requests import json import base64 data = {'name':'age_10'} f = open('./age_10.so','rb') content = f.read() data['code_base64'] = base64.b64encode(content).decode() data['description'] = 'Calculate number of students in the age of 10' data['read_only'] = true data['code_type'] = 'so' js = json.dumps(data) r = requests.post(url='http://127.0.0.1:7071/db/school/cpp_plugin', data=js, headers={'Content-Type':'application/json'}) print(r.status_code) ## 正常时返回200

需要注意的是,这时的data['code']是一个经过 base64 处理的字符串,age_10.so中的二进制代码是无法通过 JSON 直接传输的。此外,存储过程的加载和删除都只能由具有管理员权限的用户来操作。

存储过程加载之后会被保存在数据库中,在服务器重启后也会被自动加载。此外,如果需要对存储过程进行更新,调用的 REST API 也是同样的。建议用户在更新存储过程时更新相应描述,以便区分不同版本的存储过程。

加载 Python 存储过程的方法与 C++类似,只是 url 地址不同。Python 代码同样需要使用 base64 处理。

2.2.1.列出已加载的存储过程

在服务器运行过程中,用户可以随时获取存储过程列表。其调用如下:

>>> r = requests.get('http://127.0.0.1:7071/db/school/cpp_plugin') >>> r.status_code 200 >>> r.text '{"plugins":[{"description":"Calculate number of students in the age of 10", "name":"age_10", "read_only":true}]}'

2.2.2.获取存储过程详情

在服务器运行过程中,用户可以随时获取单个存储过程的详情,包括代码。其调用如下:

>>> r = requests.get('http://127.0.0.1:7071/db/school/cpp_plugin/age_10') >>> r.status_code 200 >>> r.text '{"description":"Calculate number of students in the age of 10", "name":"age_10", "read_only":true, "code_base64":<CODE>, "code_type":"so"}'

2.2.3.调用存储过程

调用存储过程的代码示例如下:

>>> r = requests.post(url='http://127.0.0.1:7071/db/school/cpp_plugin/age_10', data='', headers={'Content-Type':'application/json'}) >>> r.status_code 200 >>> r.text 9

2.2.4.删除存储过程

删除存储过程只需要如下调用:

>>> r = requests.delete(url='http://127.0.0.1:7071/db/school/cpp_plugin/age_10') >>> r.status_code 200

与加载存储过程类似,只有管理员用户才能删除存储过程。

2.2.5.更新存储过程

更新存储过程需要执行如下两个步骤:

  1. 删除已存在的存储过程
  2. 安装新的存储过程

TuGraph 较为谨慎地管理存储过程操作的并发性,更新存储过程不会影响现有存储过程的运行。

3.C++ OLAP接口

TuGraph 强大的在线分析处理(OLAP)能力是其区别于其它图数据库的一个重要特性。
借助 C++ OLAP API(olap_on_db.h),用户可以快速地导出一个需要进行复杂分析的子图,然后在其上运行诸如 PageRank、连通分量、社区发现等迭代式图计算过程,最后根据结果做出相应决策。
导出和计算的过程都可以通过并行处理的方式进行加速,从而实现几乎实时的分析处理,避免了传统解决方案需要将数据导出、转换、再导入(ETL)到专门的分析系统进行离线处理的冗长步骤。

TuGraph 内置了大量常用的图分析算法和丰富的辅助接口,因此用户几乎不需要自己来实现具体的图计算过程,只需在实现自己的存储过程时将相应算法库的头文件(.h 文件)包含到自己程序中,并在编译时链接相应的动态库文件(.so)即可。
一般情况下,用户需要自己实现的只有将需要分析的子图抽取出来的过程。

3.1.Snapshot

C++ OLAP API 中的 Snapshot 模版类用于表示抽取出来的静态子图,其中 EdgeData 用来表示该子图上每条边所用权值的数据类型(如果边不需要权值,使用 Empty 作为 EdgeData 即可)。

抽取的子图通过 Snapshot 类的构造函数来描述:

Snapshot::Snapshot( GraphDB & db, Transaction & txn, size_t flags = 0, std::function<bool(VertexIterator &)> vertex_filter = nullptr, std::function<bool(OutEdgeIterator &, EdgeData &)> out_edge_filter = nullptr );

其中,db 为数据库句柄,txn 为事务句柄,flags 为生成时使用的选项,可选值包括以下的组合:SNAPSHOT_PARALLEL 表示导出时使用多个线程进行并行;SNAPSHOT_UNDIRECTED 表示需要将导出的图变为无向图。
vertex_filter 是面向顶点的用户自定义过滤函数,返回值为 true 表示该顶点需要被包含到待抽取的子图中,反之则表示需要被排除。
out_edge_filter 是面向边的用户自定义过滤函数,返回值为 true 表示该边需要被包含到待抽取的子图中,反之则表示需要被排除。
当过滤函数为缺省值时,则表示需要将所有顶点/边都包含进来。

Snapshot 类提供的其它方法请参考详细的 C++ API 文档(olap_on_db.h)。

3.2.Traversal

图数据库中十分常见的一大类分析是基于一个或多个顶点出发,逐层地拓展并访问邻居。
尽管这类分析也可以使用 Cypher 完成,但是当访问的层数较深时,其性能会受到串行解释执行的限制。
使用 C++ Core API 编写存储过程尽管避免了解释执行,但依然受限于单个线程的处理能力。
为了让用户能够方便地通过并行处理的方式加速这一类应用场景,我们基于 C++ OLAP API 封装了一个 Traversal 框架,用户可以直接使用其中的 FrontierTraversal 和 PathTraversal 类来完成这种逐层遍历的分析任务,具体的使用方法可以参考相应的 C++ API 文档(lgraph_traversal.h)。

ParallelVector<size_t> FindVertices( GraphDB & db, Transaction & txn, std::function<bool(VertexIterator &)> filter, bool parallel = false );

该方法可用于找到所有满足条件(filter 返回 true)的顶点,当 parallel 为 true 时则会并行该查找过程。

template <typename VertexData> ParallelVector<VertexData> ExtractVertexData( GraphDB & db, Transaction & txn, ParallelVector<size_t> & frontier, std::function<void(VertexIterator &, VertexData &)> extract, bool parallel = false );

该方法可用于从指定顶点集(frontier)中(通过 extract 方法)抽取(类型为 VertexData 的)属性,当 parallel 为 true 时会并行该抽取过程。

FrontierTraversal 适用于只关注遍历扩展到的顶点集的情况;当用户在遍历过程或是结果中需要访问路径上的信息(路径上的顶点/边)时,则需要使用 PathTraversal。
两类 Traversal 的构造函数均有三个参数,分别为数据库句柄 db、事务句柄 txn 和选项 flags。
选项的可选值包括以下的组合:TRAVERSAL_PARALLEL 表示遍历时使用多个线程并行;TRAVERSAL_ALLOW_REVISITS 表示遍历时允许重复地访问顶点(PathTraversal 隐含了该选项)。

void SetFrontier(size_t root_vid); void SetFrontier(ParallelVector<size_t> & root_vids); void SetFrontier(std::function<bool(VertexIterator &)> root_vertex_filter);

两类 Traversal 设置遍历的起始顶点/顶点集有上述三种方式,前两种通过顶点 ID 直接指定,最后一种方式则类似于 FindVertices。

两类 Traversal 的遍历都是从当前层的顶点集合出发,根据使用的扩展函数访问每条出边/入边/出边和入边,通过用户自定义的过滤函数决定扩展是否成功,若成功则将邻居顶点/追加了该条边的路径加入下一层的顶点/路径集合。

void ExpandOutEdges( std::function<bool(OutEdgeIterator &)> out_edge_filter = nullptr, std::function<bool(VertexIterator &)> out_neighbour_filter = nullptr ); void ExpandInEdges( std::function<bool(InEdgeIterator &)> in_edge_filter = nullptr, std::function<bool(VertexIterator &)> in_neighbour_filter = nullptr ); void ExpandEdges( std::function<bool(OutEdgeIterator &)> out_edge_filter = nullptr, std::function<bool(InEdgeIterator &)> in_edge_filter = nullptr, std::function<bool(VertexIterator &)> out_neighbour_filter = nullptr, std::function<bool(VertexIterator &)> in_neighbour_filter = nullptr );

上述为 FrontierTraversal 的三种遍历方式,即从当前的顶点集合出发,对集合中的每个顶点,依次访问每条出边/入边/出边和入边,若满足用户自定义的过滤条件(其中,edge_filter 为面向边的过滤函数,neighbour_filter 则为面向邻居顶点的过滤函数),则将邻居顶点加入新的顶点集合。

ParallelVector<size_t> & GetFrontier();

当前顶点集合的扩展结束后,新的顶点集合可以通过上述方法取得。

void ExpandOutEdges( std::function<bool(OutEdgeIterator &, Path &, IteratorHelper &)> out_edge_filter = nullptr, std::function<bool(VertexIterator &, Path &, IteratorHelper &)> out_neighbour_filter = nullptr ); void ExpandInEdges( std::function<bool(InEdgeIterator &, Path &, IteratorHelper &)> in_edge_filter = nullptr, std::function<bool(VertexIterator &, Path &, IteratorHelper &)> in_neighbour_filter = nullptr ); void ExpandEdges( std::function<bool(OutEdgeIterator &, Path &, IteratorHelper &)> out_edge_filter = nullptr, std::function<bool(InEdgeIterator &, Path &, IteratorHelper &)> in_edge_filter = nullptr, std::function<bool(VertexIterator &, Path &, IteratorHelper &)> out_neighbour_filter = nullptr, std::function<bool(VertexIterator &, Path &, IteratorHelper &)> in_neighbour_filter = nullptr );

PathTraversal 的三种遍历方式与 FrontierTraversal 类似,只是用户自定义的过滤函数中增加了两个参数,其中:Path 包含了到新扩展的这条边之前的路径,IteratorHelper 可用于将路径中的顶点/边转为数据库中对应的迭代器,相关文档可参考对应的 C++ API 文档。

4.Procedure-api 文档

TuGraph-Python-Procedure-API [文档下载]

TuGraph-CPP-Procedure-API [文档下载]

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论