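1. Overview
The APOC library provided by Galaxybase-S contains hundreds of general-purpose functions and aggregation functions for areas such as data integration, graph algorithms, and data transformation. As graph databases spread into more domains, however, the interfaces and languages that Galaxybase-S provides can no longer cover every business scenario. The driver interface also adds network overhead, and the query language adds call-chain overhead, so users cannot get the full performance of the Galaxybase-S computing framework. Galaxybase-S therefore provides PAR API, an advanced Java programming interface. It lets users build interfaces tailored to their business that make full use of the computing framework. PAR stands for Parameterized Algorithm Routine.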
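2. Usage
2.1 Preparation
A PAR API project depends on a single jar:
- graphdbapi-par-api-3.4.0.jar: provides basic graph operations, the annotation list, and more. For what is supported, see the graph-api (Graph model and data operations) and function (annotation list and variable list) documents.
2.2 Coding
Use annotations to implement custom functions and procedures that OpenCypher statements can invoke, then register them as described in 2.3. Section 3 (custom implementation demos) has examples.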
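2.3 Registration
2.3.1 In-memory compilation
Package the source code into a zip file. Include only the relevant .java files, not the whole project; the directory structure does not matter. Then upload the zip, bring it online, and use it through PAR management on port 51314.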
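The packaging step can be scripted. Below is a minimal sketch in plain Java. It assumes you want every .java file under one source directory in the archive. ParSourceZipper and zipJavaSources are hypothetical names and are not part of the PAR API. The sketch keeps paths relative to the source directory as entry names, so two classes with the same simple name in different packages do not collide.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical packaging helper (not part of the PAR API): zips every .java file
// found under sourceDir, using the path relative to sourceDir as the entry name.
final class ParSourceZipper {

    private ParSourceZipper() {
    }

    /** Writes the .java files under sourceDir into zipFile and returns the entry names written. */
    static List<String> zipJavaSources(Path sourceDir, Path zipFile) throws IOException {
        List<Path> sources;
        try (Stream<Path> walk = Files.walk(sourceDir)) {
            sources = walk.filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".java"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<String> entryNames = new ArrayList<>();
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(zipFile))) {
            for (Path source : sources) {
                // Zip entry names always use '/', whatever the platform separator is.
                String name = sourceDir.relativize(source).toString().replace('\\', '/');
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(source, zip);
                zip.closeEntry();
                entryNames.add(name);
            }
        }
        return entryNames;
    }
}
```

Non-Java files such as notes or build scripts are skipped, which matches the instruction to upload only the relevant .java files.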
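Source format rules:
- The package declaration must be on a line of its own.
- Each import must be on a line of its own; wildcard (*) imports are not allowed.
Allowed imports. Here * stands for the classes directly in that package; each class must still be imported individually: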
- java.util.*
- java.util.concurrent.*
- java.util.concurrent.atomic.*
- java.util.concurrent.locks.*
- java.util.regex.*
- java.time.*
- java.time.chrono.*
- java.util.stream.*
- java.time.format.*
- java.time.zone.*
- java.math.*
- java.nio.charset.*
- java.text.*
- java.util.function.*
- com.google.gson.*
- com.graphdbapi.*
- com.graphdbapi.procedure.*
- com.graphdbapi.values.*
- com.graphdbapi.values.api.*
- com.graphdbapi.values.exception.*
- com.graphdbapi.values.storable.*
- com.graphdbapi.values.util.*
- com.graphdbapi.values.virtual.*
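You can check the rules above locally before uploading. The sketch below is a hypothetical helper; ParImportChecker is not part of Galaxybase-S. It flags three cases:
- a package or import statement that shares its line with other code
- a wildcard import
- an import from a package outside the allowlist

The check works line by line, so it is only an approximation. The server's actual validator may treat some cases differently, for example nested-class imports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical local pre-check of the in-memory compilation rules (not the server's validator).
final class ParImportChecker {

    // The allowlist above, without the trailing ".*".
    private static final Set<String> ALLOWED_PACKAGES = new HashSet<>(Arrays.asList(
            "java.util", "java.util.concurrent", "java.util.concurrent.atomic",
            "java.util.concurrent.locks", "java.util.regex", "java.time", "java.time.chrono",
            "java.util.stream", "java.time.format", "java.time.zone", "java.math",
            "java.nio.charset", "java.text", "java.util.function", "com.google.gson",
            "com.graphdbapi", "com.graphdbapi.procedure", "com.graphdbapi.values",
            "com.graphdbapi.values.api", "com.graphdbapi.values.exception",
            "com.graphdbapi.values.storable", "com.graphdbapi.values.util",
            "com.graphdbapi.values.virtual"));

    private ParImportChecker() {
    }

    /** Returns one message per rule violation found in the given source text. */
    static List<String> violations(String source) {
        List<String> problems = new ArrayList<>();
        String[] lines = source.split("\\R");
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i].trim();
            boolean isPackage = line.startsWith("package ");
            boolean isImport = line.startsWith("import ");
            if (!isPackage && !isImport) {
                continue;
            }
            int semicolon = line.indexOf(';');
            // Anything after the first ';' means the statement shares its line with other code.
            if (semicolon >= 0 && !line.substring(semicolon + 1).trim().isEmpty()) {
                problems.add("line " + (i + 1) + ": must be on its own line");
                continue;
            }
            if (isPackage) {
                continue;
            }
            // "import ".length() == 7
            String name = (semicolon >= 0 ? line.substring(7, semicolon) : line.substring(7)).trim();
            if (name.endsWith(".*")) {
                problems.add("line " + (i + 1) + ": wildcard import " + name);
                continue;
            }
            int lastDot = name.lastIndexOf('.');
            String pkg = lastDot < 0 ? "" : name.substring(0, lastDot);
            // Nested-class imports such as java.util.Map.Entry are flagged here; the real rule may differ.
            if (!ALLOWED_PACKAGES.contains(pkg)) {
                problems.add("line " + (i + 1) + ": package not allowed: " + pkg);
            }
        }
        return problems;
    }
}
```

An empty result means the source passed these checks. Each message names the offending line so it can be fixed before the zip is built.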
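2.4 Execution
After registration, call the functions and procedures by running OpenCypher statements over Bolt. To check the registration status, add log.level=DEBUG to conf/galaxybase.properties. This enables DEBUG logging.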
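3. Custom implementation demos
3.1 Custom functions
3.1.1 Custom scalar functions
User-defined functions are called the same way as other OpenCypher functions, and each function name must be fixed. A user-defined function is annotated with @UserFunction. Unlike a procedure, which returns a stream of values, it returns a single value.
Annotations:
- Context: resource injection annotation. By default it provides com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see section 5 for usage). To use one of them, annotate the corresponding field with @Context.
- UserFunction: declares a method as a function that can be called from OpenCypher. Required. By default the function is called as <package name of the class>.<method name>. You can also set a custom name with @UserFunction("<custom name>") and call the function by that name.
- Description: a description of the function.
- Name: declares the name of an input parameter, which binds the argument value when the function is called. Required. Using the same name as the Java parameter is recommended.
Custom class: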
package par.graph;
import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.UserFunction;
public class ScalarTest {
@Context
public Graph graph;
@Context
public GraphLogger log;
@UserFunction
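public Long valueAdd(@Name("value") Long value) {
// Nodes without the 评分 property pass null; return null instead of throwing a NullPointerException
if (value == null) {
return null;
}
log.info("value add");
return value + 10;
}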
}
Execution:
StatementResult statementResult = graph.executeQuery("match (n:个人) where par.graph.valueAdd(n.评分) > 10 return n");
while (statementResult.hasNext()) {
Record next = statementResult.next();
System.out.println(next.get(0).asNode());
}
Result:
node<36700160, 个人, {评分=9}>
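3.1.2 Custom aggregation functions
User-defined aggregation functions are called the same way as other OpenCypher aggregation functions, and each function name must be fixed. A user-defined aggregation function is annotated with @UserAggregationFunction. The annotated method must return an instance of an aggregator class. That class contains one method annotated with @UserAggregationUpdate and one annotated with @UserAggregationResult. The @UserAggregationUpdate method is called repeatedly, which lets the class accumulate data. When aggregation is complete, the @UserAggregationResult method is called once and returns the aggregated result.
Annotations:
- Context: resource injection annotation. By default it provides com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see section 5 for usage). To use one of them, annotate the corresponding field with @Context.
- UserAggregationFunction: declares a method as an aggregation function that can be called from OpenCypher. Required. By default the function is called as <package name of the class>.<method name>. You can also set a custom name with @UserAggregationFunction("<custom name>") and call the function by that name.
- Description: a description of the aggregation function.
- UserAggregationResult: marks the method that returns the aggregation result.
- UserAggregationUpdate: marks the method that performs each aggregation step.
- Name: declares the name of an input parameter, which binds the argument value when the function is called. Required. Using the same name as the Java parameter is recommended.
Custom class: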
package par.graph;
import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.values.api.Node;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.UserAggregationFunction;
import com.graphdbapi.procedure.UserAggregationResult;
import com.graphdbapi.procedure.UserAggregationUpdate;
import com.graphdbapi.procedure.UserFunction;
public class AggregatingTest {
@Context
public Graph graph;
@Context
public GraphLogger log;
@UserAggregationFunction
public LongStringAggregator longestString() {
return new LongStringAggregator();
}
public static class LongStringAggregator {
private int longest;
private String longestString;
@UserAggregationUpdate
public void findLongest(@Name("string") String string) {
if (string != null && string.length() > longest) {
longest = string.length();
longestString = string;
}
}
@UserAggregationResult
public String result() {
return longestString;
}
}
}
Execution:
StatementResult statementResult = graph.executeQuery("match (n) return par.graph.longestString(n.姓名)");
while (statementResult.hasNext()) {
Record next = statementResult.next();
System.out.println(next.get(0).asString());
}
Result:
郝痴海
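You can see the update/result contract without the PAR runtime. The sketch below copies the logic of LongStringAggregator and drives it with a hypothetical loop that stands in for the engine:
- one aggregator instance is created
- the update method is called once per input value
- the result method is called once at the end

This document does not say how Galaxybase-S actually schedules these calls (for example, per group), so treat the driver loop as an assumption.

```java
import java.util.List;

// Plain-Java illustration of the @UserAggregationUpdate / @UserAggregationResult contract.
// The driver loop in aggregate() is a hypothetical stand-in for the engine.
final class AggregatorLifecycleDemo {

    private AggregatorLifecycleDemo() {
    }

    // Same logic as LongStringAggregator above, without the PAR annotations.
    static final class LongStringAggregator {
        private int longest;
        private String longestString;

        // Plays the role of the @UserAggregationUpdate method: called once per input value.
        void findLongest(String string) {
            if (string != null && string.length() > longest) {
                longest = string.length();
                longestString = string;
            }
        }

        // Plays the role of the @UserAggregationResult method: called once when aggregation ends.
        String result() {
            return longestString;
        }
    }

    static String aggregate(List<String> values) {
        // What longestString() returns: a fresh aggregator instance.
        LongStringAggregator aggregator = new LongStringAggregator();
        for (String value : values) {
            aggregator.findLongest(value);
        }
        return aggregator.result();
    }
}
```

For example, aggregate(Arrays.asList("Ann", null, "Bartholomew", "Bo")) returns "Bartholomew". The comparison is strict, so when several strings are equally long, the first one wins. An empty input yields null.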
3.1.3 Supported single-value types
- String
- Long
- Double
- Boolean
- java.time.LocalDate
- java.time.OffsetTime
- java.time.LocalTime
- java.time.ZonedDateTime
- java.time.LocalDateTime
- com.graphdbapi.values.api.Node
- com.graphdbapi.values.api.Relationship
- com.graphdbapi.values.api.Path
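3.2 Custom procedures
A procedure is a method declared with @Procedure, which makes it callable from OpenCypher. The @Procedure annotation is required. Its value sets the name used to call the procedure. If omitted, the name defaults to <package name>.<method name>, but setting it explicitly is recommended. @Description describes the procedure.
Annotations:
- Context: resource injection annotation. By default it provides com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see section 5 for usage). To use one of them, annotate the corresponding field with @Context.
- Procedure: declares a method as a procedure that can be called from OpenCypher. Required. By default the procedure is called as <package name of the class>.<method name>. You can also set a custom name with @Procedure("<custom name>") and call the procedure by that name.
- Description: a description of the procedure.
- Name: declares the name of an input parameter, which binds the argument value when the procedure is called. Required. Using the same name as the Java parameter is recommended.
Custom class: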
package par.graph;
import java.util.stream.Stream;
import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Description;
import com.graphdbapi.procedure.Procedure;
public class Test {
@Context
public Graph graph;
@Context
public GraphLogger log;
@Procedure("par.graph.count")
@Description("count")
public Stream<SizeCount> count() {
return Stream.of(new SizeCount(graph.getAllVertexCount()));
}
}
Custom return object:
package par.graph;
public class SizeCount {
public Long out;
public SizeCount(Long out) {
this.out = out;
}
}
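The execution example below reads the column out, which matches the public field name of SizeCount. The following sketch illustrates that correspondence; it is not the engine's actual code. It uses reflection to turn each element of a procedure's Stream into a row keyed by the record's public instance field names. RecordRows is a hypothetical name, and the nested SizeCount is a copy of the class above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration only: reads each record in a procedure's Stream<T> as one row whose
// column names are T's public instance field names. Not the engine's real code.
final class RecordRows {

    private RecordRows() {
    }

    // Copy of the SizeCount return class above, used for the demo.
    static final class SizeCount {
        public Long out;

        SizeCount(Long out) {
            this.out = out;
        }
    }

    static List<Map<String, Object>> toRows(Stream<?> records) {
        return records.map(RecordRows::toRow).collect(Collectors.toList());
    }

    static Map<String, Object> toRow(Object record) {
        Map<String, Object> row = new LinkedHashMap<>();
        // getFields() returns public fields only; the JDK does not guarantee their order.
        for (Field field : record.getClass().getFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            try {
                row.put(field.getName(), field.get(record));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + field.getName(), e);
            }
        }
        return row;
    }
}
```

With this reading, Stream.of(new SizeCount(4500L)) becomes one row {out=4500}, which lines up with next.get("out") in the execution example.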
3.2.1 Calling the custom procedure
Execution:
StatementResult statementResult = graph.executeQuery("call par.graph.count()");
while (statementResult.hasNext()) {
Record next = statementResult.next();
System.out.println(next.get("out").asLong());
}
Result:
4500
3.2.2 Supported return value types
- boolean
- long
- byte[]
- com.graphdbapi.values.storable.CypherBoolean
- com.graphdbapi.values.storable.CypherDate
- com.graphdbapi.values.storable.CypherDouble
- com.graphdbapi.values.storable.CypherDuration
- com.graphdbapi.values.storable.CypherFloat
- com.graphdbapi.values.storable.CypherInt
- com.graphdbapi.values.storable.CypherLong
- com.graphdbapi.values.storable.CypherNumber
- com.graphdbapi.values.storable.CypherText
- com.graphdbapi.values.virtual.CypherList
- com.graphdbapi.values.virtual.CypherMap
- com.graphdbapi.values.virtual.CypherPath
- com.graphdbapi.values.virtual.CypherNode:可以通过com.graphdbapi.values.util.ConvertUtil的nodeValue()将Vertex转成该对象传输
- com.graphdbapi.values.virtual.CypherRelationship:可以通过com.graphdbapi.values.util.ConvertUtil的relationshipValue()将Edge转成该对象传输
- double
- java.lang.Boolean
- java.lang.Double
- java.lang.Long
- java.lang.Number
- java.lang.Object
- java.lang.String
- java.time.LocalDate
- java.time.LocalDateTime
- java.time.LocalTime
- java.time.OffsetTime
- java.time.ZonedDateTime
- java.time.temporal.TemporalAmount
- java.util.List
- java.util.Map
Defining a return class that uses many of the supported types:
public class ValueResult {
public boolean b1;
public byte[] b2;
public CypherBoolean b3;
public CypherDate b4;
public CypherDouble b5;
public CypherDuration b6;
public CypherFloat b7;
public CypherInt b8;
public CypherLong b9;
public CypherNumber b10;
public CypherText x1;
public CypherList x2;
public CypherMap x3;
public CypherNode x4;
public CypherRelationship x5;
public Boolean a1;
public Double a2;
public Long a3;
public Number a4;
public Object a5;
public String a6;
public LocalDate a7;
public LocalDateTime a8;
public LocalTime a9;
public OffsetTime a10;
public ZonedDateTime a11;
public ValueResult(CypherRelationship relationshipValue) {
b1 = true;
b2 = new byte[]{(byte)1, (byte)2};
b3 = CypherValues.booleanValue(true);
b4 = CypherDate.date(LocalDate.of(2010, 10, 10));
b5 = CypherValues.doubleValue(0.1);
b6 = CypherDuration.duration(10, 10, 10, 10);
b7 = CypherValues.floatValue(0.2f);
b8 = CypherValues.intValue(1);
b9 = CypherValues.longValue(2L);
b10 = CypherValues.numberValue((short) 10);
x1 = CypherValues.stringValue("test");
x2 = CypherVirtuals.list(b8, b8);
x3 = CypherVirtuals.map(Collections.singletonMap("String", b10));
x4 = CypherVirtuals.nodeValue(0, CypherValues.stringValue("test"), x3);
x5 = relationshipValue;
a1 = false;
a2 = 1.1;
a3 = 1L;
a4 = 1;
a5 = "d";
a6 = "test";
a7 = LocalDate.of(2010, 10, 10);
a8 = LocalDateTime.now();
a9 = LocalTime.now();
a10 = OffsetTime.now();
a11 = ZonedDateTime.now();
}
}
Executing and reading the values:
while (statementResult.hasNext()) {
Record next = statementResult.next();
System.out.println(next.get("b1").asBoolean());
System.out.println(Arrays.toString(next.get("b2").asByteArray()));
System.out.println(next.get("b3").asBoolean());
System.out.println(next.get("b4").asLocalDate().getYear());
System.out.println(next.get("b5").asDouble());
System.out.println(next.get("b6").asIsoDuration());
System.out.println(next.get("b7").asFloat());
System.out.println(next.get("b8").asInt());
System.out.println(next.get("b9").asLong());
System.out.println(next.get("b10").asNumber().shortValue());
System.out.println(next.get("x1").asString());
System.out.println(next.get("x2").asList());
System.out.println(next.get("x3").asMap());
System.out.println(next.get("x4").asNode());
System.out.println(next.get("x5").asRelationship());
System.out.println(next.get("a1").asBoolean());
System.out.println(next.get("a2").asDouble());
System.out.println(next.get("a3").asLong());
System.out.println(next.get("a4").asNumber());
System.out.println(next.get("a5").asObject());
System.out.println(next.get("a6").asString());
System.out.println(next.get("a7").asLocalDate());
System.out.println(next.get("a8").asLocalDateTime());
System.out.println(next.get("a9").asLocalTime());
System.out.println(next.get("a10").asOffsetTime());
System.out.println(next.get("a11").asZonedDateTime());
}
4. Managing PAR
4.1 Queries
4.1.1 Listing custom functions
Syntax: call dbms.functions()
Returns: the list of functions
Execution:
StatementResult statement = graph.executeQuery("call dbms.functions()");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "par.graph.size", signature: "par.graph.size(value :: INTEGER?) :: (INTEGER?)", description: ""}>
...
4.1.2 Listing custom aggregation functions
Syntax: call dbms.aggregationFunctions()
Returns: the list of aggregation functions
Execution:
StatementResult statement = graph.executeQuery("call dbms.aggregationFunctions()");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "par.graph.longestString", signature: "par.graph.longestString(string :: STRING?) :: (STRING?)", description: ""}>
...
4.1.3 Listing custom procedures
Syntax: call dbms.procedures()
Returns: the list of procedures
Execution:
StatementResult statement = graph.executeQuery("call dbms.procedures()");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "Aggregate", signature: "Aggregate(graphName :: STRING?, property :: STRING?, vertexTypes :: LIST? OF STRING?, edgeTypes :: LIST? OF STRING?, functions = [] :: LIST? OF STRING?) :: (value :: STRING?)", description: "execute Aggregate."}>
...
4.2 Removal
4.2.1 Removing a custom function
Syntax: call remove.function("function name")
Returns: the removed function
Execution:
StatementResult statement = graph.executeQuery("call remove.function('par.graph.size')");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "par.graph.size", signature: "par.graph.size(value :: INTEGER?) :: (INTEGER?)", description: ""}>
4.2.2 Removing a custom aggregation function
Syntax: call remove.aggregationFunction("aggregation function name")
Returns: the removed aggregation function
Execution:
StatementResult statement = graph.executeQuery("call remove.aggregationFunction('par.graph.longestString')");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "par.graph.longestString", signature: "par.graph.longestString(string :: STRING?) :: (STRING?)", description: ""}>
4.2.3 Removing a custom procedure
Syntax: call remove.procedure("procedure name")
Returns: the removed procedure
Execution:
StatementResult statement = graph.executeQuery("call remove.procedure('par.graph.count')");
while (statement.hasNext()) {
Record next = statement.next();
System.out.println(next);
}
Result:
Record<{name: "par.graph.count", signature: "par.graph.count()", description: ""}>
5. File operations
5.1 Usage
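Inject com.graphdbapi.file.FileObject into a field with the @Context resource annotation. You can then operate on files by passing their paths to it (see the javadoc for the full interface).
Only files under pars/work/ can be accessed, and paths are given relative to that directory. For example, to access pars/work/test/test.txt, pass the path test/test.txt.
The supporting classes are com.graphdbapi.file.BufferedReaderObject, com.graphdbapi.file.BufferedWriterObject, and com.graphdbapi.file.GraphDbFileIOException.
BufferedReaderObject and BufferedWriterObject instances are obtained from the corresponding FileObject methods, which take the file path and the charset. GraphDbFileIOException is the exception thrown on IO errors.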
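The path rule above can be illustrated with java.nio.file. The sketch below is hypothetical: ParWorkPaths is not part of the PAR API, and the sketch assumes pars/work is resolved against the current working directory. It does two things:
- converts a full path such as pars/work/test/test.txt into the relative form test/test.txt
- rejects relative paths that would escape pars/work/

FileObject performs its own resolution, so this only shows the expected mapping.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper illustrating the pars/work/ rule; FileObject does its own resolution.
// Assumption: pars/work is resolved against the current working directory.
final class ParWorkPaths {

    static final Path WORK_ROOT = Paths.get("pars", "work").toAbsolutePath().normalize();

    private ParWorkPaths() {
    }

    /** Maps a FileObject-style relative path (e.g. "test/test.txt") to its location under pars/work/. */
    static Path resolve(String relativePath) {
        Path candidate = WORK_ROOT.resolve(relativePath).normalize();
        // Reject absolute paths and "../" sequences that leave the work directory.
        if (!candidate.startsWith(WORK_ROOT)) {
            throw new IllegalArgumentException("path is outside pars/work/: " + relativePath);
        }
        return candidate;
    }

    /** Turns a full path such as "pars/work/test/test.txt" into the form FileObject expects ("test/test.txt"). */
    static String toRelative(String fullPath) {
        Path full = Paths.get(fullPath).toAbsolutePath().normalize();
        if (!full.startsWith(WORK_ROOT)) {
            throw new IllegalArgumentException("path is outside pars/work/: " + fullPath);
        }
        return WORK_ROOT.relativize(full).toString().replace('\\', '/');
    }
}
```

normalize() removes "." and ".." segments before the prefix check, which is what makes a path like ../secret.txt fail the check instead of slipping through.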
5.2 Example
package temp;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import com.graphdbapi.file.BufferedReaderObject;
import com.graphdbapi.file.BufferedWriterObject;
import com.graphdbapi.file.FileObject;
import com.graphdbapi.file.GraphDbFileIOException;
import com.graphdbapi.file.RandomAccessFileObject;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Description;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.Procedure;
public class TestDemo {
@Context
public FileObject fileObject;
@Procedure(name = "temp.show.all.file")
@Description("List all files in the given directory")
public Stream<OutPut> list(@Name("path") String path) throws GraphDbFileIOException {
List<String> list = fileObject.list(path);
return Stream.of(new OutPut(list));
}
@Procedure(name = "temp.demo.file.isExist")
@Description("Check whether the given path exists")
public Stream<OutPut> isExist(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.isExist(path)));
}
@Procedure(name = "temp.demo.file.isDir")
@Description(value = "Check whether the given path is a directory")
public Stream<OutPut> isDir(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.isDir(path)));
}
@Procedure(name = "temp.demo.file.createFile")
public Stream<OutPut> createFile(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.createFile(path)));
}
@Procedure(name = "temp.demo.file.createDir")
public Stream<OutPut> createDir(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.createDir(path)));
}
@Procedure(name = "temp.demo.file.delete")
public Stream<OutPut> delete(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.delete(path)));
}
@Procedure(name = "temp.demo.file.deleteForce")
public Stream<OutPut> deleteForce(@Name("path") String path) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.deleteForce(path)));
}
@Procedure(name = "temp.demo.file.copy")
public Stream<OutPut> copy(@Name("path") String path, @Name("newPath") String newPath) throws GraphDbFileIOException {
return Stream.of(new OutPut(fileObject.copy(path, newPath)));
}
@Procedure(name = "temp.demo.file.bufferedReader")
public Stream<OutPut> bufferedReader( @Name("path") String path) throws GraphDbFileIOException {
BufferedReaderObject reader = fileObject.bufferedReader(path, StandardCharsets.UTF_8);
String s = reader.readLine();
return Stream.of(new OutPut(s));
}
@Procedure(name = "temp.demo.file.bufferedWriter")
public Stream<OutPut> bufferedWriter(@Name("path") String path, @Name("append") boolean append, @Name("txt") String txt) throws GraphDbFileIOException {
try {
BufferedWriterObject writer = fileObject.bufferedWriter(path, append, StandardCharsets.UTF_8);
writer.write(txt);
writer.flush();
try {
writer.close();
} catch (Exception e) {
throw new GraphDbFileIOException(e);
}
return Stream.of(new OutPut(true));
} catch (GraphDbFileIOException e) {
return Stream.of(new OutPut(false));
}
}
public static class OutPut{
public String s;
public boolean b;
public double d;
public List<String> f;
public OutPut(String s) {
this.s = s;
}
public OutPut(double d) {
this.d = d;
}
public OutPut(boolean b) {
this.b = b;
}
public OutPut(List<String> f) {
this.f = f;
}
}
}
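6. PAR stop state
PAR provides an interface for reading the execution state of an OpenCypher statement in real time. This state can be changed from outside the program, so users can implement stopping for their own PAR programs. The program checks the state periodically and exits once the state is "stopped".
- Ways to switch an OpenCypher statement to the stopped state:
  - Call the stopCypher interface of the Bolt API.
  - Click the Stop button in the OpenCypher statement management window of the cluster UI or the graph visualization UI.
- Example
PAR registration code: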
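// Requires: import java.util.concurrent.TimeUnit;
@Context
public Graph graph;
@Procedure(name = "demo.parStop.test")
@Description("PAR stop task test")
public void test() {
int i = 0;
while (i++ < 100) {
// isStop() returns true when the OpenCypher statement is in the stopped state, false while it is running
if (graph.isStop()) {
return;
}
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
} catch (InterruptedException e) {
// Restore the interrupt flag and end the procedure
Thread.currentThread().interrupt();
return;
}
}
}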
Execution via Bolt:
- Start the task
graph.executeQuery("call demo.parStop.test").list();
- Stop the task
graph.stopCypher(1111111111111L);
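You can try the polling pattern from the example outside the database. In the sketch below, StopFlag and runSteps are hypothetical names, not PAR API:
- StopFlag, backed by an AtomicBoolean, stands in for graph.isStop().
- Calling stop() from another thread plays the role of stopCypher or the Stop button.

The loop checks the flag before each unit of work, so it stops within one step of the flag being set.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java stand-in for the isStop() polling pattern; StopFlag and runSteps are
// hypothetical names, not PAR API.
final class CooperativeStopDemo {

    private CooperativeStopDemo() {
    }

    // Plays the role of the OpenCypher execution state that graph.isStop() reports.
    static final class StopFlag {
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        boolean isStop() {
            return stopped.get();
        }

        // Plays the role of stopCypher or the Stop button.
        void stop() {
            stopped.set(true);
        }
    }

    /** Runs up to maxSteps units of work, checking the flag before each one; returns the steps completed. */
    static int runSteps(StopFlag flag, int maxSteps, long stepMillis) throws InterruptedException {
        int completed = 0;
        while (completed < maxSteps) {
            if (flag.isStop()) {
                break;
            }
            TimeUnit.MILLISECONDS.sleep(stepMillis); // stands in for one unit of real work
            completed++;
        }
        return completed;
    }
}
```

The check sits at the top of the loop, so a long step delays the stop by at most one step. Shorter units of work make the program react to stopCypher sooner.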



