COMBINATORIAL_BLAS  1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Graph500.cpp
Go to the documentation of this file.
1 #define DETERMINISTIC
2 #include "../CombBLAS.h"
3 #include <mpi.h>
4 #include <sys/time.h>
5 #include <iostream>
6 #include <functional>
7 #include <algorithm>
8 #include <vector>
9 #include <string>
10 #include <sstream>
11 #ifdef THREADED
12  #ifndef _OPENMP
13  #define _OPENMP
14  #endif
15 #endif
16 
17 #ifdef _OPENMP
18  #include <omp.h>
19 #endif
20 
26 #ifdef _OPENMP
27 int cblas_splits = omp_get_max_threads();
28 #else
29 int cblas_splits = 1;
30 #endif
31 
32 #define ITERS 16
33 #define EDGEFACTOR 16
34 using namespace std;
35 
36 // 64-bit floor(log2(x)) function
37 // note: least significant bit is the "zeroth" bit
38 // pre: v > 0
39 unsigned int highestbitset(uint64_t v)
40 {
41  // b in binary is {10,1100, 11110000, 1111111100000000 ...}
42  const uint64_t b[] = {0x2ULL, 0xCULL, 0xF0ULL, 0xFF00ULL, 0xFFFF0000ULL, 0xFFFFFFFF00000000ULL};
43  const unsigned int S[] = {1, 2, 4, 8, 16, 32};
44  int i;
45 
46  unsigned int r = 0; // result of log2(v) will go here
47  for (i = 5; i >= 0; i--)
48  {
49  if (v & b[i]) // highestbitset is on the left half (i.e. v > S[i] for sure)
50  {
51  v >>= S[i];
52  r |= S[i];
53  }
54  }
55  return r;
56 }
57 
58 template <class T>
59 bool from_string(T & t, const string& s, std::ios_base& (*f)(std::ios_base&))
60 {
61  istringstream iss(s);
62  return !(iss >> f >> t).fail();
63 }
64 
65 
66 template <typename PARMAT>
67 void Symmetricize(PARMAT & A)
68 {
69  // boolean addition is practically a "logical or"
70  // therefore this doesn't destruct any links
71  PARMAT AT = A;
72  AT.Transpose();
73  A += AT;
74 }
75 
80 struct prunediscovered: public std::binary_function<int64_t, int64_t, int64_t >
81 {
82  int64_t operator()(int64_t x, const int64_t & y) const
83  {
84  return ( y == -1 ) ? x: -1;
85  }
86 };
87 
88 int main(int argc, char* argv[])
89 {
90  int nprocs, myrank;
91  MPI_Init(&argc, &argv);
92  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
93  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
94  if(argc < 3)
95  {
96  if(myrank == 0)
97  {
98  cout << "Usage: ./Graph500 <Force,Input> <Scale Forced | Input Name> {FastGen}" << endl;
99  cout << "Example: ./Graph500 Force 25 FastGen" << endl;
100  }
101  MPI_Finalize();
102  return -1;
103  }
104  {
107  typedef SpParMat < int64_t, bool, SpDCCols<int32_t,bool> > PSpMat_s32p64; // sequentially use 32-bits for local matrices, but parallel semantics are 64-bits
108  typedef SpParMat < int64_t, int, SpDCCols<int32_t,int> > PSpMat_s32p64_Int; // similarly mixed, but holds integers as upposed to booleans
110 
111  // Declare objects
112  PSpMat_Bool A;
113  PSpMat_s32p64 Aeff;
114  FullyDistVec<int64_t, int64_t> degrees; // degrees of vertices (including multi-edges and self-loops)
115  FullyDistVec<int64_t, int64_t> nonisov; // id's of non-isolated (connected) vertices
116  unsigned scale;
117  OptBuf<int32_t, int64_t> optbuf; // let indices be 32-bits
118  bool scramble = false;
119 
120  if(string(argv[1]) == string("Input")) // input option
121  {
122  A.ReadDistribute(string(argv[2]), 0); // read it from file
123  SpParHelper::Print("Read input\n");
124 
125  PSpMat_Int64 * G = new PSpMat_Int64(A);
126  G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0)); // identity is 0
127  delete G;
128 
129  Symmetricize(A); // A += A';
131  A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0)); // plus<int64_t> matches the type of the output vector
132  nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0)); // only the indices of non-isolated vertices
133  delete ColSums;
134  A = A(nonisov, nonisov);
135  Aeff = PSpMat_s32p64(A);
136  A.FreeMemory();
137  SpParHelper::Print("Symmetricized and pruned\n");
138 
139  Aeff.OptimizeForGraph500(optbuf); // Should be called before threading is activated
140  #ifdef THREADED
141  ostringstream tinfo;
142  tinfo << "Threading activated with " << cblas_splits << " threads" << endl;
143  SpParHelper::Print(tinfo.str());
144  Aeff.ActivateThreading(cblas_splits);
145  #endif
146  }
147  else if(string(argv[1]) == string("Binary"))
148  {
149  uint64_t n, m;
150  from_string(n,string(argv[3]),std::dec);
151  from_string(m,string(argv[4]),std::dec);
152 
153  ostringstream outs;
154  outs << "Reading " << argv[2] << " with " << n << " vertices and " << m << " edges" << endl;
155  SpParHelper::Print(outs.str());
156  DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>(argv[2], n, m);
157  SpParHelper::Print("Read binary input to distributed edge list\n");
158 
159  PermEdges(*DEL);
160  SpParHelper::Print("Permuted Edges\n");
161 
162  RenameVertices(*DEL);
163  //DEL->Dump32bit("graph_permuted");
164  SpParHelper::Print("Renamed Vertices\n");
165 
166  // conversion from distributed edge list, keeps self-loops, sums duplicates
167  PSpMat_Int64 * G = new PSpMat_Int64(*DEL, false);
168  delete DEL; // free memory before symmetricizing
169  SpParHelper::Print("Created Int64 Sparse Matrix\n");
170 
171  G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0)); // Identity is 0
172 
173  A = PSpMat_Bool(*G); // Convert to Boolean
174  delete G;
175  int64_t removed = A.RemoveLoops();
176 
177  ostringstream loopinfo;
178  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
179  SpParHelper::Print(loopinfo.str());
180  A.PrintInfo();
181 
184  A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0));
185  A.Reduce(*RowSums, Row, plus<int64_t>(), static_cast<int64_t>(0));
186  ColSums->EWiseApply(*RowSums, plus<int64_t>());
187  delete RowSums;
188 
189  nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0)); // only the indices of non-isolated vertices
190  delete ColSums;
191 
192  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
193  nonisov.RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
194  A.PrintInfo();
195  #ifndef NOPERMUTE
196  A(nonisov, nonisov, true); // in-place permute to save memory
197  SpParHelper::Print("Dropped isolated vertices from input\n");
198  A.PrintInfo();
199  #endif
200  Aeff = PSpMat_s32p64(A); // Convert to 32-bit local integers
201  A.FreeMemory();
202 
203  Symmetricize(Aeff); // A += A';
204  SpParHelper::Print("Symmetricized\n");
205  //A.Dump("graph_symmetric");
206 
207  Aeff.OptimizeForGraph500(optbuf); // Should be called before threading is activated
208  #ifdef THREADED
209  ostringstream tinfo;
210  tinfo << "Threading activated with " << cblas_splits << " threads" << endl;
211  SpParHelper::Print(tinfo.str());
212  Aeff.ActivateThreading(cblas_splits);
213  #endif
214  }
215  else
216  {
217  if(string(argv[1]) == string("Force"))
218  {
219  scale = static_cast<unsigned>(atoi(argv[2]));
220  ostringstream outs;
221  outs << "Forcing scale to : " << scale << endl;
222  SpParHelper::Print(outs.str());
223 
224  if(argc > 3 && string(argv[3]) == string("FastGen"))
225  {
226  SpParHelper::Print("Using fast vertex permutations; skipping edge permutations (like v2.1)\n");
227  scramble = true;
228  }
229  }
230  else
231  {
232  SpParHelper::Print("Unknown option\n");
233  MPI_Finalize();
234  return -1;
235  }
236  // this is an undirected graph, so A*x does indeed BFS
237  double initiator[4] = {.57, .19, .19, .05};
238 
239  double t01 = MPI_Wtime();
240  double t02;
242  if(!scramble)
243  {
244  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR);
245  SpParHelper::Print("Generated edge lists\n");
246  t02 = MPI_Wtime();
247  ostringstream tinfo;
248  tinfo << "Generation took " << t02-t01 << " seconds" << endl;
249  SpParHelper::Print(tinfo.str());
250 
251  PermEdges(*DEL);
252  SpParHelper::Print("Permuted Edges\n");
253  //DEL->Dump64bit("edges_permuted");
254  //SpParHelper::Print("Dumped\n");
255 
256  RenameVertices(*DEL); // intermediate: generates RandPerm vector, using MemoryEfficientPSort
257  SpParHelper::Print("Renamed Vertices\n");
258  }
259  else // fast generation
260  {
261  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
262  SpParHelper::Print("Generated renamed edge lists\n");
263  t02 = MPI_Wtime();
264  ostringstream tinfo;
265  tinfo << "Generation took " << t02-t01 << " seconds" << endl;
266  SpParHelper::Print(tinfo.str());
267  }
268 
269  // Start Kernel #1
270  MPI_Barrier(MPI_COMM_WORLD);
271  double t1 = MPI_Wtime();
272 
273  // conversion from distributed edge list, keeps self-loops, sums duplicates
274  PSpMat_s32p64_Int * G = new PSpMat_s32p64_Int(*DEL, false);
275  delete DEL; // free memory before symmetricizing
276  SpParHelper::Print("Created Sparse Matrix (with int32 local indices and values)\n");
277 
278  MPI_Barrier(MPI_COMM_WORLD);
279  double redts = MPI_Wtime();
280  G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0)); // Identity is 0
281  MPI_Barrier(MPI_COMM_WORLD);
282  double redtf = MPI_Wtime();
283 
284  ostringstream redtimeinfo;
285  redtimeinfo << "Calculated degrees in " << redtf-redts << " seconds" << endl;
286  SpParHelper::Print(redtimeinfo.str());
287  A = PSpMat_Bool(*G); // Convert to Boolean
288  delete G;
289  int64_t removed = A.RemoveLoops();
290 
291  ostringstream loopinfo;
292  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
293  SpParHelper::Print(loopinfo.str());
294  A.PrintInfo();
295 
296  FullyDistVec<int64_t, int64_t> * ColSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
297  FullyDistVec<int64_t, int64_t> * RowSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
298  A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0));
299  A.Reduce(*RowSums, Row, plus<int64_t>(), static_cast<int64_t>(0));
300  SpParHelper::Print("Reductions done\n");
301  ColSums->EWiseApply(*RowSums, plus<int64_t>());
302  delete RowSums;
303  SpParHelper::Print("Intersection of colsums and rowsums found\n");
304 
305  // TODO: seg fault in FindInds for scale 33
306  nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0)); // only the indices of non-isolated vertices
307  delete ColSums;
308 
309  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
310  nonisov.RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
311  A.PrintInfo();
312  #ifndef NOPERMUTE
313  A(nonisov, nonisov, true); // in-place permute to save memory
314  SpParHelper::Print("Dropped isolated vertices from input\n");
315  A.PrintInfo();
316  #endif
317 
318  Aeff = PSpMat_s32p64(A); // Convert to 32-bit local integers
319  A.FreeMemory();
320  Symmetricize(Aeff); // A += A';
321  SpParHelper::Print("Symmetricized\n");
322 
323  Aeff.OptimizeForGraph500(optbuf); // Should be called before threading is activated
324  #ifdef THREADED
325  ostringstream tinfo;
326  tinfo << "Threading activated with " << cblas_splits << " threads" << endl;
327  SpParHelper::Print(tinfo.str());
328  Aeff.ActivateThreading(cblas_splits);
329  #endif
330  Aeff.PrintInfo();
331 
332  MPI_Barrier(MPI_COMM_WORLD);
333  double t2=MPI_Wtime();
334 
335  ostringstream k1timeinfo;
336  k1timeinfo << (t2-t1) - (redtf-redts) << " seconds elapsed for Kernel #1" << endl;
337  SpParHelper::Print(k1timeinfo.str());
338  }
339  Aeff.PrintInfo();
340  float balance = Aeff.LoadImbalance();
341  ostringstream outs;
342  outs << "Load balance: " << balance << endl;
343  SpParHelper::Print(outs.str());
344 
345  MPI_Barrier(MPI_COMM_WORLD);
346  double t1 = MPI_Wtime();
347 
348  // Now that every remaining vertex is non-isolated, randomly pick ITERS many of them as starting vertices
349  #ifndef NOPERMUTE
350  degrees = degrees(nonisov); // fix the degrees array too
351  degrees.PrintInfo("Degrees array");
352  #endif
353  // degrees.DebugPrint();
355  double nver = (double) degrees.TotalLength();
356 
357 #ifdef DETERMINISTIC
358  MTRand M(1);
359 #else
360  MTRand M; // generate random numbers with Mersenne Twister
361 #endif
362 
363  vector<double> loccands(ITERS);
364  vector<int64_t> loccandints(ITERS);
365  if(myrank == 0)
366  {
367  for(int i=0; i<ITERS; ++i)
368  loccands[i] = M.rand();
369  copy(loccands.begin(), loccands.end(), ostream_iterator<double>(cout," ")); cout << endl;
370  transform(loccands.begin(), loccands.end(), loccands.begin(), bind2nd( multiplies<double>(), nver ));
371 
372  for(int i=0; i<ITERS; ++i)
373  loccandints[i] = static_cast<int64_t>(loccands[i]);
374  copy(loccandints.begin(), loccandints.end(), ostream_iterator<double>(cout," ")); cout << endl;
375  }
376  MPI_Bcast(&(loccandints[0]), ITERS, MPIType<int64_t>(),0,MPI_COMM_WORLD);
377  for(int i=0; i<ITERS; ++i)
378  {
379  Cands.SetElement(i,loccandints[i]);
380  }
381 
382  #define MAXTRIALS 1
383  for(int trials =0; trials < MAXTRIALS; trials++) // try different algorithms for BFS
384  {
386  cblas_alltoalltime = 0;
388  cblas_transvectime = 0;
390  MPI_Pcontrol(1,"BFS");
391 
392  double MTEPS[ITERS]; double INVMTEPS[ITERS]; double TIMES[ITERS]; double EDGES[ITERS];
393  for(int i=0; i<ITERS; ++i)
394  {
395  // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
396  FullyDistVec<int64_t, int64_t> parents ( Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1); // identity is -1
397 
398  // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
399  FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol()); // numerical values are stored 0-based
400 
401  MPI_Barrier(MPI_COMM_WORLD);
402  double t1 = MPI_Wtime();
403 
404  fringe.SetElement(Cands[i], Cands[i]);
405  int iterations = 0;
406  while(fringe.getnnz() > 0)
407  {
408  fringe.setNumToInd();
409  fringe = SpMV(Aeff, fringe,optbuf); // SpMV with sparse vector (with indexisvalue flag preset), optimization enabled
410  fringe = EWiseMult(fringe, parents, true, (int64_t) -1); // clean-up vertices that already has parents
411  parents.Set(fringe);
412  iterations++;
413  }
414  MPI_Barrier(MPI_COMM_WORLD);
415  double t2 = MPI_Wtime();
416 
417  FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
418  parentsp.Apply(myset<int64_t>(1));
419 
420  // we use degrees on the directed graph, so that we don't count the reverse edges in the teps score
421  int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);
422 
423  ostringstream outnew;
424  outnew << i << "th starting vertex was " << Cands[i] << endl;
425  outnew << "Number iterations: " << iterations << endl;
426  outnew << "Number of vertices found: " << parentsp.Reduce(plus<int64_t>(), (int64_t) 0) << endl;
427  outnew << "Number of edges traversed: " << nedges << endl;
428  outnew << "BFS time: " << t2-t1 << " seconds" << endl;
429  outnew << "MTEPS: " << static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
430  outnew << "Total communication (average so far): " << (cblas_allgathertime + cblas_alltoalltime) / (i+1) << endl;
431  TIMES[i] = t2-t1;
432  EDGES[i] = nedges;
433  MTEPS[i] = static_cast<double>(nedges) / (t2-t1) / 1000000.0;
434  SpParHelper::Print(outnew.str());
435  }
436  SpParHelper::Print("Finished\n");
437  ostringstream os;
438  MPI_Pcontrol(-1,"BFS");
439 
440 
441  os << "Per iteration communication times: " << endl;
442  os << "AllGatherv: " << cblas_allgathertime / ITERS << endl;
443  os << "AlltoAllv: " << cblas_alltoalltime / ITERS << endl;
444  os << "Transvec: " << cblas_transvectime / ITERS << endl;
445 
446  os << "Per iteration computation times: " << endl;
447  os << "MergeCont: " << cblas_mergeconttime / ITERS << endl;
448  os << "LocalSpmv: " << cblas_localspmvtime / ITERS << endl;
449 
450  sort(EDGES, EDGES+ITERS);
451  os << "--------------------------" << endl;
452  os << "Min nedges: " << EDGES[0] << endl;
453  os << "First Quartile nedges: " << (EDGES[(ITERS/4)-1] + EDGES[ITERS/4])/2 << endl;
454  os << "Median nedges: " << (EDGES[(ITERS/2)-1] + EDGES[ITERS/2])/2 << endl;
455  os << "Third Quartile nedges: " << (EDGES[(3*ITERS/4) -1 ] + EDGES[3*ITERS/4])/2 << endl;
456  os << "Max nedges: " << EDGES[ITERS-1] << endl;
457  double mean = accumulate( EDGES, EDGES+ITERS, 0.0 )/ ITERS;
458  vector<double> zero_mean(ITERS); // find distances to the mean
459  transform(EDGES, EDGES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
460  // self inner-product is sum of sum of squares
461  double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
462  deviation = sqrt( deviation / (ITERS-1) );
463  os << "Mean nedges: " << mean << endl;
464  os << "STDDEV nedges: " << deviation << endl;
465  os << "--------------------------" << endl;
466 
467  sort(TIMES,TIMES+ITERS);
468  os << "Min time: " << TIMES[0] << " seconds" << endl;
469  os << "First Quartile time: " << (TIMES[(ITERS/4)-1] + TIMES[ITERS/4])/2 << " seconds" << endl;
470  os << "Median time: " << (TIMES[(ITERS/2)-1] + TIMES[ITERS/2])/2 << " seconds" << endl;
471  os << "Third Quartile time: " << (TIMES[(3*ITERS/4)-1] + TIMES[3*ITERS/4])/2 << " seconds" << endl;
472  os << "Max time: " << TIMES[ITERS-1] << " seconds" << endl;
473  mean = accumulate( TIMES, TIMES+ITERS, 0.0 )/ ITERS;
474  transform(TIMES, TIMES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
475  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
476  deviation = sqrt( deviation / (ITERS-1) );
477  os << "Mean time: " << mean << " seconds" << endl;
478  os << "STDDEV time: " << deviation << " seconds" << endl;
479  os << "--------------------------" << endl;
480 
481  sort(MTEPS, MTEPS+ITERS);
482  os << "Min MTEPS: " << MTEPS[0] << endl;
483  os << "First Quartile MTEPS: " << (MTEPS[(ITERS/4)-1] + MTEPS[ITERS/4])/2 << endl;
484  os << "Median MTEPS: " << (MTEPS[(ITERS/2)-1] + MTEPS[ITERS/2])/2 << endl;
485  os << "Third Quartile MTEPS: " << (MTEPS[(3*ITERS/4)-1] + MTEPS[3*ITERS/4])/2 << endl;
486  os << "Max MTEPS: " << MTEPS[ITERS-1] << endl;
487  transform(MTEPS, MTEPS+ITERS, INVMTEPS, safemultinv<double>()); // returns inf for zero teps
488  double hteps = static_cast<double>(ITERS) / accumulate(INVMTEPS, INVMTEPS+ITERS, 0.0);
489  os << "Harmonic mean of MTEPS: " << hteps << endl;
490  transform(INVMTEPS, INVMTEPS+ITERS, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
491  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
492  deviation = sqrt( deviation / (ITERS-1) ) * (hteps*hteps); // harmonic_std_dev
493  os << "Harmonic standard deviation of MTEPS: " << deviation << endl;
494  SpParHelper::Print(os.str());
495  }
496  }
497  MPI_Finalize();
498  return 0;
499 }
500