
导读:Spark除了批处理和流处理,还提供了GraphX组件提供图计算。近些年,图计算越来越受到数据分析人员的青睐。图计算目前广泛应用于公安系统和银行金融领域。通过社交网络分析,可以打击犯罪团伙,金融欺诈、信用卡盗刷等。通过人与人之间的关联关系推断,还可以用于理财产品推荐等场景。
图算法
常见的图算法大致可以分为路径搜索算法(例如DFS & BFS、最短路径、 最小生成树、随机游走等)、中心性算法(例如DegreeCentrality、 ClosenessCentrality、BetweennessCentrality、PageRank) 以及社群发现算法(例如 MeasuringAlgorithm、ComponentsAlgorithm、LabelPropagation Algorithm、LouvainModularity Algorithm)。
路径搜索算法建立在图搜索算法的基础上,用来探索节点之间的路径。这些路径从一个节点开始,遍历关系,直到到达目的地。路径搜索算法可以用来进行物流规划,最低成本呼叫或者叫IP路由问题等。
中心性算法用于识别图中特定节点的角色及其对网络的影响。中心性算法能够帮助我们识别最重要的节点,帮助我们了解组动态,例如可信度、可访问性、事物传播的速度以及组与组之间的连接。
社群的形成在各种类型的网络中都很常见。识别社群对于评估群体行为或突发事件至关重要。对于一个社群来说,内部节点与内部节点的关系(边)比社群外部节点的关系更多。识别这些社群可以揭示节点的分群,找到孤立的社群,发现整体网络结构关系。社群发现算法有助于发现社群中群体行为或者偏好,寻找嵌套关系,或者成为其他分析的前序步骤。社群发现算法也常用于网络可视化。
GraphX实战

对于上图,我们要找出5号节点与各个节点的最短路,可以在Spark的GraphX帮助下利用最短路算法来实现。
import org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionobject GraphXTest {def main(args: Array[String]) {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val spark = SparkSession.builder().appName("WordCount").master("local").getOrCreate()val sc = spark.sparkContextval vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(5L, 2L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)println("找出5到各顶点的最短路:")val sourceId: VertexId = 5Lval initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)((id, dist, newDist) => math.min(dist, newDist),triplet => {if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))} else {Iterator.empty}},(a,b) => math.min(a,b))println(sssp.vertices.collect.mkString("\n"))sc.stop()}}
运行结果

总结
本案例只是对GraphX的基本图算法实现进行了演示,更多的图算法实现都可以参照这个流程来实现,用你的智慧去尽情地发掘图网络中的价值吧~
本次的实例代码也已同步至Gitee,欢迎下载调试。
https://gitee.com/doubledue/sparktest

●Spark训练营(一)-- 开发环境搭建及wordCount实战
文章都看完了
不点个
吗
欢迎 点赞、在看、分享 三连哦~~





