前言
图分析是数据科学中的一个重要领域,其专注于通过图结构来表示数据,并执行各种计算和分析任务。图结构由节点(或称为顶点)和边组成,节点通常代表实体,边表示实体之间的关系。图计算广泛应用于社交网络分析、推荐系统、知识图谱、路径优化等多个领域。
PolarDB PostgreSQL版(下文简称为 PolarDB)是一款阿里云自主研发的云原生关系型数据库产品,100% 兼容 PostgreSQL,高度兼容Oracle语法(公有云版支持Oracle语法);采用基于 Shared-Storage 的存储计算分离架构,具有极致弹性、毫秒级延迟、HTAP 的能力和高可靠、高可用、弹性扩展等企业级数据库特性。同时,PolarDB 具有大规模并行计算能力,可以应对OLTP与OLAP混合负载。
本文介绍的图分析能力,依托阿里云云原生关系型数据库PolarDB PostgreSQL版建设输出。
业务场景
背景信息
保险理赔欺诈一般根据保险提供商所具备的患者、疾病和索赔等数据,分析与被保人相关的理赔单、疾病等实体的关联关系,识别异常理赔记录,发现欺诈团伙。
数据和模型
数据来源自以保险领域公开数据集。
数据包括保险行业基本元素,可以将数据模型抽象为下图:
- 点:投保人(policyholder)、保单(incharge)、理赔(claim)、病人(patient)、疾病(disease)。
- 边:疾病-病人(has_disease)、投保人-理赔(policyholder_of_claim)、保单-理赔(inchagre_of_claim)、病人-理赔(insured_of_claim)、相似理赔(similar_claim)、投保人关联(policyholder_connection)。
- 属性:姓名(name),是否高危(high_risk)、风险分数(risk_score)、疾病名称(disease_name)、相似度(similarity_score)、关联等级(level)、理赔时间(claim_date)、保额(charge)等。

部分数据如下所示:
- POLICYHOLDER(投保人)
"POLICYHOLDER_ID","FNAME","LNAME","RISK_SCORE","HIGH_RISK"
PH3068,ADAM,OCHSENBEIN,88,1
PH3069,MALINDA,MEHSERLE,42,0
PH3070,SANDRA,KUHTA,20,0
PH3071,DORA,TAHU,62,1
...
- CLAIM(理赔)
"CLAIM_ID","CHARGE","CLAIM_DATE","DURATION","INSURED_ID","DIAGNOSIS","PERSON_INCHARGE_ID","TYPE","POLICYHOLDER_ID"
C3571,6517.53,2013-08-11 00:00:00,13,"28523",no exception,PI23070,services,PH9507
C3572,49273.65,2017-02-10 00:00:00,3,"1220",no exception,PI21197,services,PH406
C3573,52005.98,2014-06-29 00:00:00,27,"23735",no exception,PI22361,services,PH7911
- POLICYHOLDER 和 CLAIM的关联关系
"CLAIM_ID","POLICYHOLDER_ID"
C1528,PH2963
C1529,PH1353
C1530,PH1071
最佳实践
技术实现
PolarDB 图数据库引擎AGE(A Graph Extension) 是一个为 PostgreSQL系列数据库打造的扩展,旨在增强其处理图数据的能力。AGE 旨在结合关系型数据库与图数据库的优势,提供一个高性能、灵活且易于扩展的解决方案。
AGE主要包含以下特点
- 完全兼容PostgreSQL
AGE 是PolarDB PostgreSQL版的一个扩展,这意味着可以在现有的PolarDB数据库中使用,而无需重新构建数据库。AGE 继承了 PolarDB 的所有强大功能,包括事务、并发控制、以及多种索引和优化技术。
- 统一的图形和关系型查询
AGE 允许同时处理关系型数据和图形数据,可以在同一个查询中混合使用 SQL 和图查询语言。这使得处理复杂的数据模型更加容易和高效。
- 支持 Cypher 查询语言
AGE 支持使用 Cypher 查询语言,这是一种专为图数据库设计的查询语言,语法简单且灵活。为用户提供了一种直观的方式来进行图数据的查询和操作。
- 高性能
通过结合 PolarDB 的优化技术和专为图数据设计的索引,AGE 能够高效地处理大规模图形数据和复杂的图形查询。
如上所述, 借助于AGE强大的能力,PolarDB可以简单、高效地处理各类图查询。

建议配置
为了得到良好的体验,建议使用以下配置:
| 项目 | 推荐配置 |
|---|---|
| PolarDB 版本 | 标准版 兼容PostgreSQL 14 |
| CPU | >16 Core |
| 内存 | >64 GB |
| 磁盘 | >100GB (AUTOPL) |
| 版本 | >2.0.14.23.1 |
数据库准备
数据库中需要预先创建age扩展
create extension age;
在每个会话执行时,需要设置search_path并执行SQL来加载扩展:
SET search_path = ag_catalog, "$user", public;
select * from get_cypher_keywords() limit 0;
如不想在每个会话中设置search_path,可对数据库进行该项操作
ALTER DATABASE xxx
SET search_path = ag_catalog, "$user", public;
数据入库
创建图
使用函数create_graph 可以创建一个图,创建时需要指定图的名称。
SELECT create_graph('graph');
插入节点和边
由于下载的数据为csv文件,不包含所需的id信息,因此需要将数据进行转换后进行入库操作。
本文附录中提供了将数据转换为PolarDB中vertex和edge的的python脚本,转换后如下所示:
- POLICYHOLDER(投保人)
SELECT create_vlabel('graph','policyholder');
SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3068',fname:'ADAM',lname:'OCHSENBEIN',risk_score:'88',high_risk:'1'}) $$ ) as (n agtype);
SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3069',fname:'MALINDA',lname:'MEHSERLE',risk_score:'42',high_risk:'0'}) $$ ) as (n agtype);
SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3070',fname:'SANDRA',lname:'KUHTA',risk_score:'20',high_risk:'0'}) $$ ) as (n agtype);
...
- CLAIM(理赔)
- Create vlabel
SELECT create_vlabel('graph','claim');
SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3571',charge:'6517.53',claim_date:'2013-08-11 00:00:00',duration:'13',insured_id:'28523',diagnosis:'no exception',person_incharge_id:'PI23070',type:'services',policyholder_id:'PH9507'}) $$ ) as (n agtype);
SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3572',charge:'49273.65',claim_date:'2017-02-10 00:00:00',duration:'3',insured_id:'1220',diagnosis:'no exception',person_incharge_id:'PI21197',type:'services',policyholder_id:'PH406'}) $$ ) as (n agtype);
SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3573',charge:'52005.98',claim_date:'2014-06-29 00:00:00',duration:'27',insured_id:'23735',diagnosis:'no exception',person_incharge_id:'PI22361',type:'services',policyholder_id:'PH7911'}) $$ ) as (n agtype);
...
- POLICYHOLDER 和 CLAIM的关联关系
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1528' AND b.policyholder_id = 'PH2963' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype);
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1529' AND b.policyholder_id = 'PH1353' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype);
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1530' AND b.policyholder_id = 'PH1071' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype);
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1531' AND b.policyholder_id = 'PH8102' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype);
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1532' AND b.policyholder_id = 'PH4768' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype);
...
将转换后的结果保存为sql文件,配合客户端工具,如psql等可完成数据导入。
使用示例
简单查询
数据统计
- 各种类型节点数量
SELECT count(*) FROM cypher('graph', $$
MATCH (v)
RETURN v
$$) as (v agtype);
count
--------
120567
-- claim
SELECT count(*) FROM cypher('graph', $$
MATCH (v:claim)
RETURN v
$$) as (v agtype);
count
--------
100001
-- policyholder
SELECT count(*) FROM cypher('graph', $$
MATCH (v:policyholder)
RETURN v
$$) as (v agtype);
count
-------
10006
-- incharge
SELECT count(*) FROM cypher('graph', $$
MATCH (v:incharge)
RETURN v
$$) as (v agtype);
count
-------
10001
--disease
SELECT count(*) FROM cypher('graph', $$
MATCH (v:disease)
RETURN v
$$) as (v agtype);
count
-------
393
--patient
SELECT count(*) FROM cypher('graph', $$
MATCH (v:patient)
RETURN v
$$) as (v agtype);
count
-------
166
过滤查询、排序查询
- 查询理赔单C4377的投保、理赔、被保情况
SELECT 'policyholder_id' as type, policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(policyholder:policyholder)
RETURN policyholder.policyholder_id
$$) as (policyholder_id agtype)
UNION
SELECT 'incharge_id', incharge_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:incharge)
RETURN v.incharge_id
$$) as (incharge_id agtype)
UNION
SELECT 'patient_id', patient_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:patient)
RETURN v.patient_id
$$) as (patient_id agtype);
type | policyholder_id
-----------------+-----------------
patient_id | "11279"
policyholder_id | "PH3759"
incharge_id | "PI26607"
通用场景
K阶邻居
- 已知保单C4377为欺诈保单,查询和理赔单C4377有相同理赔病人的理赔单,说明该理赔人有涉嫌骗保的嫌疑。
SELECT 'claim_id', claim_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(p:patient)<-[]-(c:claim)
RETURN c.claim_id
$$) as (claim_id agtype);
?column? | claim_id
----------+----------
claim_id | "C28963"
claim_id | "C3679"
claim_id | "C96545"
claim_id | "C26586"
claim_id | "C26754"
claim_id | "C87278"
claim_id | "C87603"
claim_id | "C69395"
claim_id | "C67594"
claim_id | "C96155"
claim_id | "C10160"
- 查询已知欺诈保单C4377的投保人的社交关系,可以对这些人的理赔情况提前预警。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(a:policyholder)-[r*1..3]->(p:policyholder)
RETURN p.policyholder_id
$$) as (policyholder_id agtype);
?column? | policyholder_id
-----------------+-----------------
policyholder_id | "PH52532"
policyholder_id | "PH11283"
policyholder_id | "PH11328"
policyholder_id | "PH1"
policyholder_id | "PH5"
policyholder_id | "PH512"
policyholder_id | "PH1569"
policyholder_id | "PH4722"
policyholder_id | "PH4731"
路径检索
- 查询投保人PH3759和投保人PH4722的路径,分析投保人之间的关联关系。
SELECT *
FROM cypher('graph', $$
MATCH path = (:policyholder {policyholder_id: 'PH3759'})-[r*1..3]->(:policyholder {policyholder_id: 'PH4722'})
RETURN path
$$) AS (v agtype);
-------
[{"id": 844424930136988, "label": "policyholder", "properties": {"fname": "KURTIS", "lname": "ALKEMA", "high_risk": "1", "risk_score": "78", "policyholder_id": "PH3759"}}::vertex, {"id": 2251799813685487, "label": "RELTYPE", "end_id": 844424930133473, "start_id": 844424930136988, "properties": {"level": "65"}}::edge, {"id": 844424930133473, "label": "policyholder", "properties": {"fname": "TERRA", "lname": "SWARB", "high_risk": "0", "risk_score": "25", "policyholder_id": "PH512"}}::vertex, {"id": 2251799813685546, "label": "RELTYPE", "end_id": 844424930138502, "start_id": 844424930133473, "properties": {"level": "62"}}::edge, {"id": 844424930138502, "label": "policyholder", "properties": {"fname": "VETA", "lname": "SEDLACK", "high_risk": "0", "risk_score": "31", "policyholder_id": "PH1569"}}::vertex, {"id": 2251799813685594, "label": "RELTYPE", "end_id": 844424930136281, "start_id": 844424930138502, "properties": {"level": "92"}}::edge, {"id": 844424930136281, "label": "policyholder", "properties": {"fname": "DEANNA", "lname": "BALSER", "high_risk": "0", "risk_score": "36", "policyholder_id": "PH4722"}}::vertex]::path
共同邻居
- 查询保单C4377和保单C67594的共同邻居,从而找到两个保单的共同投保人。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(:claim {claim_id: 'C67594'})
RETURN p.policyholder_id
$$) as (policyholder_id agtype);
?column? | policyholder_id
-----------------+-----------------
policyholder_id | "PH3759"
协同推荐
- 已知保单C4377为欺诈保单,查找和保单C4377有共同投保人的保单,从而找到欺诈疑似涉诈保单。
SELECT 'claim_id', claim_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(c:claim)
RETURN c.claim_id
$$) as (claim_id agtype);
?column? | claim_id
----------+----------
claim_id | "C28963"
claim_id | "C96545"
claim_id | "C3679"
claim_id | "C87603"
claim_id | "C26754"
claim_id | "C26586"
claim_id | "C87278"
claim_id | "C69395"
claim_id | "C67594"
claim_id | "C96155"
claim_id | "C10160"
- 与已知涉诈保单 C4377 相似度最大的保单,返回前20个:
with t as (
SELECT claim_id, replace(trim(both '"' from to_jsonb(properties)::text), '\"', '"') as similarity FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[e]->(c:claim)
RETURN properties(e), c.claim_id
$$) as (properties agtype, claim_id agtype)
)
SELECT claim_id, replace((similarity::jsonb->'similarity_score')::text, '"','')::integer as s
from t
ORDER BY s DESC
LIMIT 20;
claim_id | s
----------+----
"C67594" | 13
"C69395" | 13
"C10160" | 13
"C87603" | 13
"C28963" | 13
"C3679" | 13
"C26754" | 13
"C96155" | 13
"C26586" | 13
"C87278" | 13
"C96545" | 13
"C20113" | 8
"C70759" | 8
"C28785" | 8
"C12793" | 8
"C59736" | 8
"C38059" | 8
"C34068" | 8
"C71827" | 8
"C15760" | 8
总结
本文介绍了如何利用阿里云云原生关系型数据库PolarDB PostgreSQL版的图分析能力来进行图数据分析。
PolarDB结合AGE扩展,提供了图数据计算分析的功能, 包括使用Cypher查询语言,高效处理查询图数据。本文以公开的保险数据集为例,示例了在保险理赔场景下,执行图查询来发现异常理赔记录和欺诈团伙:例如,查询与欺诈保单有相同理赔病人的其他保单,或者找出欺诈保单的投保人社交关系,以便进行欺诈预警。PolarDB在关系型数据库的基础上,提供了图分析能力,为企业的统一数据管理和分析,提供了强有力的支撑。
试用体验
欢迎访问PolarDB免费试用页面,选择试用“云原生数据库PolarDB PostgreSQL版”,体验PolarDB的图计算能力
附录
数据转换脚本
import csv
import os
def convert_vertex_csv(file_path, graph):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
# create vlabel
print("------------------------------------------------")
print("-- Create vlabel")
print("SELECT create_vlabel('{}','{}');".format(graph, file_name))
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{}', $$ CREATE (:{} {}) $$ ) as (n agtype);".format(graph, file_name, p))
def convert_edge_csv(file_path, graph, from_type, to_type):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if (h.endswith("ID")):
continue;
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{0}', $$ MATCH (a:{1}), (b:{2}) WHERE a.{1}_id = '{3}' AND "
"b.{2}_id = '{4}' CREATE (a)-[e:RELTYPE {5} ]->(b) RETURN e$$) as (e agtype);".format(graph, from_type, to_type, row[0].strip(), row[1].strip(), p))
def generate_graph_csv(directory, graph):
print("------------------------------------------------")
print("-- Create graph")
print("SELECT create_graph('{}');".format(graph))
print("------------------------------------------------")
print("-- Create vertex")
convert_vertex_csv(directory + "/POLICYHOLDER.csv", graph)
convert_vertex_csv(directory + "/INCHARGE.csv", graph)
convert_vertex_csv(directory + "/PATIENT.csv", graph)
convert_vertex_csv(directory + "/CLAIM.csv", graph)
convert_vertex_csv(directory + "/DISEASE.csv", graph)
print("------------------------------------------------")
print("-- Create edge")
convert_edge_csv(directory + "/POLICYHOLDER_CONNECTION.csv", graph, 'policyholder','policyholder')
convert_edge_csv(directory + "/INCHARGE_OF_CLAIM.csv", graph,'claim', 'incharge')
convert_edge_csv(directory + "/CLAIM_SIMILARITY.csv", graph, 'claim','claim')
convert_edge_csv(directory + "/POLICYHOLDER_OF_CLAIM.csv", graph,'claim', 'policyholder')
convert_edge_csv(directory + "/INSURED_OF_CLAIM.csv", graph, 'claim','patient')
convert_edge_csv(directory + "/HAS_DISEASE.csv", graph, 'patient','disease')
generate_graph_csv("analyzing-insurance-claims-using-ibm-db2-graph-master/data", "graph")




