COMBINATORIAL_BLAS  1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BFSFriends.h
Go to the documentation of this file.
1 #ifndef _BFS_FRIENDS_H_
2 #define _BFS_FRIENDS_H_
3 
4 #include "mpi.h"
5 #include <iostream>
6 #include "SpParMat.h"
7 #include "SpParHelper.h"
8 #include "MPIType.h"
9 #include "Friends.h"
10 #include "OptBuf.h"
11 #include "ParFriends.h"
12 #include "SpImplNoSR.h"
13 // AL: doesn't compile on OSX. #include <parallel/algorithm>
14 
15 using namespace std;
16 
17 template <class IT, class NT, class DER>
18 class SpParMat;
19 
20 /*************************************************************************************************/
21 /*********************** FRIEND FUNCTIONS FOR BFS ONLY (NO SEMIRINGS) RUNS **********************/
22 /***************************** BOTH PARALLEL AND SEQUENTIAL FUNCTIONS ****************************/
23 /*************************************************************************************************/
24 
29 template <typename IT, typename VT>
30 void dcsc_gespmv_threaded_setbuffers (const SpDCCols<IT, bool> & A, const int32_t * indx, const VT * numx, int32_t nnzx,
31  int32_t * sendindbuf, VT * sendnumbuf, int * cnts, int * dspls, int p_c)
32 {
33  if(A.getnnz() > 0 && nnzx > 0)
34  {
35  int splits = A.getnsplit();
36  if(splits > 0)
37  {
38  vector< vector<int32_t> > indy(splits);
39  vector< vector< VT > > numy(splits);
40  int32_t nlocrows = static_cast<int32_t>(A.getnrow());
41  int32_t perpiece = nlocrows / splits;
42 
43  #ifdef _OPENMP
44  #pragma omp parallel for
45  #endif
46  for(int i=0; i<splits; ++i)
47  {
48  if(i != splits-1)
49  SpMXSpV_ForThreading(*(A.GetDCSC(i)), perpiece, indx, numx, nnzx, indy[i], numy[i], i*perpiece);
50  else
51  SpMXSpV_ForThreading(*(A.GetDCSC(i)), nlocrows - perpiece*i, indx, numx, nnzx, indy[i], numy[i], i*perpiece);
52  }
53 
54  int32_t perproc = nlocrows / p_c;
55  int32_t last_rec = p_c-1;
56 
57  // keep recipients of last entries in each split (-1 for an empty split)
58  // so that we can delete indy[] and numy[] contents as soon as they are processed
59  vector<int32_t> end_recs(splits);
60  for(int i=0; i<splits; ++i)
61  {
62  if(indy[i].empty())
63  end_recs[i] = -1;
64  else
65  end_recs[i] = min(indy[i].back() / perproc, last_rec);
66  }
67 
68  int ** loc_rec_cnts = new int *[splits];
69  #ifdef _OPENMP
70  #pragma omp parallel for
71  #endif
72  for(int i=0; i<splits; ++i)
73  {
74  loc_rec_cnts[i] = new int[p_c](); // thread-local recipient data
75  if(!indy[i].empty()) // guarantee that .begin() and .end() are not null
76  {
77  int32_t cur_rec = min( indy[i].front() / perproc, last_rec);
78  int32_t lastdata = (cur_rec+1) * perproc; // one past last entry that goes to this current recipient
79  for(typename vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
80  {
81  if( ( (*it) >= lastdata ) && cur_rec != last_rec)
82  {
83  cur_rec = min( (*it) / perproc, last_rec);
84  lastdata = (cur_rec+1) * perproc;
85  }
86  ++loc_rec_cnts[i][cur_rec];
87  }
88  }
89  }
90  #ifdef _OPENMP
91  #pragma omp parallel for
92  #endif
93  for(int i=0; i<splits; ++i)
94  {
95  if(!indy[i].empty()) // guarantee that .begin() and .end() are not null
96  {
97  // FACT: Data is sorted, so if the recipient of begin is the same as the owner of end,
98  // then the whole data is sent to the same processor
99  int32_t beg_rec = min( indy[i].front() / perproc, last_rec);
100  int32_t alreadysent = 0; // already sent per recipient
101  for(int before = i-1; before >= 0; before--)
102  alreadysent += loc_rec_cnts[before][beg_rec];
103 
104  if(beg_rec == end_recs[i]) // fast case
105  {
106  transform(indy[i].begin(), indy[i].end(), indy[i].begin(), bind2nd(minus<int32_t>(), perproc*beg_rec));
107  copy(indy[i].begin(), indy[i].end(), sendindbuf + dspls[beg_rec] + alreadysent);
108  copy(numy[i].begin(), numy[i].end(), sendnumbuf + dspls[beg_rec] + alreadysent);
109  }
110  else // slow case
111  {
112  int32_t cur_rec = beg_rec;
113  int32_t lastdata = (cur_rec+1) * perproc; // one past last entry that goes to this current recipient
114  for(typename vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
115  {
116  if( ( (*it) >= lastdata ) && cur_rec != last_rec )
117  {
118  cur_rec = min( (*it) / perproc, last_rec);
119  lastdata = (cur_rec+1) * perproc;
120 
121  // if this split switches to a new recipient after sending some data
122  // then it's sure that no data has been sent to that recipient yet
123  alreadysent = 0;
124  }
125  sendindbuf[ dspls[cur_rec] + alreadysent ] = (*it) - perproc*cur_rec; // convert to receiver's local index
126  sendnumbuf[ dspls[cur_rec] + (alreadysent++) ] = *(numy[i].begin() + (it-indy[i].begin()));
127  }
128  }
129  }
130  }
131  // Deallocated rec counts serially once all threads complete
132  for(int i=0; i< splits; ++i)
133  {
134  for(int j=0; j< p_c; ++j)
135  cnts[j] += loc_rec_cnts[i][j];
136  delete [] loc_rec_cnts[i];
137  }
138  delete [] loc_rec_cnts;
139  }
140  else
141  {
142  cout << "Something is wrong, splits should be nonzero for multithreaded execution" << endl;
143  }
144  }
145 }
146 
153 template<typename VT, typename IT, typename UDER>
154 void LocalSpMV(const SpParMat<IT,bool,UDER> & A, int rowneighs, OptBuf<int32_t, VT > & optbuf, int32_t * & indacc, VT * & numacc, int * sendcnt, int accnz)
155 {
156 
157 #ifdef TIMING
158  double t0=MPI_Wtime();
159 #endif
160  if(optbuf.totmax > 0) // graph500 optimization enabled
161  {
162  if(A.spSeq->getnsplit() > 0)
163  {
164  // optbuf.{inds/nums/dspls} and sendcnt are all pre-allocated and only filled by dcsc_gespmv_threaded
165  dcsc_gespmv_threaded_setbuffers (*(A.spSeq), indacc, numacc, accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs);
166  }
167  else
168  {
169  // by-pass dcsc_gespmv call
170  if(A.getlocalnnz() > 0 && accnz > 0)
171  {
172  SpMXSpV(*((A.spSeq)->GetDCSC()), (int32_t) A.getlocalrows(), indacc, numacc,
173  accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs, optbuf.isthere);
174  }
175  }
176  DeleteAll(indacc,numacc);
177  }
178  else
179  {
180  SpParHelper::Print("BFS only (no semiring) function only work with optimization buffers\n");
181  }
182 
183 #ifdef TIMING
184  double t1=MPI_Wtime();
185  cblas_localspmvtime += (t1-t0);
186 #endif
187 }
188 
189 
190 template <typename IU, typename VT>
191 void MergeContributions(FullyDistSpVec<IU,VT> & y, int * & recvcnt, int * & rdispls, int32_t * & recvindbuf, VT * & recvnumbuf, int rowneighs)
192 {
193 #ifdef TIMING
194  double t0=MPI_Wtime();
195 #endif
196  // free memory of y, in case it was aliased
197  vector<IU>().swap(y.ind);
198  vector<VT>().swap(y.num);
199 
200 #ifndef HEAPMERGE
201  IU ysize = y.MyLocLength(); // my local length is only O(n/p)
202  bool * isthere = new bool[ysize];
203  vector< pair<IU,VT> > ts_pairs;
204  fill_n(isthere, ysize, false);
205 
206  // We don't need to keep a "merger" because minimum will always come from the processor
207  // with the smallest rank; so a linear sweep over the received buffer is enough
208  for(int i=0; i<rowneighs; ++i)
209  {
210  for(int j=0; j< recvcnt[i]; ++j)
211  {
212  int32_t index = recvindbuf[rdispls[i] + j];
213  if(!isthere[index])
214  ts_pairs.push_back(make_pair(index, recvnumbuf[rdispls[i] + j]));
215 
216  }
217  }
218  DeleteAll(recvcnt, rdispls);
219  DeleteAll(isthere, recvindbuf, recvnumbuf);
220  __gnu_parallel::sort(ts_pairs.begin(), ts_pairs.end());
221  int nnzy = ts_pairs.size();
222  y.ind.resize(nnzy);
223  y.num.resize(nnzy);
224  for(int i=0; i< nnzy; ++i)
225  {
226  y.ind[i] = ts_pairs[i].first;
227  y.num[i] = ts_pairs[i].second;
228  }
229 
230 #else
231  // Alternative 2: Heap-merge
232  int32_t hsize = 0;
233  int32_t inf = numeric_limits<int32_t>::min();
234  int32_t sup = numeric_limits<int32_t>::max();
235  KNHeap< int32_t, int32_t > sHeap(sup, inf);
236  int * processed = new int[rowneighs]();
237  for(int32_t i=0; i<rowneighs; ++i)
238  {
239  if(recvcnt[i] > 0)
240  {
241  // key, proc_id
242  sHeap.insert(recvindbuf[rdispls[i]], i);
243  ++hsize;
244  }
245  }
246  int32_t key, locv;
247  if(hsize > 0)
248  {
249  sHeap.deleteMin(&key, &locv);
250  y.ind.push_back( static_cast<IU>(key));
251  y.num.push_back(recvnumbuf[rdispls[locv]]); // nothing is processed yet
252 
253  if( (++(processed[locv])) < recvcnt[locv] )
254  sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
255  else
256  --hsize;
257  }
258 
259 // ofstream oput;
260 // y.commGrid->OpenDebugFile("Merge", oput);
261 // oput << "From displacements: "; copy(rdispls, rdispls+rowneighs, ostream_iterator<int>(oput, " ")); oput << endl;
262 // oput << "From counts: "; copy(recvcnt, recvcnt+rowneighs, ostream_iterator<int>(oput, " ")); oput << endl;
263  while(hsize > 0)
264  {
265  sHeap.deleteMin(&key, &locv);
266  IU deref = rdispls[locv] + processed[locv];
267  if(y.ind.back() != static_cast<IU>(key)) // y.ind is surely not empty
268  {
269  y.ind.push_back(static_cast<IU>(key));
270  y.num.push_back(recvnumbuf[deref]);
271  }
272 
273  if( (++(processed[locv])) < recvcnt[locv] )
274  sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
275  else
276  --hsize;
277  }
278  DeleteAll(recvcnt, rdispls,processed);
279  DeleteAll(recvindbuf, recvnumbuf);
280 #endif
281 
282 #ifdef TIMING
283  double t1=MPI_Wtime();
284  cblas_mergeconttime += (t1-t0);
285 #endif
286 }
287 
294 template <typename VT, typename IT, typename UDER>
296 {
297  CheckSpMVCompliance(A,x);
298  optbuf.MarkEmpty();
299 
300  MPI_Comm World = x.commGrid->GetWorld();
301  MPI_Comm ColWorld = x.commGrid->GetColWorld();
302  MPI_Comm RowWorld = x.commGrid->GetRowWorld();
303 
304  int accnz;
305  int32_t trxlocnz;
306  IT lenuntil;
307  int32_t *trxinds, *indacc;
308  VT *trxnums, *numacc;
309 
310 #ifdef TIMING
311  double t0=MPI_Wtime();
312 #endif
313  TransposeVector(World, x, trxlocnz, lenuntil, trxinds, trxnums, true); // trxinds (and potentially trxnums) is allocated
314 #ifdef TIMING
315  double t1=MPI_Wtime();
316  cblas_transvectime += (t1-t0);
317 #endif
318  AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, indacc, numacc, accnz, true); // trxinds (and potentially trxnums) is deallocated, indacc/numacc allocated
319 
320  FullyDistSpVec<IT, VT> y ( x.commGrid, A.getnrow()); // identity doesn't matter for sparse vectors
321  int rowneighs; MPI_Comm_size(RowWorld,&rowneighs);
322  int * sendcnt = new int[rowneighs]();
323 
324  LocalSpMV(A, rowneighs, optbuf, indacc, numacc, sendcnt, accnz); // indacc/numacc deallocated
325 
326  int * rdispls = new int[rowneighs];
327  int * recvcnt = new int[rowneighs];
328  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld); // share the request counts
329 
330  // receive displacements are exact whereas send displacements have slack
331  rdispls[0] = 0;
332  for(int i=0; i<rowneighs-1; ++i)
333  {
334  rdispls[i+1] = rdispls[i] + recvcnt[i];
335  }
336  int totrecv = accumulate(recvcnt,recvcnt+rowneighs,0);
337  int32_t * recvindbuf = new int32_t[totrecv];
338  VT * recvnumbuf = new VT[totrecv];
339 
340 #ifdef TIMING
341  double t2=MPI_Wtime();
342 #endif
343  if(optbuf.totmax > 0 ) // graph500 optimization enabled
344  {
345  MPI_Alltoallv(optbuf.inds, sendcnt, optbuf.dspls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
346  MPI_Alltoallv(optbuf.nums, sendcnt, optbuf.dspls, MPIType<VT>(), recvnumbuf, recvcnt, rdispls, MPIType<VT>(), RowWorld);
347  delete [] sendcnt;
348  }
349  else
350  {
351  SpParHelper::Print("BFS only (no semiring) function only work with optimization buffers\n");
352  }
353 #ifdef TIMING
354  double t3=MPI_Wtime();
355  cblas_alltoalltime += (t3-t2);
356 #endif
357 
358  MergeContributions(y,recvcnt, rdispls, recvindbuf, recvnumbuf, rowneighs);
359  return y;
360 }
361 
362 #endif