分享

单源最短路径 · spark

 樊清波 2016-12-22
import scala.reflect.ClassTag import org.apache.spark.graphx._ /** * Computes shortest paths to the given set of landmark vertices, returning a graph where each * vertex attribute is a map containing the shortest-path distance to each reachable landmark. */ object ShortestPaths { /** Stores a map from the vertex id of a landmark to the distance to that landmark. */ type SPMap = Map[VertexId, Int] private def makeMap(x: (VertexId, Int)*) = Map(x: _*) private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) } private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = (spmap1.keySet ++ spmap2.keySet).map { k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) }.toMap /** * Computes shortest paths to the given set of landmark vertices. * * @tparam ED the edge attribute type (not used in the computation) * * @param graph the graph for which to compute the shortest paths * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each * landmark. * * @return a graph where each vertex attribute is a map containing the shortest-path distance to * each reachable landmark vertex. */ def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = { val spGraph = graph.mapVertices { (vid, attr) => if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap() } val initialMessage = makeMap() def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { addMaps(attr, msg) } def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = { val newAttr = incrementMap(edge.dstAttr) if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr)) else Iterator.empty } Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps) } } 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多