30 #include "../CombBLAS.h"
46 #include <Profile/Profiler.h>
60 #define MAX_ITERS 20000
64 #define PERCENTS 4 // testing with 4 different percentiles
71 template <
typename PARMAT>
// Entry point of the filtered-BFS benchmark driver.
// NOTE(review): this listing is fragmentary -- braces, several declarations
// (nprocs, myrank, A, ABool, degrees, Cands, keep, TIMES, EDGES, MTEPS, MPEPS,
// INVMTEPS, INVMPEPS, ...) and some conditions are not visible here, so the
// comments below describe only what the visible statements do.
109 int main(
int argc,
char* argv[])
// Register, initialise and start the TAU profiling timer that covers main().
112 TAU_PROFILE_TIMER(maintimer,
"main()",
"int (int, char **)", TAU_DEFAULT);
113 TAU_PROFILE_INIT(argc, argv);
114 TAU_PROFILE_START(maintimer);
// Bring up MPI and query the communicator size and this process's rank.
117 MPI_Init(&argc, &argv);
118 MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
119 MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
// Usage text; the condition that triggers it is not visible in this listing.
125 cout <<
"Usage: ./FilteredBFS <File, Gen> <Input Name | Scale> (Optional: Double)" << endl;
126 cout <<
"Example: ./FilteredBFS File twitter_small.txt Double" << endl;
// Timestamp taken before the input is read; paired with t02 below.
142 double t01 = MPI_Wtime();
// "File" mode: the graph comes from a real data file named by argv[2].
143 if(
string(argv[1]) ==
string(
"File"))
145 SpParHelper::Print(
"Using real data, which we NEVER permute for load balance, also leaving isolated vertices as-is, if any\n");
147 SpParHelper::Print(
"BFS is run on DIRECTED graph, hence hitting SCCs, and TEPS is unidirectional\n");
// Optional fourth argument "Double" -- presumably selects how the input file
// is parsed; the guarded statements are not visible here (TODO confirm).
157 if(argc == 4 &&
string(argv[3]) ==
string(
"Double"))
// "Gen" mode: the graph is generated synthetically at the scale in argv[2].
166 else if(
string(argv[1]) ==
string(
"Gen"))
168 SpParHelper::Print(
"Using synthetic data, which we ALWAYS permute for load balance\n");
169 SpParHelper::Print(
"We only balance the original input, we don't repermute after each filter change\n");
170 SpParHelper::Print(
"BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
// Four generator probabilities (they sum to 1.0) -- presumably the quadrant
// weights of an R-MAT style generator; the consumer is not visible (TODO confirm).
172 double initiator[4] = {.57, .19, .19, .05};
173 double t01 = MPI_Wtime();
// atoi performs no validation: a non-numeric argv[2] yields scale == 0.
177 unsigned scale =
static_cast<unsigned>(atoi(argv[2]));
179 outs <<
"Forcing scale to : " << scale << endl;
// Reports how many self-loops were dropped; 'removed' is computed outside
// the visible lines.
188 ostringstream loopinfo;
189 loopinfo <<
"Converted to Boolean and removed " << removed <<
" loops" << endl;
// End of the input phase: report the elapsed wall time t02 - t01.
202 double t02 = MPI_Wtime();
204 tinfo <<
"I/O (or generation) took " << t02-t01 <<
" seconds" << endl;
// Sum the boolean matrix along each dimension with plus<int64_t> (identity 0)
// to obtain the out-degree and in-degree vectors.
209 ABool->
Reduce(oudegrees,
Column, plus<int64_t>(), static_cast<int64_t>(0));
210 ABool->
Reduce(indegrees,
Row, plus<int64_t>(), static_cast<int64_t>(0));
// File mode only: build the filter cut-off date and the filtered degree vectors.
218 if(
string(argv[1]) ==
string(
"File"))
// Zero the struct tm, then fill in 2009-07-01 00:00:00. tm_year counts from
// 1900 and tm_mon is zero-based, hence the adjustments below.
221 memset(&timeinfo, 0,
sizeof(
struct tm));
222 int year, month, day, hour, min, sec;
223 year = 2009; month = 7; day = 1;
224 hour = 0; min = 0; sec = 0;
226 timeinfo.tm_year = year - 1900;
227 timeinfo.tm_mon = month - 1 ;
228 timeinfo.tm_mday = day;
229 timeinfo.tm_hour = hour;
230 timeinfo.tm_min = min;
231 timeinfo.tm_sec = sec;
// timegm interprets the broken-down time as UTC (non-standard but widely
// available extension).
232 time_t mysincedate = timegm(&timeinfo);
// Work on a copy B of A so that A itself stays untouched by what follows;
// any filtering applied to B is not visible in this listing.
234 PSpMat_Twitter
B = A;
// Convert the copy to a boolean matrix and reduce it the same way as ABool
// to get the "filtered" out-/in-degree vectors.
236 PSpMat_Bool BBool = B;
238 BBool.
Reduce(oudegrees_filt,
Column, plus<int64_t>(), static_cast<int64_t>(0));
239 BBool.
Reduce(indegrees_filt,
Row, plus<int64_t>(), static_cast<int64_t>(0));
// Element-wise add the out-degrees into 'degrees' -- presumably 'degrees'
// starts as a copy of the in-degrees, giving total degree (TODO confirm).
243 degrees.
EWiseApply(oudegrees, plus<int64_t>());
249 outs <<
"Load balance: " << balance << endl;
// Gen mode only: restrict the matrix and the degree vectors to the vertices
// listed in *nonisov (the non-isolated ones, per the message printed below).
252 if(
string(argv[1]) == string(
"Gen"))
266 A(*nonisov, *nonisov,
true);
267 SpParHelper::Print(
"Dropped isolated vertices from input\n");
269 indegrees = indegrees(*nonisov);
270 oudegrees = oudegrees(*nonisov);
271 degrees = degrees(*nonisov);
// For filter setting i (the enclosing loop is not visible): copy A, convert
// to boolean, report load balance and compute that setting's degree vector.
276 PSpMat_Twitter
B = A;
278 PSpMat_Bool BBool = B;
282 outs <<
"Load balance of " <<
static_cast<float>(keep[i])/100 <<
"% filtered case: " << balance << endl;
283 SpParHelper::Print(outs.str());
286 BBool.
Reduce(degrees_filt[i],
Column, plus<int64_t>(), static_cast<int64_t>(0));
292 ostringstream outs_former;
293 outs_former <<
"Load balance: " << balance_former << endl;
// Synchronise all ranks before taking the timestamp.
296 MPI_Barrier(MPI_COMM_WORLD);
297 double t1 = MPI_Wtime();
// Vertex count as a double, taken from the length of the degree vector.
300 double nver = (double) degrees.TotalLength();
// One benchmark pass per trial; keep[trials] is reported further down as the
// filter percentage of the pass.
303 for(
int trials =0; trials < MAXTRIALS; trials++)
305 if(
string(argv[1]) ==
string(
"Gen"))
// Start of one timed BFS run.
// NOTE(review): 't1' is declared again here; if this sits in a nested scope
// it shadows the 't1' declared after the earlier barrier -- confirm intent.
327 MPI_Barrier(MPI_COMM_WORLD);
328 double t1 = MPI_Wtime();
// Seed the search: the start vertex Cands[i] goes into the fringe and is
// recorded in 'parents' with itself as the parent value.
330 fringe.SetElement(Cands[i], Cands[i]);
331 parents.SetElement(Cands[i],
ParentType(Cands[i]));
// Iterate until the fringe has no nonzeros; each step multiplies A by the
// fringe with the LatestRetwitterBFS semiring, writing the result back into
// 'fringe'. Updates to 'parents' inside the loop are not visible here.
333 while(fringe.getnnz() > 0)
338 SpMV<LatestRetwitterBFS>(A, fringe, fringe,
false);
// End of the timed BFS run.
347 MPI_Barrier(MPI_COMM_WORLD);
348 double t2 = MPI_Wtime();
// Edge counting: sum the per-vertex traversed/processed vectors with
// plus<int64_t>; 'nedges' is the in+out total.
358 int64_t nedges, in_nedges, ou_nedges;
359 if(
string(argv[1]) == string(
"Gen"))
368 in_nedges = intraversed.
Reduce(plus<int64_t>(), (
int64_t) 0);
369 ou_nedges = outraversed.Reduce(plus<int64_t>(), (
int64_t) 0);
370 nedges = in_nedges + ou_nedges;
374 int64_t ou_nedges_processed = ouprocessed.Reduce(plus<int64_t>(), (
int64_t) 0);
375 int64_t nedges_processed = in_nedges_processed + ou_nedges_processed;
// NOTE(review): in this declaration only 'nedges_processed' receives the
// initializer 0; the other five variables are left uninitialized. The names
// also repeat the declarations above, so this is presumably an alternate
// preprocessor branch whose directives are not visible -- confirm.
377 int64_t in_nedges, ou_nedges, nedges, in_nedges_processed, ou_nedges_processed, nedges_processed = 0;
388 parents.PrintInfo(
"Final parents array");
389 parents.DebugPrint();
// Per-run report: start vertex, iteration count, discovered vertices, edge
// counts and the derived rates (count / elapsed seconds / 1e6).
392 ostringstream outnew;
393 outnew << i <<
"th starting vertex was " << Cands[i] << endl;
394 outnew <<
"Number iterations: " << iterations << endl;
395 outnew <<
"Number of vertices found: " << parentsp.getnnz() << endl;
396 outnew <<
"Number of edges traversed in both directions: " << nedges << endl;
397 if(
string(argv[1]) ==
string(
"File"))
398 outnew <<
"Number of edges traversed in one direction: " << ou_nedges << endl;
399 outnew <<
"Number of edges processed in both directions: " << nedges_processed << endl;
400 outnew <<
"Number of edges processed in one direction: " << ou_nedges_processed << endl;
401 outnew <<
"BFS time: " << t2-t1 <<
" seconds" << endl;
402 outnew <<
"MTEPS (bidirectional): " <<
static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
403 if(
string(argv[1]) == string(
"File"))
404 outnew <<
"MTEPS (unidirectional): " << static_cast<double>(ou_nedges) / (t2-t1) / 1000000.0 << endl;
405 outnew <<
"MPEPS (bidirectional): " <<
static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0 << endl;
406 outnew <<
"MPEPS (unidirectional): " <<
static_cast<double>(ou_nedges_processed) / (t2-t1) / 1000000.0 << endl;
// Record this run in slot 'sruns' of the result arrays. Gen mode stores the
// bidirectional edge count; the other assignment (presumably the else
// branch) stores the one-directional count. 'sruns' advances on the last store.
409 TIMES[sruns] = t2-t1;
410 if(
string(argv[1]) == string(
"Gen"))
411 EDGES[sruns] = static_cast<double>(nedges);
413 EDGES[sruns] =
static_cast<double>(ou_nedges);
415 MTEPS[sruns] = EDGES[sruns] / (t2-t1) / 1000000.0;
416 MPEPS[sruns++] =
static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0;
// Summary over the 'sruns' recorded runs.
428 os << sruns <<
" valid runs done" << endl;
429 os <<
"Connected component lower limite was " <<
CC_LIMIT << endl;
430 os <<
"Per iteration communication times: " << endl;
// Edge-count statistics. The arrays are sorted in place, so after this point
// index k no longer refers to the k-th run.
// NOTE(review): the "median" averages elements (sruns/2)-1 and sruns/2, which
// reads index -1 when sruns < 2 and is not the middle element for odd sruns.
434 sort(EDGES, EDGES+sruns);
435 os <<
"--------------------------" << endl;
436 os <<
"Min nedges: " << EDGES[0] << endl;
437 os <<
"Median nedges: " << (EDGES[(sruns/2)-1] + EDGES[sruns/2])/2 << endl;
438 os <<
"Max nedges: " << EDGES[sruns-1] << endl;
// Mean, then deviations from the mean, then their sum of squares divided by
// (sruns-1) under a square root, i.e. the sample standard deviation.
// NOTE(review): the divisor is zero when sruns == 1, and bind2nd was removed
// from the standard library in C++17.
439 double mean = accumulate( EDGES, EDGES+sruns, 0.0 )/ sruns;
440 vector<double> zero_mean(sruns);
441 transform(EDGES, EDGES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
443 double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
444 deviation = sqrt( deviation / (sruns-1) );
445 os <<
"Mean nedges: " << mean << endl;
446 os <<
"STDDEV nedges: " << deviation << endl;
447 os <<
"--------------------------" << endl;
// Same statistics for the run times; 'mean', 'deviation' and 'zero_mean' are
// reused as scratch storage.
449 sort(TIMES,TIMES+sruns);
450 os <<
"Filter keeps " <<
static_cast<double>(keep[trials])/100.0 <<
" percentage of edges" << endl;
451 os <<
"Min time: " << TIMES[0] <<
" seconds" << endl;
452 os <<
"Median time: " << (TIMES[(sruns/2)-1] + TIMES[sruns/2])/2 <<
" seconds" << endl;
453 os <<
"Max time: " << TIMES[sruns-1] <<
" seconds" << endl;
454 mean = accumulate( TIMES, TIMES+sruns, 0.0 )/ sruns;
455 transform(TIMES, TIMES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
456 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
457 deviation = sqrt( deviation / (sruns-1) );
458 os <<
"Mean time: " << mean <<
" seconds" << endl;
459 os <<
"STDDEV time: " << deviation <<
" seconds" << endl;
460 os <<
"--------------------------" << endl;
// MTEPS statistics: min/median/max from the sorted array.
462 sort(MTEPS, MTEPS+sruns);
463 os <<
"Min MTEPS: " << MTEPS[0] << endl;
464 os <<
"Median MTEPS: " << (MTEPS[(sruns/2)-1] + MTEPS[sruns/2])/2 << endl;
465 os <<
"Max MTEPS: " << MTEPS[sruns-1] << endl;
// Harmonic mean = sruns / sum(INVMTEPS). INVMTEPS is filled outside the
// visible lines -- presumably with 1/MTEPS per run (TODO confirm).
467 double hteps =
static_cast<double>(sruns) / accumulate(INVMTEPS, INVMTEPS+sruns, 0.0);
468 os <<
"Harmonic mean of MTEPS: " << hteps << endl;
// Sample standard deviation of the inverse rates around 1/hteps, scaled by
// hteps^2 to express it in rate units.
469 transform(INVMTEPS, INVMTEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
470 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
471 deviation = sqrt( deviation / (sruns-1) ) * (hteps*hteps);
472 os <<
"Harmonic standard deviation of MTEPS: " << deviation << endl;
// Same report for MPEPS (processed edges per second), using INVMPEPS.
474 sort(MPEPS, MPEPS+sruns);
475 os <<
"Bidirectional Processed Edges per second (to estimate sustained BW)"<< endl;
476 os <<
"Min MPEPS: " << MPEPS[0] << endl;
477 os <<
"Median MPEPS: " << (MPEPS[(sruns/2)-1] + MPEPS[sruns/2])/2 << endl;
478 os <<
"Max MPEPS: " << MPEPS[sruns-1] << endl;
480 double hpeps =
static_cast<double>(sruns) / accumulate(INVMPEPS, INVMPEPS+sruns, 0.0);
481 os <<
"Harmonic mean of MPEPS: " << hpeps << endl;
482 transform(INVMPEPS, INVMPEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hpeps));
483 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
484 deviation = sqrt( deviation / (sruns-1) ) * (hpeps*hpeps);
485 os <<
"Harmonic standard deviation of MPEPS: " << deviation << endl;
// Stop the TAU timer started at the top of main(). MPI finalisation and the
// return statement are not visible in this listing.
490 TAU_PROFILE_STOP(maintimer);