COMBINATORIAL_BLAS  1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FilteredBFS.cpp
Go to the documentation of this file.
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.3 -------------------------------------------------*/
4 /* date: 2/1/2013 ----------------------------------------------*/
5 /* authors: Aydin Buluc (abuluc@lbl.gov), Adam Lugowski --------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-, Aydin Buluc
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 #define DETERMINISTIC
30 #include "../CombBLAS.h"
31 #include <mpi.h>
32 #include <sys/time.h>
33 #include <iostream>
34 #include <functional>
35 #include <algorithm>
36 #include <vector>
37 #include <string>
38 #include <sstream>
39 #ifdef THREADED
40  #ifndef _OPENMP
41  #define _OPENMP
42  #endif
43  #include <omp.h>
44 #endif
45 #ifdef TAU_PROFILE
46  #include <Profile/Profiler.h>
47 #endif
48 
49 
52 #ifdef _OPENMP
53 int cblas_splits = omp_get_max_threads();
54 #else
55 int cblas_splits = 1;
56 #endif
57 
58 #include "TwitterEdge.h"
59 
60 #define MAX_ITERS 20000
61 #define EDGEFACTOR 16
62 #define ITERS 16
63 #define CC_LIMIT 100
64 #define PERCENTS 4 // testing with 4 different percentiles
65 #define MINRUNS 4
66 //#define ONLYTIME // don't calculate TEPS
67 
68 using namespace std;
69 
70 
71 template <typename PARMAT>
72 void Symmetricize(PARMAT & A)
73 {
74  // boolean addition is practically a "logical or"
75  // therefore this doesn't destruct any links
76  PARMAT AT = A;
77  AT.Transpose();
78  A += AT;
79 }
80 
81 #ifdef DETERMINISTIC
82  MTRand GlobalMT(1);
83 #else
85 #endif
86 struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
87 {
88  const TwitterEdge operator()(const TwitterEdge & x) const
89  {
90  short mycount = 1;
91  bool myfollow = 0;
92  time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000); // random.randrange(0,10000)
93 
94  return TwitterEdge(mycount, myfollow, mylatest);
95  }
96 };
97 
98 struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
99 {
100  bool operator()(const TwitterEdge & x, time_t sincedate) const
101  {
102  if(x.isRetwitter() && x.LastTweetBy(sincedate))
103  return false; // false if the edge is going to be kept
104  else
105  return true; // true if the edge is to be pruned
106  }
107 };
108 
109 int main(int argc, char* argv[])
110 {
111 #ifdef TAU_PROFILE
112  TAU_PROFILE_TIMER(maintimer, "main()", "int (int, char **)", TAU_DEFAULT);
113  TAU_PROFILE_INIT(argc, argv);
114  TAU_PROFILE_START(maintimer);
115 #endif
116  int nprocs, myrank;
117  MPI_Init(&argc, &argv);
118  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
119  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
120  int MAXTRIALS;
121  if(argc < 3)
122  {
123  if(myrank == 0)
124  {
125  cout << "Usage: ./FilteredBFS <File, Gen> <Input Name | Scale> (Optional: Double)" << endl;
126  cout << "Example: ./FilteredBFS File twitter_small.txt Double" << endl;
127  }
128  MPI_Finalize();
129  return -1;
130  }
131  {
134 
135  // Declare objects
136  PSpMat_Twitter A;
137  FullyDistVec<int64_t, int64_t> indegrees; // in-degrees of vertices (including multi-edges and self-loops)
138  FullyDistVec<int64_t, int64_t> oudegrees; // out-degrees of vertices (including multi-edges and self-loops)
139  FullyDistVec<int64_t, int64_t> degrees; // combined degrees of vertices (including multi-edges and self-loops)
140  PSpMat_Bool * ABool;
141 
142  double t01 = MPI_Wtime();
143  if(string(argv[1]) == string("File")) // text|binary input option
144  {
145  SpParHelper::Print("Using real data, which we NEVER permute for load balance, also leaving isolated vertices as-is, if any\n");
146  SpParHelper::Print("This is because the input is assumed to be load balanced\n");
147  SpParHelper::Print("BFS is run on DIRECTED graph, hence hitting SCCs, and TEPS is unidirectional\n");
148 
149  // ReadDistribute (const string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
150  // if nonum is true, then numerics are not supplied and they are assumed to be all 1's
151  A.ReadDistribute(string(argv[2]), 0, false, TwitterReadSaveHandler<int64_t>(), true, true); // read it from file (and transpose on the fly)
152 
153  A.PrintInfo();
154  SpParHelper::Print("Read input\n");
155 
156  ABool = new PSpMat_Bool(A);
157  if(argc == 4 && string(argv[3]) == string("Double"))
158  {
159  MAXTRIALS = 2;
160  }
161  else
162  {
163  MAXTRIALS = 1;
164  }
165  }
166  else if(string(argv[1]) == string("Gen"))
167  {
168  SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
169  SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
170  SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
171 
172  double initiator[4] = {.57, .19, .19, .05};
173  double t01 = MPI_Wtime();
174  double t02;
176 
177  unsigned scale = static_cast<unsigned>(atoi(argv[2]));
178  ostringstream outs;
179  outs << "Forcing scale to : " << scale << endl;
180  SpParHelper::Print(outs.str());
181 
182  // parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
183  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
184  SpParHelper::Print("Generated renamed edge lists\n");
185 
186  ABool = new PSpMat_Bool(*DEL, false);
187  int64_t removed = ABool->RemoveLoops();
188  ostringstream loopinfo;
189  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
190  SpParHelper::Print(loopinfo.str());
191  ABool->PrintInfo();
192  delete DEL; // free memory
193  A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
194 
195  MAXTRIALS = PERCENTS; // benchmarking
196  }
197  else
198  {
199  SpParHelper::Print("Not supported yet\n");
200  return 0;
201  }
202  double t02 = MPI_Wtime();
203  ostringstream tinfo;
204  tinfo << "I/O (or generation) took " << t02-t01 << " seconds" << endl;
205  SpParHelper::Print(tinfo.str());
206 
207  // indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
208  ABool->PrintInfo();
209  ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
210  ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
211 
212  // indegrees_filt and oudegrees_filt is used for the real data
213  FullyDistVec<int64_t, int64_t> indegrees_filt;
214  FullyDistVec<int64_t, int64_t> oudegrees_filt;
215  FullyDistVec<int64_t, int64_t> degrees_filt[4]; // used for the synthetic data (symmetricized before randomization)
216  int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; // ratio of edges kept in range (0, 10000)
217 
218  if(string(argv[1]) == string("File")) // if using synthetic data, no notion of out/in degrees after randomization exist
219  {
220  struct tm timeinfo;
221  memset(&timeinfo, 0, sizeof(struct tm));
222  int year, month, day, hour, min, sec;
223  year = 2009; month = 7; day = 1;
224  hour = 0; min = 0; sec = 0;
225 
226  timeinfo.tm_year = year - 1900; // year is "years since 1900"
227  timeinfo.tm_mon = month - 1 ; // month is in range 0...11
228  timeinfo.tm_mday = day; // range 1...31
229  timeinfo.tm_hour = hour; // range 0...23
230  timeinfo.tm_min = min; // range 0...59
231  timeinfo.tm_sec = sec; // range 0.
232  time_t mysincedate = timegm(&timeinfo);
233 
234  PSpMat_Twitter B = A;
235  B.Prune(bind2nd(Twitter_materialize(), mysincedate));
236  PSpMat_Bool BBool = B;
237  BBool.PrintInfo();
238  BBool.Reduce(oudegrees_filt, Column, plus<int64_t>(), static_cast<int64_t>(0));
239  BBool.Reduce(indegrees_filt, Row, plus<int64_t>(), static_cast<int64_t>(0));
240  }
241 
242  degrees = indegrees;
243  degrees.EWiseApply(oudegrees, plus<int64_t>());
244  SpParHelper::Print("All degrees calculated\n");
245  delete ABool;
246 
247  float balance = A.LoadImbalance();
248  ostringstream outs;
249  outs << "Load balance: " << balance << endl;
250  SpParHelper::Print(outs.str());
251 
252  if(string(argv[1]) == string("Gen"))
253  {
254  // We symmetricize before we apply the random generator
255  // Otherwise += will naturally add the random numbers together
256  // hence will create artificially high-permeable filters
257  Symmetricize(A); // A += A';
258  SpParHelper::Print("Symmetricized\n");
259 
261  A.PrintInfo();
262 
263  FullyDistVec<int64_t, int64_t> * nonisov = new FullyDistVec<int64_t, int64_t>(degrees.FindInds(bind2nd(greater<int64_t>(), 0)));
264  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
265  nonisov->RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
266  A(*nonisov, *nonisov, true); // in-place permute to save memory
267  SpParHelper::Print("Dropped isolated vertices from input\n");
268 
269  indegrees = indegrees(*nonisov); // fix the degrees arrays too
270  oudegrees = oudegrees(*nonisov);
271  degrees = degrees(*nonisov);
272  delete nonisov;
273 
274  for (int i=0; i < PERCENTS; i++)
275  {
276  PSpMat_Twitter B = A;
277  B.Prune(bind2nd(Twitter_materialize(), keep[i]));
278  PSpMat_Bool BBool = B;
279  BBool.PrintInfo();
280  float balance = B.LoadImbalance();
281  ostringstream outs;
282  outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
283  SpParHelper::Print(outs.str());
284 
285  // degrees_filt[i] is by-default generated as permuted
286  BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0)); // Column=Row since BBool is symmetric
287  }
288  }
289  // real data is pre-balanced, because otherwise even the load balancing routine crushes due to load imbalance
290 
291  float balance_former = A.LoadImbalance();
292  ostringstream outs_former;
293  outs_former << "Load balance: " << balance_former << endl;
294  SpParHelper::Print(outs_former.str());
295 
296  MPI_Barrier(MPI_COMM_WORLD);
297  double t1 = MPI_Wtime();
298 
300  double nver = (double) degrees.TotalLength();
301  Cands.SelectCandidates(nver, true);
302 
303  for(int trials =0; trials < MAXTRIALS; trials++)
304  {
305  if(string(argv[1]) == string("Gen"))
306  {
307  LatestRetwitterBFS::sincedate = keep[trials];
308  ostringstream outs;
309  outs << "Initializing since date (only once) to " << LatestRetwitterBFS::sincedate << endl;
310  SpParHelper::Print(outs.str());
311  }
312 
314  cblas_alltoalltime = 0;
315 
316  double MTEPS[ITERS]; double INVMTEPS[ITERS]; double TIMES[ITERS]; double EDGES[ITERS];
317  double MPEPS[ITERS]; double INVMPEPS[ITERS];
318  int sruns = 0; // successful runs
319  for(int i=0; i<MAX_ITERS && sruns < ITERS; ++i)
320  {
321  // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
323 
324  // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
326 
327  MPI_Barrier(MPI_COMM_WORLD);
328  double t1 = MPI_Wtime();
329 
330  fringe.SetElement(Cands[i], Cands[i]);
331  parents.SetElement(Cands[i], ParentType(Cands[i])); // make root discovered
332  int iterations = 0;
333  while(fringe.getnnz() > 0)
334  {
335  fringe.ApplyInd(NumSetter);
336 
337  // SpMV with sparse vector, optimizations disabled for generality
338  SpMV<LatestRetwitterBFS>(A, fringe, fringe, false);
339 
340  // EWiseApply (const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W,
341  // _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero)
342  fringe = EWiseApply<ParentType>(fringe, parents, getfringe(), keepinfrontier_f(), true, ParentType());
343  parents += fringe;
344  iterations++;
345 
346  }
347  MPI_Barrier(MPI_COMM_WORLD);
348  double t2 = MPI_Wtime();
349 
350 
351  FullyDistSpVec<int64_t, ParentType> parentsp = parents.Find(isparentset());
352  parentsp.Apply(myset<ParentType>(ParentType(1)));
353 
354 #ifndef ONLYTIME
355  FullyDistSpVec<int64_t, int64_t> intraversed, inprocessed, outraversed, ouprocessed;
356  inprocessed = EWiseApply<int64_t>(parentsp, indegrees, seldegree(), passifthere(), true, ParentType());
357  ouprocessed = EWiseApply<int64_t>(parentsp, oudegrees, seldegree(), passifthere(), true, ParentType());
358  int64_t nedges, in_nedges, ou_nedges;
359  if(string(argv[1]) == string("Gen"))
360  {
361  FullyDistSpVec<int64_t, int64_t> traversed = EWiseApply<int64_t>(parentsp, degrees_filt[trials], seldegree(), passifthere(), true, ParentType());
362  nedges = traversed.Reduce(plus<int64_t>(), (int64_t) 0);
363  }
364  else
365  {
366  intraversed = EWiseApply<int64_t>(parentsp, indegrees_filt, seldegree(), passifthere(), true, ParentType());
367  outraversed = EWiseApply<int64_t>(parentsp, oudegrees_filt, seldegree(), passifthere(), true, ParentType());
368  in_nedges = intraversed.Reduce(plus<int64_t>(), (int64_t) 0);
369  ou_nedges = outraversed.Reduce(plus<int64_t>(), (int64_t) 0);
370  nedges = in_nedges + ou_nedges; // count birectional edges twice
371  }
372 
373  int64_t in_nedges_processed = inprocessed.Reduce(plus<int64_t>(), (int64_t) 0);
374  int64_t ou_nedges_processed = ouprocessed.Reduce(plus<int64_t>(), (int64_t) 0);
375  int64_t nedges_processed = in_nedges_processed + ou_nedges_processed; // count birectional edges twice
376 #else
377  int64_t in_nedges, ou_nedges, nedges, in_nedges_processed, ou_nedges_processed, nedges_processed = 0;
378 #endif
379 
380  if(parentsp.getnnz() > CC_LIMIT)
381  {
382  // intraversed.PrintInfo("Incoming edges traversed per vertex");
383  // intraversed.DebugPrint();
384  // outraversed.PrintInfo("Outgoing edges traversed per vertex");
385  // outraversed.DebugPrint();
386 
387  #ifdef DEBUG
388  parents.PrintInfo("Final parents array");
389  parents.DebugPrint();
390  #endif
391 
392  ostringstream outnew;
393  outnew << i << "th starting vertex was " << Cands[i] << endl;
394  outnew << "Number iterations: " << iterations << endl;
395  outnew << "Number of vertices found: " << parentsp.getnnz() << endl;
396  outnew << "Number of edges traversed in both directions: " << nedges << endl;
397  if(string(argv[1]) == string("File"))
398  outnew << "Number of edges traversed in one direction: " << ou_nedges << endl;
399  outnew << "Number of edges processed in both directions: " << nedges_processed << endl;
400  outnew << "Number of edges processed in one direction: " << ou_nedges_processed << endl;
401  outnew << "BFS time: " << t2-t1 << " seconds" << endl;
402  outnew << "MTEPS (bidirectional): " << static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
403  if(string(argv[1]) == string("File"))
404  outnew << "MTEPS (unidirectional): " << static_cast<double>(ou_nedges) / (t2-t1) / 1000000.0 << endl;
405  outnew << "MPEPS (bidirectional): " << static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0 << endl;
406  outnew << "MPEPS (unidirectional): " << static_cast<double>(ou_nedges_processed) / (t2-t1) / 1000000.0 << endl;
407  outnew << "Total communication (average so far): " << (cblas_allgathertime + cblas_alltoalltime) / (i+1) << endl;
408 
409  TIMES[sruns] = t2-t1;
410  if(string(argv[1]) == string("Gen"))
411  EDGES[sruns] = static_cast<double>(nedges);
412  else
413  EDGES[sruns] = static_cast<double>(ou_nedges);
414 
415  MTEPS[sruns] = EDGES[sruns] / (t2-t1) / 1000000.0;
416  MPEPS[sruns++] = static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0;
417  SpParHelper::Print(outnew.str());
418  }
419  }
420  if (sruns < MINRUNS)
421  {
422  SpParHelper::Print("Not enough valid runs done\n");
423  MPI_Finalize();
424  return 0;
425  }
426  ostringstream os;
427 
428  os << sruns << " valid runs done" << endl;
429  os << "Connected component lower limite was " << CC_LIMIT << endl;
430  os << "Per iteration communication times: " << endl;
431  os << "AllGatherv: " << cblas_allgathertime / sruns << endl;
432  os << "AlltoAllv: " << cblas_alltoalltime / sruns << endl;
433 
434  sort(EDGES, EDGES+sruns);
435  os << "--------------------------" << endl;
436  os << "Min nedges: " << EDGES[0] << endl;
437  os << "Median nedges: " << (EDGES[(sruns/2)-1] + EDGES[sruns/2])/2 << endl;
438  os << "Max nedges: " << EDGES[sruns-1] << endl;
439  double mean = accumulate( EDGES, EDGES+sruns, 0.0 )/ sruns;
440  vector<double> zero_mean(sruns); // find distances to the mean
441  transform(EDGES, EDGES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
442  // self inner-product is sum of sum of squares
443  double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
444  deviation = sqrt( deviation / (sruns-1) );
445  os << "Mean nedges: " << mean << endl;
446  os << "STDDEV nedges: " << deviation << endl;
447  os << "--------------------------" << endl;
448 
449  sort(TIMES,TIMES+sruns);
450  os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
451  os << "Min time: " << TIMES[0] << " seconds" << endl;
452  os << "Median time: " << (TIMES[(sruns/2)-1] + TIMES[sruns/2])/2 << " seconds" << endl;
453  os << "Max time: " << TIMES[sruns-1] << " seconds" << endl;
454  mean = accumulate( TIMES, TIMES+sruns, 0.0 )/ sruns;
455  transform(TIMES, TIMES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
456  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
457  deviation = sqrt( deviation / (sruns-1) );
458  os << "Mean time: " << mean << " seconds" << endl;
459  os << "STDDEV time: " << deviation << " seconds" << endl;
460  os << "--------------------------" << endl;
461 
462  sort(MTEPS, MTEPS+sruns);
463  os << "Min MTEPS: " << MTEPS[0] << endl;
464  os << "Median MTEPS: " << (MTEPS[(sruns/2)-1] + MTEPS[sruns/2])/2 << endl;
465  os << "Max MTEPS: " << MTEPS[sruns-1] << endl;
466  transform(MTEPS, MTEPS+sruns, INVMTEPS, safemultinv<double>()); // returns inf for zero teps
467  double hteps = static_cast<double>(sruns) / accumulate(INVMTEPS, INVMTEPS+sruns, 0.0);
468  os << "Harmonic mean of MTEPS: " << hteps << endl;
469  transform(INVMTEPS, INVMTEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
470  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
471  deviation = sqrt( deviation / (sruns-1) ) * (hteps*hteps); // harmonic_std_dev
472  os << "Harmonic standard deviation of MTEPS: " << deviation << endl;
473 
474  sort(MPEPS, MPEPS+sruns);
475  os << "Bidirectional Processed Edges per second (to estimate sustained BW)"<< endl;
476  os << "Min MPEPS: " << MPEPS[0] << endl;
477  os << "Median MPEPS: " << (MPEPS[(sruns/2)-1] + MPEPS[sruns/2])/2 << endl;
478  os << "Max MPEPS: " << MPEPS[sruns-1] << endl;
479  transform(MPEPS, MPEPS+sruns, INVMPEPS, safemultinv<double>()); // returns inf for zero teps
480  double hpeps = static_cast<double>(sruns) / accumulate(INVMPEPS, INVMPEPS+sruns, 0.0);
481  os << "Harmonic mean of MPEPS: " << hpeps << endl;
482  transform(INVMPEPS, INVMPEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hpeps));
483  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
484  deviation = sqrt( deviation / (sruns-1) ) * (hpeps*hpeps); // harmonic_std_dev
485  os << "Harmonic standard deviation of MPEPS: " << deviation << endl;
486  SpParHelper::Print(os.str());
487  }
488  }
489 #ifdef TAU_PROFILE
490  TAU_PROFILE_STOP(maintimer);
491 #endif
492  MPI_Finalize();
493  return 0;
494 }
495