1 #ifndef _BFS_FRIENDS_H_
2 #define _BFS_FRIENDS_H_
17 template <
class IT,
class NT,
class DER>
29 template <
typename IT,
typename VT>
31 int32_t * sendindbuf, VT * sendnumbuf,
int * cnts,
int * dspls,
int p_c)
33 if(A.
getnnz() > 0 && nnzx > 0)
38 vector< vector<int32_t> > indy(splits);
39 vector< vector< VT > > numy(splits);
40 int32_t nlocrows =
static_cast<int32_t
>(A.
getnrow());
41 int32_t perpiece = nlocrows / splits;
44 #pragma omp parallel for
46 for(
int i=0; i<splits; ++i)
54 int32_t perproc = nlocrows / p_c;
55 int32_t last_rec = p_c-1;
59 vector<int32_t> end_recs(splits);
60 for(
int i=0; i<splits; ++i)
65 end_recs[i] = min(indy[i].back() / perproc, last_rec);
68 int ** loc_rec_cnts =
new int *[splits];
70 #pragma omp parallel for
72 for(
int i=0; i<splits; ++i)
74 loc_rec_cnts[i] =
new int[p_c]();
77 int32_t cur_rec = min( indy[i].front() / perproc, last_rec);
78 int32_t lastdata = (cur_rec+1) * perproc;
79 for(
typename vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
81 if( ( (*it) >= lastdata ) && cur_rec != last_rec)
83 cur_rec = min( (*it) / perproc, last_rec);
84 lastdata = (cur_rec+1) * perproc;
86 ++loc_rec_cnts[i][cur_rec];
91 #pragma omp parallel for
93 for(
int i=0; i<splits; ++i)
99 int32_t beg_rec = min( indy[i].front() / perproc, last_rec);
100 int32_t alreadysent = 0;
101 for(
int before = i-1; before >= 0; before--)
102 alreadysent += loc_rec_cnts[before][beg_rec];
104 if(beg_rec == end_recs[i])
106 transform(indy[i].begin(), indy[i].end(), indy[i].begin(), bind2nd(minus<int32_t>(), perproc*beg_rec));
107 copy(indy[i].begin(), indy[i].end(), sendindbuf + dspls[beg_rec] + alreadysent);
108 copy(numy[i].begin(), numy[i].end(), sendnumbuf + dspls[beg_rec] + alreadysent);
112 int32_t cur_rec = beg_rec;
113 int32_t lastdata = (cur_rec+1) * perproc;
114 for(
typename vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
116 if( ( (*it) >= lastdata ) && cur_rec != last_rec )
118 cur_rec = min( (*it) / perproc, last_rec);
119 lastdata = (cur_rec+1) * perproc;
125 sendindbuf[ dspls[cur_rec] + alreadysent ] = (*it) - perproc*cur_rec;
126 sendnumbuf[ dspls[cur_rec] + (alreadysent++) ] = *(numy[i].begin() + (it-indy[i].begin()));
132 for(
int i=0; i< splits; ++i)
134 for(
int j=0; j< p_c; ++j)
135 cnts[j] += loc_rec_cnts[i][j];
136 delete [] loc_rec_cnts[i];
138 delete [] loc_rec_cnts;
142 cout <<
"Something is wrong, splits should be nonzero for multithreaded execution" << endl;
153 template<
typename VT,
typename IT,
typename UDER>
158 double t0=MPI_Wtime();
162 if(A.spSeq->getnsplit() > 0)
180 SpParHelper::Print(
"BFS only (no semiring) function only work with optimization buffers\n");
184 double t1=MPI_Wtime();
190 template <
typename IU,
typename VT>
194 double t0=MPI_Wtime();
197 vector<IU>().swap(y.ind);
198 vector<VT>().swap(y.num);
201 IU ysize = y.MyLocLength();
202 bool * isthere =
new bool[ysize];
203 vector< pair<IU,VT> > ts_pairs;
204 fill_n(isthere, ysize,
false);
208 for(
int i=0; i<rowneighs; ++i)
210 for(
int j=0; j< recvcnt[i]; ++j)
212 int32_t index = recvindbuf[rdispls[i] + j];
214 ts_pairs.push_back(make_pair(index, recvnumbuf[rdispls[i] + j]));
219 DeleteAll(isthere, recvindbuf, recvnumbuf);
220 __gnu_parallel::sort(ts_pairs.begin(), ts_pairs.end());
221 int nnzy = ts_pairs.size();
224 for(
int i=0; i< nnzy; ++i)
226 y.ind[i] = ts_pairs[i].first;
227 y.num[i] = ts_pairs[i].second;
233 int32_t inf = numeric_limits<int32_t>::min();
234 int32_t sup = numeric_limits<int32_t>::max();
235 KNHeap< int32_t, int32_t > sHeap(sup, inf);
236 int * processed =
new int[rowneighs]();
237 for(int32_t i=0; i<rowneighs; ++i)
242 sHeap.insert(recvindbuf[rdispls[i]], i);
249 sHeap.deleteMin(&key, &locv);
250 y.ind.push_back( static_cast<IU>(key));
251 y.num.push_back(recvnumbuf[rdispls[locv]]);
253 if( (++(processed[locv])) < recvcnt[locv] )
254 sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
265 sHeap.deleteMin(&key, &locv);
266 IU deref = rdispls[locv] + processed[locv];
267 if(y.ind.back() !=
static_cast<IU
>(key))
269 y.ind.push_back(static_cast<IU>(key));
270 y.num.push_back(recvnumbuf[deref]);
273 if( (++(processed[locv])) < recvcnt[locv] )
274 sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
283 double t1=MPI_Wtime();
294 template <
typename VT,
typename IT,
typename UDER>
300 MPI_Comm World = x.commGrid->GetWorld();
301 MPI_Comm ColWorld = x.commGrid->GetColWorld();
302 MPI_Comm RowWorld = x.commGrid->GetRowWorld();
307 int32_t *trxinds, *indacc;
308 VT *trxnums, *numacc;
311 double t0=MPI_Wtime();
315 double t1=MPI_Wtime();
318 AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, indacc, numacc, accnz,
true);
321 int rowneighs; MPI_Comm_size(RowWorld,&rowneighs);
322 int * sendcnt =
new int[rowneighs]();
324 LocalSpMV(A, rowneighs, optbuf, indacc, numacc, sendcnt, accnz);
326 int * rdispls =
new int[rowneighs];
327 int * recvcnt =
new int[rowneighs];
328 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);
332 for(
int i=0; i<rowneighs-1; ++i)
334 rdispls[i+1] = rdispls[i] + recvcnt[i];
336 int totrecv = accumulate(recvcnt,recvcnt+rowneighs,0);
337 int32_t * recvindbuf =
new int32_t[totrecv];
338 VT * recvnumbuf =
new VT[totrecv];
341 double t2=MPI_Wtime();
346 MPI_Alltoallv(optbuf.
nums, sendcnt, optbuf.
dspls, MPIType<VT>(), recvnumbuf, recvcnt, rdispls, MPIType<VT>(), RowWorld);
351 SpParHelper::Print(
"BFS only (no semiring) function only work with optimization buffers\n");
354 double t3=MPI_Wtime();