
Galaxybase-S PAR API Usage Guide

Original article by 手机用户5009, 2022-10-10

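1. Overview

The APOC library shipped with Galaxybase-S contains hundreds of general-purpose functions and aggregate functions for data integration, graph algorithms, data transformation, and similar tasks. As graph databases are applied in more domains, however, the interfaces and query language that Galaxybase-S provides no longer cover every business scenario. In addition, the network overhead of the driver interface and the call-chain overhead of the query language prevent users from fully exploiting the performance of the Galaxybase-S computing framework. To let users build interfaces tailored to their business while taking full advantage of that framework, Galaxybase-S provides the PAR API, an advanced Java programming interface. PAR stands for Parameterized Algorithm Routine.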

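2. Usage

2.1 Preparation

A PAR API project depends on one jar:

  • graphdbapi-par-api-3.4.0.jar: provides basic graph operations, the annotation list, and so on. For what is supported, see the graph-api documentation (Graph model and data operations) and the function documentation (annotation list and variable list).

2.2 Coding

Use annotations to implement the custom functions and procedures that OpenCypher statements will call, then register them with the method described in 2.3. The custom implementation demos in Section 3 can serve as a reference.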

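2.3 Registration

2.3.1 In-memory compilation

Package the source code as a zip file (note: only the relevant .java files, not the whole project; there is no requirement on the directory structure), then upload it, bring it online, and use it through the PAR management page on port 51314.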
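The packaging step can be scripted. The sketch below is not part of the Galaxybase-S tooling: it collects every .java file under a folder into a zip using only the JDK. The class name ParSourceZipper and its methods are hypothetical, and the code assumes that entry paths inside the zip do not matter, as the note on directory structure suggests. The upload itself still goes through the PAR management page.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper: bundles the .java files of a PAR project into a zip for upload.
class ParSourceZipper {

    /** Packs every .java file under sourceDir into zipFile and returns the entry names. */
    static List<String> zipJavaSources(Path sourceDir, Path zipFile) throws IOException {
        List<Path> sources;
        try (Stream<Path> walk = Files.walk(sourceDir)) {
            sources = walk.filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".java"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<String> entryNames = new ArrayList<>();
        try (OutputStream out = Files.newOutputStream(zipFile);
             ZipOutputStream zip = new ZipOutputStream(out)) {
            for (Path source : sources) {
                // Keep the path relative to sourceDir so equal file names in
                // different folders do not collide inside the archive.
                String name = sourceDir.relativize(source).toString().replace('\\', '/');
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(source, zip);
                zip.closeEntry();
                entryNames.add(name);
            }
        }
        return entryNames;
    }

    /** Builds a throwaway source folder, zips it, and returns the entry names. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("par-src");
            Files.write(dir.resolve("ScalarTest.java"),
                    "package par.graph;\n".getBytes(StandardCharsets.UTF_8));
            Files.write(dir.resolve("notes.txt"),
                    "not a source file".getBytes(StandardCharsets.UTF_8));
            Path zip = Files.createTempFile("par-upload", ".zip");
            return zipJavaSources(dir, zip);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only files ending in .java are packed, so build output and notes that sit next to the sources stay out of the archive.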

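Source format rules:

  • The package declaration must be on its own line.

  • Each import must be on its own line; wildcard (*) imports are not allowed.

Allowed imports (* stands for the classes directly under the package):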

  • java.util.*
  • java.util.concurrent.*
  • java.util.concurrent.atomic.*
  • java.util.concurrent.locks.*
  • java.util.regex.*
  • java.time.*
  • java.time.chrono.*
  • java.util.stream.*
  • java.time.format.*
  • java.time.zone.*
  • java.math.*
  • java.nio.charset.*
  • java.text.*
  • java.util.function.*
  • com.google.gson.*
  • com.graphdbapi.*
  • com.graphdbapi.procedure.*
  • com.graphdbapi.values.*
  • com.graphdbapi.values.api.*
  • com.graphdbapi.values.exception.*
  • com.graphdbapi.values.storable.*
  • com.graphdbapi.values.util.*
  • com.graphdbapi.values.virtual.*
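A source file can be checked against these rules before it is zipped. The following is a minimal sketch of such a pre-upload check, not the validator that Galaxybase-S runs. The class name ParSourceRuleCheck, its messages, and the shortened allow list are hypothetical; copy the full list above into ALLOWED_PACKAGES before relying on it. Imports of nested classes (for example java.util.Map.Entry) and static imports are reported as problems because the sketch does not try to interpret them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-upload check for the source format rules and the import allow list.
class ParSourceRuleCheck {

    // Shortened allow list for illustration; extend it with the full list from the document.
    static final Set<String> ALLOWED_PACKAGES = new HashSet<>(Arrays.asList(
            "java.util", "java.util.stream", "java.time", "java.math",
            "com.graphdbapi", "com.graphdbapi.procedure", "com.graphdbapi.values.api"));

    private static final Pattern PACKAGE_LINE =
            Pattern.compile("^\\s*package\\s+[\\w.]+\\s*;\\s*$");
    // Group 1 is the package, group 2 the class name or "*".
    private static final Pattern IMPORT_LINE =
            Pattern.compile("^\\s*import\\s+([\\w.]+)\\.(\\w+|\\*)\\s*;\\s*$");

    /** Returns one message per rule violation; an empty list means the source passed. */
    static List<String> check(String source) {
        List<String> problems = new ArrayList<>();
        String[] lines = source.split("\\R");
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i];
            String trimmed = line.trim();
            String where = "line " + (i + 1) + ": ";
            if (trimmed.startsWith("package ")) {
                if (!PACKAGE_LINE.matcher(line).matches()) {
                    problems.add(where + "package declaration must be alone on its line");
                }
            } else if (trimmed.startsWith("import ")) {
                Matcher m = IMPORT_LINE.matcher(line);
                if (!m.matches()) {
                    problems.add(where + "expected exactly one plain import on this line");
                } else if ("*".equals(m.group(2))) {
                    problems.add(where + "wildcard imports are not allowed");
                } else if (!ALLOWED_PACKAGES.contains(m.group(1))) {
                    problems.add(where + "package " + m.group(1) + " is not in the allow list");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String source = "package par.graph;\n"
                + "import java.util.*;\n"
                + "import java.io.File;\n"
                + "public class Demo {}\n";
        check(source).forEach(System.out::println);
    }
}
```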

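2.4 Execution

Once registered, a function or procedure is called by running an OpenCypher statement over Bolt. To check the registration status, add log.level=DEBUG to conf/galaxybase.properties to enable DEBUG mode.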

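3. Custom implementation demos

3.1 Custom functions

3.1.1 Custom scalar functions

A user-defined function is called the same way as any other OpenCypher function, and its name must be fixed. User-defined functions are annotated with @UserFunction. Unlike a procedure, a function does not return a stream of values; it returns a single value.

Annotations:

  • Context: resource annotation. com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see Section 5 for usage) are provided by default. To use one of them, the field must carry this annotation.
  • UserFunction: declares the method as a function that can be called from the OpenCypher query language. By default the function is called as <package name>.<method name>; @UserFunction("custom name") sets a custom call name instead. Required.
  • Description: a description of the function.
  • Name: names an input parameter so that argument values can be bound when the function is called. Required. Using the same name as the Java parameter is recommended.

Custom class: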

package par.graph;

import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.UserFunction;

public class ScalarTest {

    @Context
    public Graph graph;

    @Context
    public GraphLogger log;

    @UserFunction
    public long valueAdd(@Name("value") Long value) {
        log.info("value add");
        return value + 10;
    }
    
}

Execution:

StatementResult statementResult = graph.executeQuery("match (n:个人) where par.graph.valueAdd(n.评分) > 10 return n");
while (statementResult.hasNext()) {
    Record next = statementResult.next();
    System.out.println(next.get(0).asNode());
}

Result:

node<36700160, 个人, {评分=9}>
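One detail worth checking in a function like valueAdd is what happens when the property is missing. The sketch below reproduces the function body without the PAR annotations so that it runs on a plain JDK. It assumes, without confirmation from the Galaxybase-S documentation, that a missing property reaches the function as null and that returning null is acceptable; under that assumption the original `value + 10` would throw a NullPointerException on unboxing. The class name NullSafeValueAdd is hypothetical.

```java
// Standalone illustration; in a PAR class the method would carry @UserFunction and @Name.
class NullSafeValueAdd {

    /** Adds 10 to the value, passing null through instead of failing on unboxing. */
    static Long valueAdd(Long value) {
        if (value == null) {
            // Assumption: a missing property arrives as null.
            return null;
        }
        return value + 10;
    }

    public static void main(String[] args) {
        System.out.println(valueAdd(9L));
        System.out.println(valueAdd(null));
    }
}
```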

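3.1.2 Custom aggregating functions

A user-defined aggregate function is called the same way as any other OpenCypher aggregate function, and its name must be fixed. It is declared with @UserAggregationFunction. The annotated method must return an instance of an aggregator class, which contains one method annotated with @UserAggregationUpdate and one annotated with @UserAggregationResult. The @UserAggregationUpdate method is called repeatedly so that the class can accumulate data; when aggregation completes, the @UserAggregationResult method is called once and returns the aggregated result.

Annotations:

  • Context: resource annotation. com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see Section 5 for usage) are provided by default. To use one of them, the field must carry this annotation.
  • UserAggregationFunction: declares the method as an aggregate function that can be called from the OpenCypher query language. By default the function is called as <package name>.<method name>; @UserAggregationFunction("custom name") sets a custom call name instead. Required.
  • Description: a description of the aggregate function.
  • UserAggregationResult: marks the method that returns the aggregation result.
  • UserAggregationUpdate: marks the method that performs the aggregation step.
  • Name: names an input parameter so that argument values can be bound when the function is called. Required. Using the same name as the Java parameter is recommended.

Custom class: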

package par.graph;

import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.UserAggregationFunction;
import com.graphdbapi.procedure.UserAggregationResult;
import com.graphdbapi.procedure.UserAggregationUpdate;

public class AggregatingTest {

    @Context
    public Graph graph;

    @Context
    public GraphLogger log;

    @UserAggregationFunction
    public LongStringAggregator longestString() {
        return new LongStringAggregator();
    }

    public static class LongStringAggregator {
        private int longest;
        private String longestString;

        @UserAggregationUpdate
        public void findLongest(@Name("string") String string) {
            if (string != null && string.length() > longest) {
                longest = string.length();
                longestString = string;
            }
        }

        @UserAggregationResult
        public String result() {
            return longestString;
        }
    }
}

Execution:

StatementResult statementResult = graph.executeQuery("match (n) return par.graph.longestString(n.姓名)");
while (statementResult.hasNext()) {
    Record next = statementResult.next();
    System.out.println(next.get(0).asString());
}

Result:

郝痴海
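The call sequence described in this section (one aggregator instance, many update calls, one result call) can be tried outside the database. The sketch below copies LongStringAggregator without its annotations and drives it by hand; the driver method aggregate and the class name AggregationLifecycleDemo are hypothetical stand-ins for what the engine does, and the assumption that the update method runs once per input row is mine rather than a statement from the Galaxybase-S documentation.

```java
import java.util.Arrays;
import java.util.List;

// Plain-JDK walk-through of the aggregator lifecycle; no PAR classes are involved.
class AggregationLifecycleDemo {

    static class LongStringAggregator {
        private int longest;
        private String longestString;

        // Corresponds to the @UserAggregationUpdate method.
        void findLongest(String string) {
            if (string != null && string.length() > longest) {
                longest = string.length();
                longestString = string;
            }
        }

        // Corresponds to the @UserAggregationResult method.
        String result() {
            return longestString;
        }
    }

    /** Plays the role of the engine: create one aggregator, update it per value, read it once. */
    static String aggregate(List<String> values) {
        LongStringAggregator aggregator = new LongStringAggregator();
        for (String value : values) {
            aggregator.findLongest(value);
        }
        return aggregator.result();
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList("al", null, "carol", "bob")));
    }
}
```

Because the comparison is strict, the first of several equally long strings wins, and an input with no non-null strings yields null.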

3.1.3 Supported single-value types

  • String
  • Long
  • Double
  • Boolean
  • java.time.LocalDate
  • java.time.OffsetTime
  • java.time.LocalTime
  • java.time.ZonedDateTime
  • java.time.LocalDateTime
  • com.graphdbapi.values.api.Node
  • com.graphdbapi.values.api.Relationship
  • com.graphdbapi.values.api.Path

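3.2 Custom procedure implementation

Declaring a method as a procedure means it can be called from the OpenCypher query language. The @Procedure annotation is required. Its value sets the name used to call the procedure; if the value is omitted, the name defaults to the package name plus the method name, so setting it explicitly is recommended. @Description describes the method.

Annotations:

  • Context: resource annotation. com.graphdbapi.Graph, com.graphdbapi.GraphLogger, and com.graphdbapi.file.FileObject (see Section 5 for usage) are provided by default. To use one of them, the field must carry this annotation.
  • Procedure: declares the method as a procedure that can be called from the OpenCypher query language. By default the procedure is called as <package name>.<method name>; @Procedure("custom name") sets a custom call name instead. Required.
  • Description: a description of the procedure.
  • Name: names an input parameter so that argument values can be bound when the procedure is called. Required. Using the same name as the Java parameter is recommended.

Custom class: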

package par.graph;

import java.util.stream.Stream;

import com.graphdbapi.GraphLogger;
import com.graphdbapi.Graph;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Description;
import com.graphdbapi.procedure.Procedure;

public class Test {

    @Context
    public Graph graph;

    @Context
    public GraphLogger log;

    @Procedure("par.graph.count")
    @Description("count")
    public Stream<SizeCount> count() {
        return Stream.of(new SizeCount(graph.getAllVertexCount()));
    }

}

Custom return value object:

package par.graph;

public class SizeCount {
    public Long out;

    public SizeCount(Long out) {
        this.out = out;
    }
}

3.2.1 Custom procedure

Execution:

StatementResult statementResult = graph.executeQuery("call par.graph.count()");

while (statementResult.hasNext()) {
    Record next = statementResult.next();
    System.out.println(next.get("out").asLong());
}

Result:

4500
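In the example above the client reads the column "out", which is the name of the public field on SizeCount. The sketch below illustrates that relationship with plain reflection: each object in the returned stream becomes one row, and each public field becomes one column. It is an illustration of the idea only, not the code Galaxybase-S uses; the class RecordMappingDemo and its methods are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration: how a stream of result objects can be viewed as rows of named columns.
class RecordMappingDemo {

    // Same shape as the SizeCount return object above.
    public static class SizeCount {
        public Long out;

        public SizeCount(Long out) {
            this.out = out;
        }
    }

    /** Turns one result object into a column-name to value map using its public fields. */
    static Map<String, Object> toRow(Object record) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Field field : record.getClass().getFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue; // only instance fields describe a row
            }
            try {
                row.put(field.getName(), field.get(record));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return row;
    }

    /** Maps every object of the stream to a row, one row per object. */
    static List<Map<String, Object>> collect(Stream<?> results) {
        return results.map(RecordMappingDemo::toRow).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collect(Stream.of(new SizeCount(4500L))));
    }
}
```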

3.2.2 Supported return value types

  • boolean
  • long
  • byte[]
  • com.graphdbapi.values.storable.CypherBoolean
  • com.graphdbapi.values.storable.CypherDate
  • com.graphdbapi.values.storable.CypherDouble
  • com.graphdbapi.values.storable.CypherDuration
  • com.graphdbapi.values.storable.CypherFloat
  • com.graphdbapi.values.storable.CypherInt
  • com.graphdbapi.values.storable.CypherLong
  • com.graphdbapi.values.storable.CypherNumber
  • com.graphdbapi.values.storable.CypherText
  • com.graphdbapi.values.virtual.CypherList
  • com.graphdbapi.values.virtual.CypherMap
  • com.graphdbapi.values.virtual.CypherPath
  • com.graphdbapi.values.virtual.CypherNode: a Vertex can be converted to this type for transfer with nodeValue() of com.graphdbapi.values.util.ConvertUtil
  • com.graphdbapi.values.virtual.CypherRelationship: an Edge can be converted to this type for transfer with relationshipValue() of com.graphdbapi.values.util.ConvertUtil
  • double
  • java.lang.Boolean
  • java.lang.Double
  • java.lang.Long
  • java.lang.Number
  • java.lang.Object
  • java.lang.String
  • java.time.LocalDate
  • java.time.LocalDateTime
  • java.time.LocalTime
  • java.time.OffsetTime
  • java.time.ZonedDateTime
  • java.time.temporal.TemporalAmount
  • java.util.List
  • java.util.Map

Defining the return value:

public class ValueResult {
    public boolean b1;
    public byte[] b2;
    public CypherBoolean b3;
    public CypherDate b4;
    public CypherDouble b5;
    public CypherDuration b6;
    public CypherFloat b7;
    public CypherInt b8;
    public CypherLong b9;
    public CypherNumber b10;
    public CypherText x1;
    public CypherList x2;
    public CypherMap x3;
    public CypherNode x4;
    public CypherRelationship x5;
    public Boolean a1;
    public Double a2;
    public Long a3;
    public Number a4;
    public Object a5;
    public String a6;
    public LocalDate a7;
    public LocalDateTime a8;
    public LocalTime a9;
    public OffsetTime a10;
    public ZonedDateTime a11;

    public ValueResult(CypherRelationship relationshipValue) {
        b1 = true;
        b2 = new byte[]{(byte)1, (byte)2};
        b3 = CypherValues.booleanValue(true);
        b4 = CypherDate.date(LocalDate.of(2010, 10, 10));
        b5 = CypherValues.doubleValue(0.1);
        b6 = CypherDuration.duration(10, 10, 10, 10);
        b7 = CypherValues.floatValue(0.2f);
        b8 = CypherValues.intValue(1);
        b9 = CypherValues.longValue(2L);
        b10 = CypherValues.numberValue((short) 10);
        x1 = CypherValues.stringValue("test");
        x2 = CypherVirtuals.list(b8, b8);
        x3 = CypherVirtuals.map(Collections.singletonMap("String", b10));
        x4 = CypherVirtuals.nodeValue(0, CypherValues.stringValue("test"), x3);
        x5 = relationshipValue;

        a1 = false;
        a2 = 1.1;
        a3 = 1L;
        a4 = 1;
        a5 = "d";
        a6 = "test";
        a7 = LocalDate.of(2010, 10, 10);
        a8 = LocalDateTime.now();
        a9 = LocalTime.now();
        a10 = OffsetTime.now();
        a11 = ZonedDateTime.now();
    }
}

Executing and reading the values:

while (statementResult.hasNext()) {
    Record next = statementResult.next();
    System.out.println(next.get("b1").asBoolean());
    System.out.println(Arrays.toString(next.get("b2").asByteArray()));
    System.out.println(next.get("b3").asBoolean());
    System.out.println(next.get("b4").asLocalDate().getYear());
    System.out.println(next.get("b5").asDouble());
    System.out.println(next.get("b6").asIsoDuration());
    System.out.println(next.get("b7").asFloat());
    System.out.println(next.get("b8").asInt());
    System.out.println(next.get("b9").asLong());
    System.out.println(next.get("b10").asNumber().shortValue());
    System.out.println(next.get("x1").asString());
    System.out.println(next.get("x2").asList());
    System.out.println(next.get("x3").asMap());
    System.out.println(next.get("x4").asNode());
    System.out.println(next.get("x5").asRelationship());

    System.out.println(next.get("a1").asBoolean());
    System.out.println(next.get("a2").asDouble());
    System.out.println(next.get("a3").asLong());
    System.out.println(next.get("a4").asNumber());
    System.out.println(next.get("a5").asObject());
    System.out.println(next.get("a6").asString());
    System.out.println(next.get("a7").asLocalDate());
    System.out.println(next.get("a8").asLocalDateTime());
    System.out.println(next.get("a9").asLocalTime());
    System.out.println(next.get("a10").asOffsetTime());
    System.out.println(next.get("a11").asZonedDateTime());
}

4. Managing PARs

4.1 Query

4.1.1 Querying custom functions

Syntax: call dbms.functions()

Returns: the list of functions

Execution:

StatementResult statement = graph.executeQuery("call dbms.functions()");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "par.graph.size", signature: "par.graph.size(value :: INTEGER?) :: (INTEGER?)", description: ""}>
...

4.1.2 Querying custom aggregate functions

Syntax: call dbms.aggregationFunctions()

Returns: the list of aggregate functions

Execution:

StatementResult statement = graph.executeQuery("call dbms.aggregationFunctions()");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "par.graph.longestString", signature: "par.graph.longestString(string :: STRING?) :: (STRING?)", description: ""}>
...

4.1.3 Querying custom procedures

Syntax: call dbms.procedures()

Returns: the list of procedures

Execution:

StatementResult statement = graph.executeQuery("call dbms.procedures()");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "Aggregate", signature: "Aggregate(graphName :: STRING?, property :: STRING?, vertexTypes :: LIST? OF STRING?, edgeTypes :: LIST? OF STRING?, functions = [] :: LIST? OF STRING?) :: (value :: STRING?)", description: "execute Aggregate."}>
...

4.2 Removal

4.2.1 Removing a custom function

Syntax: call remove.function("function name")

Returns: the removed function

Execution:

StatementResult statement = graph.executeQuery("call remove.function('par.graph.size')");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "par.graph.size", signature: "par.graph.size(value :: INTEGER?) :: (INTEGER?)", description: ""}>

4.2.2 Removing a custom aggregate function

Syntax: call remove.aggregationFunction("aggregate function name")

Returns: the removed aggregate function

Execution:

StatementResult statement = graph.executeQuery("call remove.aggregationFunction('par.graph.longestString')");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "par.graph.longestString", signature: "par.graph.longestString(string :: STRING?) :: (STRING?)", description: ""}>

4.2.3 Removing a custom procedure

Syntax: call remove.procedure("procedure name")

Returns: the removed procedure

Execution:

StatementResult statement = graph.executeQuery("call remove.procedure('par.graph.count')");
while (statement.hasNext()) {
    Record next = statement.next();
    System.out.println(next);
}

Result:

Record<{name: "par.graph.count", signature: "par.graph.count()", description: ""}>

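5. File operations

5.1 Usage

Injecting com.graphdbapi.file.FileObject with the @Context resource annotation makes it possible to operate on files at the paths passed in (see the javadoc for the available methods).

Only files under pars/work/ can be accessed, and paths are given relative to that directory: for the file pars/work/test/test.txt, the path is test/test.txt.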
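The path rule can be expressed as a small conversion. The sketch below uses java.nio.file to turn a location under pars/work/ into the relative path that a FileObject call expects, and rejects locations that resolve outside that directory. The class WorkPathDemo and the method toParPath are hypothetical, and whether FileObject itself rejects such paths is not stated in the source, so treat the check as a client-side precaution only.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: converts a location under pars/work/ into a PAR file path.
class WorkPathDemo {

    static final Path WORK_ROOT = Paths.get("pars", "work");

    /** Returns the path relative to pars/work/, or throws if the location lies outside it. */
    static String toParPath(String serverPath) {
        // normalize() collapses "." and ".." so that escapes such as
        // pars/work/../x are detected by the startsWith check below.
        Path normalized = Paths.get(serverPath).normalize();
        if (!normalized.startsWith(WORK_ROOT)) {
            throw new IllegalArgumentException("outside pars/work: " + serverPath);
        }
        return WORK_ROOT.relativize(normalized).toString().replace('\\', '/');
    }

    public static void main(String[] args) {
        System.out.println(toParPath("pars/work/test/test.txt"));
    }
}
```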

The helper classes are com.graphdbapi.file.BufferedReaderObject, com.graphdbapi.file.BufferedWriterObject, and com.graphdbapi.file.GraphDbFileIOException.

BufferedReaderObject and BufferedWriterObject instances are obtained from the corresponding FileObject methods by specifying the file path and charset. GraphDbFileIOException is the exception type thrown on IO errors.

5.2 Example

package temp;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import com.graphdbapi.file.BufferedReaderObject;
import com.graphdbapi.file.BufferedWriterObject;
import com.graphdbapi.file.FileObject;
import com.graphdbapi.file.GraphDbFileIOException;
import com.graphdbapi.file.RandomAccessFileObject;
import com.graphdbapi.procedure.Context;
import com.graphdbapi.procedure.Description;
import com.graphdbapi.procedure.Name;
import com.graphdbapi.procedure.Procedure;



public class TestDemo {

    @Context
    public FileObject fileObject;

    @Procedure(name = "temp.show.all.file")
    @Description("List all files in the given directory")
    public Stream<OutPut> list(@Name("path") String path) throws GraphDbFileIOException {
        List<String> list = fileObject.list(path);
        return Stream.of(new OutPut(list));
    }

    @Procedure(name = "temp.demo.file.isExist")
    @Description("Check whether the given path exists")
    public Stream<OutPut> isExist(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.isExist(path)));
    }

    @Procedure(name = "temp.demo.file.isDir")
    @Description(value = "Check whether the given path is a directory")
    public Stream<OutPut> isDir(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.isDir(path)));
    }

    @Procedure(name = "temp.demo.file.createFile")
    public Stream<OutPut> createFile(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.createFile(path)));
    }

    @Procedure(name = "temp.demo.file.createDir")
    public Stream<OutPut> createDir(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.createDir(path)));
    }

    @Procedure(name = "temp.demo.file.delete")
    public Stream<OutPut> delete(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.delete(path)));
    }

    @Procedure(name = "temp.demo.file.deleteForce")
    public Stream<OutPut> deleteForce(@Name("path") String path) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.deleteForce(path)));
    }

    @Procedure(name = "temp.demo.file.copy")
    public Stream<OutPut> copy(@Name("path") String path, @Name("newPath") String newPath) throws GraphDbFileIOException {
        return Stream.of(new OutPut(fileObject.copy(path, newPath)));
    }

    @Procedure(name = "temp.demo.file.bufferedReader")
    public Stream<OutPut> bufferedReader(@Name("path") String path) throws GraphDbFileIOException {
        BufferedReaderObject reader = fileObject.bufferedReader(path, StandardCharsets.UTF_8);
        String s = reader.readLine();
        return Stream.of(new OutPut(s));
    }

    @Procedure(name = "temp.demo.file.bufferedWriter")
    public Stream<OutPut> bufferedWriter(@Name("path") String path, @Name("append") boolean append, @Name("txt") String txt) throws GraphDbFileIOException {
        try {
            BufferedWriterObject writer = fileObject.bufferedWriter(path, append, StandardCharsets.UTF_8);
            writer.write(txt);
            writer.flush();
            try {
                writer.close();
            } catch (Exception e) {
                throw new GraphDbFileIOException(e);
            }
            return Stream.of(new OutPut(true));
        } catch (GraphDbFileIOException e) {
            return Stream.of(new OutPut(false));
        }
    }

    public static class OutPut{
        public String s;
        public boolean b;
        public double d;
        public List<String>  f;

        public OutPut(String s) {
            this.s = s;
        }

        public OutPut(double d) {
            this.d = d;
        }

        public OutPut(boolean b) {
            this.b = b;
        }

        public OutPut(List<String> f) {
            this.f = f;
        }
    }

}

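6. PAR stop state

PAR provides an interface for reading the execution state of an OpenCypher statement in real time. By changing that state, users can implement their own way of stopping a PAR program.

  • Ways to switch an OpenCypher statement to the stopped state:
  1. Call the stopCypher interface of the Bolt API.

  2. Click the stop button in the OpenCypher statement management window of the cluster UI or the graph display UI.

  • Example

PAR registration code: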

// Requires: import java.util.concurrent.TimeUnit;
@Context
Graph graph;

@Procedure(name = "demo.parStop.test")
@Description("PAR stop task test")
public void test() {
    int i = 0;
    while (i++ < 100) {
        // graph.isStop() reports the PAR task state: true means the OpenCypher
        // statement is in the stopped state, false means it is still running.
        if (graph.isStop()) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Bolt execution:

  • Start the task
graph.executeQuery("call demo.parStop.test").list();
  • Stop the task
graph.stopCypher(1111111111111L);
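
The stop mechanism is cooperative: nothing interrupts the procedure, so the loop has to poll the state and return on its own. The sketch below shows the same pattern on a plain JDK, with an AtomicBoolean standing in for graph.isStop() and a second thread standing in for the stopCypher call. The class CooperativeStopDemo and its method are hypothetical and do not use any Galaxybase-S API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-JDK model of a long-running task that polls a stop flag between steps.
class CooperativeStopDemo {

    /** Runs up to maxSteps steps, checking the flag before each one; returns the steps completed. */
    static int runUntilStopped(AtomicBoolean stopFlag, int maxSteps, long stepMillis) {
        int completed = 0;
        while (completed < maxSteps) {
            if (stopFlag.get()) { // stands in for graph.isStop()
                break;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(stepMillis); // stands in for one unit of work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stopFlag = new AtomicBoolean(false);
        // The second thread plays the role of the client that requests the stop.
        Thread stopper = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            stopFlag.set(true);
        });
        stopper.start();
        int steps = runUntilStopped(stopFlag, 100, 10);
        stopper.join();
        System.out.println("stopped after " + steps + " steps");
    }
}
```

The shorter each step is, the sooner a stop request takes effect, which is why the check sits inside the loop rather than before it.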