I am trying to run a Pregel operation on a small weighted undirected graph to find the shortest path from node 1 to all other nodes. However, I get the following output and errors:
Create Labels Vertices and edgesCreate RDD for vertices and edges
Create Graph from vertices and edges
(4,1.7976931348623157E308)
(1,0.0)
(5,1.7976931348623157E308)
(6,1.7976931348623157E308)
(2,1.7976931348623157E308)
(3,1.7976931348623157E308)
(7,1.7976931348623157E308)
Edge(1,3,3.6)
Edge(1,5,4.24)
Edge(1,2,5.0)
Edge(2,3,4.24)
Edge(2,4,5.1)
Edge(3,4,4.47)
Edge(3,5,5.0)
Edge(4,6,6.32)
Edge(4,6,5.0)
Edge(4,7,3.16)
Edge(6,7,5.1)
Run pregel over our graph with apply, scatter and gather functions
22/05/23 20:30:55 WARN BlockManager: Putting block rdd_49_0 failed due to exception java.lang.NullPointerException: Cannot invoke "java.lang.Double.equals(Object)" because "message" is null.
22/05/23 20:30:55 WARN BlockManager: Block rdd_49_0 could not be removed as it was not found on disk or in memory
22/05/23 20:30:55 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 52)
java.lang.NullPointerException: Cannot invoke "java.lang.Double.equals(Object)" because "message" is null
at com.aavash.ann.pregel.VprogD.apply(VprogD.java:14)
at com.aavash.ann.pregel.VprogD.apply(VprogD.java:1)
at org.apache.spark.graphx.GraphOps.$anonfun$joinVertices$1(GraphOps.scala:227)
at org.apache.spark.graphx.GraphOps.$anonfun$joinVertices$1$adapted(GraphOps.scala:225)
at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:139)
at org.apache.spark.graphx.impl.VertexRDDImpl.$anonfun$leftZipJoin$1(VertexRDDImpl.scala:159)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
22/05/23 20:30:56 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 52) (mainMaster executor driver): java.lang.NullPointerException: Cannot invoke "java.lang.Double.equals(Object)" because "message" is null
        ... (stack trace identical to the one above)
22/05/23 20:30:56 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 52) (mainMaster executor driver): java.lang.NullPointerException: Cannot invoke "java.lang.Double.equals(Object)" because "message" is null
        ... (stack trace identical to the one above)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2291)
at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:161)
at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:370)
at com.aavash.ann.sparkann.shortestPathDoubleWeight.main(shortestPathDoubleWeight.java:85)
Caused by: java.lang.NullPointerException
        ... (stack trace identical to the one above)
Here is my code, along with the classes that the Pregel call uses. The NullPointerException points at VprogD.java:14, which is the message.equals(INITIAL_VALUE) check in the vertex program below:
- shortestPathDoubleWeight.java
public class shortestPathDoubleWeight implements Serializable {
    private static final Object INITIAL_VALUE = Double.MAX_VALUE;
    private static final double DMAX_VAL = Double.MAX_VALUE;

    public static void main(String[] args) throws IOException {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("ShortestPath");
        JavaSparkContext jsp = new JavaSparkContext(conf);
        String nodeDatasetFile = "Dataset/TinygraphNodes.txt";
        String edgeDataSetFile = "Dataset/TinygraphEdges.txt";

        // Load vertex labels, initial vertex values and weighted edges from the dataset files
        System.err.print("Create Labels Vertices and edges");
        Map<Long, Integer> labels = UtilitiesMgmt.readTextNodeReturnImtmap(nodeDatasetFile);
        List<Tuple2<Object, Double>> nodes = UtilitiesMgmt.getMapKeysCreateList(labels);
        List<Edge<Double>> edges = UtilitiesMgmt.readTextEdgeFileD(edgeDataSetFile);

        System.err.println("Create RDD for vertices and edges");
        JavaRDD<Tuple2<Object, Double>> nodesRDD = jsp.parallelize(nodes);
        JavaRDD<Edge<Double>> connectingEdgesRDD = jsp.parallelize(edges);

        // Vertex attribute = current distance from the source, edge attribute = edge weight
        Graph<Double, Double> graph = Graph.apply(nodesRDD.rdd(), connectingEdgesRDD.rdd(), 0.0,
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
                scala.reflect.ClassTag$.MODULE$.apply(Double.class),
                scala.reflect.ClassTag$.MODULE$.apply(Double.class));
        System.err.println("Create Graph from vertices and edges");
        graph.vertices().toJavaRDD().collect().forEach(System.out::println);
        graph.edges().toJavaRDD().collect().forEach(System.out::println);

        GraphOps ops = new GraphOps(graph, scala.reflect.ClassTag$.MODULE$.apply(Double.class),
                scala.reflect.ClassTag$.MODULE$.apply(Double.class));
        int srcLabel = labels.get(1L);

        System.err.println("Run pregel over our graph with apply, scatter and gather functions");
        System.out.println();
        // Pregel arguments: initial message, max iterations, edge direction, vprog, sendMsg, mergeMsg
        JavaRDD<Tuple2<Object, Double>> output_rdd = ops.pregel(Double.MAX_VALUE, Int.MaxValue(), EdgeDirection.Out(),
                new VprogD(), new sendMsgD(), new mergeD(), scala.reflect.ClassTag$.MODULE$.apply(Double.class))
                .vertices().toJavaRDD();

        // Print the minimum cost from the source to every vertex
        output_rdd.sortBy(f -> ((Tuple2<Object, Double>) f)._1, true, 0).foreach(v -> {
            Tuple2<Object, Double> vertex = (Tuple2<Object, Double>) v;
            Long vertexId = (Long) vertex._1;
            Double cost = (Double) vertex._2;
            int descLabel = labels.get(vertexId);
            System.out.println("Minimum cost to get from '" + srcLabel + "' to '" + descLabel + "' is " + cost);
        });
    }
}
- VprogD.java
public class VprogD extends AbstractFunction3<Long, Double, Double, Double> implements Serializable {
    static final Double INITIAL_VALUE = Double.MAX_VALUE;

    @Override
    public Double apply(Long vertexId, Double vertexValue, Double message) {
        // On the initial message keep the current value; otherwise keep the smaller distance
        if (message.equals(INITIAL_VALUE)) {
            return vertexValue;
        } else {
            return Math.min(vertexValue, message);
        }
    }
}
- sendMsgD.java
public class sendMsgD extends AbstractFunction1<EdgeTriplet<Double, Double>, Iterator<Tuple2<Object, Double>>>
        implements Serializable {
    static final Double INITIAL_VALUE = Double.MAX_VALUE;

    @Override
    public Iterator<Tuple2<Object, Double>> apply(EdgeTriplet<Double, Double> triplet) {
        Long srcId = triplet.srcId();
        Long dstId = triplet.dstId();
        Double weight = triplet.attr();
        Double srcVertex = triplet.srcAttr();
        Double descVertex = triplet.dstAttr();
        if (srcVertex.equals(INITIAL_VALUE)) {
            // The source end has not been reached yet, so there is no distance to offer
            return JavaConverters.asScalaIteratorConverter(new ArrayList<Tuple2<Object, Double>>().iterator())
                    .asScala();
        } else {
            // Offer the destination a candidate distance through this edge
            double value_to_send = srcVertex + weight;
            return JavaConverters
                    .asScalaIteratorConverter(
                            Arrays.asList(new Tuple2<Object, Double>(triplet.dstId(), value_to_send)).iterator())
                    .asScala();
        }
    }
}
- mergeD.java
public class mergeD extends AbstractFunction2<Double, Double, Double> implements Serializable {
    @Override
    public Double apply(Double msg1, Double msg2) {
        // TODO Auto-generated method stub
        return null;
    }
}
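Looking at the trace again, I suspect this return null is the culprit: whenever a vertex receives more than one message in a superstep, mergeD combines them into null, and that null is then handed to VprogD as the message the NullPointerException complains about. A minimal sketch of what I assume the merge should do instead (keep the smaller candidate distance, as in the usual shortest-path formulation):

import java.io.Serializable;
import scala.runtime.AbstractFunction2;

public class mergeD extends AbstractFunction2<Double, Double, Double> implements Serializable {
    @Override
    public Double apply(Double msg1, Double msg2) {
        // Assumption: competing candidate distances are combined by keeping the smaller one
        return Math.min(msg1, msg2);
    }
}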
Also, is there a way to run this in a loop where each iteration changes the source vertex and recomputes its shortest paths to all other nodes? Something like the sketch below is what I have in mind.
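This is untested, and it assumes I can simply rebuild the vertex RDD with 0.0 at the new source and Double.MAX_VALUE everywhere else, reusing jsp, labels, connectingEdgesRDD and the three Pregel classes from above:

// Untested sketch: re-run the same Pregel job once per source vertex
for (Long source : labels.keySet()) {
    // Re-initialize vertex values: 0.0 at the current source, "infinity" everywhere else
    List<Tuple2<Object, Double>> initNodes = new ArrayList<>();
    for (Long id : labels.keySet()) {
        initNodes.add(new Tuple2<Object, Double>(id, id.equals(source) ? 0.0 : Double.MAX_VALUE));
    }
    Graph<Double, Double> g = Graph.apply(jsp.parallelize(initNodes).rdd(), connectingEdgesRDD.rdd(), 0.0,
            StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
            scala.reflect.ClassTag$.MODULE$.apply(Double.class),
            scala.reflect.ClassTag$.MODULE$.apply(Double.class));
    GraphOps gops = new GraphOps(g, scala.reflect.ClassTag$.MODULE$.apply(Double.class),
            scala.reflect.ClassTag$.MODULE$.apply(Double.class));
    // Same Pregel call as before, just on a freshly initialized graph
    JavaRDD<Tuple2<Object, Double>> distances = gops.pregel(Double.MAX_VALUE, Int.MaxValue(), EdgeDirection.Out(),
            new VprogD(), new sendMsgD(), new mergeD(), scala.reflect.ClassTag$.MODULE$.apply(Double.class))
            .vertices().toJavaRDD();
    distances.collect().forEach(t -> System.out.println("from " + source + ": " + t));
}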