COMBINATORIAL_BLAS  1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SpParMat.cpp
Go to the documentation of this file.
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.2 -------------------------------------------------*/
4 /* date: 10/06/2011 --------------------------------------------*/
5 /* authors: Aydin Buluc (abuluc@lbl.gov), Adam Lugowski --------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2011, Aydin Buluc
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 #include "SpParMat.h"
30 #include "ParFriends.h"
31 #include "Operations.h"
32 #include "FileHeader.h"
33 
34 #include <fstream>
35 #include <algorithm>
36 #include <stdexcept>
37 using namespace std;
38 
42 template <class IT, class NT, class DER>
43 SpParMat< IT,NT,DER >::SpParMat (ifstream & input, MPI_Comm & world)
44 {
45  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
46  if(!input.is_open())
47  {
48  perror("Input file doesn't exist\n");
49  exit(-1);
50  }
51  commGrid.reset(new CommGrid(world, 0, 0));
52  input >> (*spSeq);
53 }
54 
55 template <class IT, class NT, class DER>
56 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
57 {
58  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
59  commGrid.reset(new CommGrid(world, 0, 0));
60 }
61 
62 template <class IT, class NT, class DER>
63 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, shared_ptr<CommGrid> grid): spSeq(myseq)
64 {
65  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
66  commGrid = grid;
67 }
68 
69 template <class IT, class NT, class DER>
70 SpParMat< IT,NT,DER >::SpParMat (shared_ptr<CommGrid> grid)
71 {
72  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
73  spSeq = new DER();
74  commGrid = grid;
75 }
76 
81 template <class IT, class NT, class DER>
83 {
84 
85  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
86  spSeq = new DER();
87  commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
88 }
89 
90 template <class IT, class NT, class DER>
92 {
93  if(spSeq != NULL) delete spSeq;
94 }
95 
96 template <class IT, class NT, class DER>
98 {
99  if(spSeq != NULL) delete spSeq;
100  spSeq = NULL;
101 }
102 
103 
104 template <class IT, class NT, class DER>
105 void SpParMat< IT,NT,DER >::Dump(string filename) const
106 {
107  MPI_Comm World = commGrid->GetWorld();
108  int rank = commGrid->GetRank;
109  int nprocs = commGrid->GetSize();
110 
111  MPI_File thefile;
112  MPI_File_open(World, filename.c_str(), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
113 
114  int rankinrow = commGrid->GetRankInProcRow();
115  int rankincol = commGrid->GetRankInProcCol();
116  int rowneighs = commGrid->GetGridCols(); // get # of processors on the row
117  int colneighs = commGrid->GetGridRows();
118 
119  IT * colcnts = new IT[rowneighs];
120  IT * rowcnts = new IT[colneighs];
121  rowcnts[rankincol] = getlocalrows();
122  colcnts[rankinrow] = getlocalcols();
123 
124  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
125  IT coloffset = accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));
126 
127  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
128  IT rowoffset = accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
129  DeleteAll(colcnts, rowcnts);
130 
131  IT * prelens = new IT[nprocs];
132  prelens[rank] = 2*getlocalnnz();
133  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
134  IT lengthuntil = accumulate(prelens, prelens+rank, static_cast<IT>(0));
135 
136  // The disp displacement argument specifies the position
137  // (absolute offset in bytes from the beginning of the file)
138  MPI_Offset disp = lengthuntil * sizeof(uint32_t);
139  MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, "native", MPI_INFO_NULL);
140  uint32_t * gen_edges = new uint32_t[prelens[rank]];
141 
142  IT k = 0;
143  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
144  {
145  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
146  {
147  gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
148  gen_edges[k++] = (uint32_t) (colit.colid() + coloffset);
149  }
150  }
151  assert(k == prelens[rank]);
152  MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, NULL);
153  MPI_File_close(&thefile);
154 
155  delete [] prelens;
156  delete [] gen_edges;
157 }
158 
159 
160 template <class IT, class NT, class DER>
162 {
163  if(rhs.spSeq != NULL)
164  spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
165 
166  commGrid = rhs.commGrid;
167 }
168 
169 template <class IT, class NT, class DER>
171 {
172  if(this != &rhs)
173  {
176  if(spSeq != NULL) delete spSeq;
177  if(rhs.spSeq != NULL)
178  spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
179 
180  commGrid = rhs.commGrid;
181  }
182  return *this;
183 }
184 
185 template <class IT, class NT, class DER>
187 {
188  if(this != &rhs)
189  {
190  if(*commGrid == *rhs.commGrid)
191  {
192  (*spSeq) += (*(rhs.spSeq));
193  }
194  else
195  {
196  cout << "Grids are not comparable for parallel addition (A+B)" << endl;
197  }
198  }
199  else
200  {
201  cout<< "Missing feature (A+A): Use multiply with 2 instead !"<<endl;
202  }
203  return *this;
204 }
205 
206 template <class IT, class NT, class DER>
208 {
209  IT totnnz = getnnz(); // collective call
210  IT maxnnz = 0;
211  IT localnnz = (IT) spSeq->getnnz();
212  MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
213  if(totnnz == 0) return 1;
214  return static_cast<float>((commGrid->GetSize() * maxnnz)) / static_cast<float>(totnnz);
215 }
216 
217 template <class IT, class NT, class DER>
219 {
220  IT totalnnz = 0;
221  IT localnnz = spSeq->getnnz();
222  MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
223  return totalnnz;
224 }
225 
226 template <class IT, class NT, class DER>
228 {
229  IT totalrows = 0;
230  IT localrows = spSeq->getnrow();
231  MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
232  return totalrows;
233 }
234 
235 template <class IT, class NT, class DER>
237 {
238  IT totalcols = 0;
239  IT localcols = spSeq->getncol();
240  MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
241  return totalcols;
242 }
243 
244 template <class IT, class NT, class DER>
245 template <typename _BinaryOperation>
246 void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT>& x, _BinaryOperation __binary_op)
247 {
248 
249  if(!(*commGrid == *(x.commGrid)))
250  {
251  cout << "Grids are not comparable for SpParMat::DimApply" << endl;
252  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
253  }
254 
255  MPI_Comm World = x.commGrid->GetWorld();
256  MPI_Comm ColWorld = x.commGrid->GetColWorld();
257  MPI_Comm RowWorld = x.commGrid->GetRowWorld();
258  switch(dim)
259  {
260  case Column: // scale each column
261  {
262  int xsize = (int) x.LocArrSize();
263  int trxsize = 0;
264  int diagneigh = x.commGrid->GetComplementRank();
265  MPI_Status status;
266  MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
267 
268  NT * trxnums = new NT[trxsize];
269  MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
270 
271  int colneighs, colrank;
272  MPI_Comm_size(ColWorld, &colneighs);
273  MPI_Comm_rank(ColWorld, &colrank);
274  int * colsize = new int[colneighs];
275  colsize[colrank] = trxsize;
276 
277  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
278  int * dpls = new int[colneighs](); // displacements (zero initialized pid)
279  std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
280  int accsize = std::accumulate(colsize, colsize+colneighs, 0);
281  NT * scaler = new NT[accsize];
282 
283  MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
284  DeleteAll(trxnums,colsize, dpls);
285 
286  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
287  {
288  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
289  {
290  nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
291  }
292  }
293  delete [] scaler;
294  break;
295  }
296  case Row:
297  {
298  int xsize = (int) x.LocArrSize();
299  int rowneighs, rowrank;
300  MPI_Comm_size(RowWorld, &rowneighs);
301  MPI_Comm_rank(RowWorld, &rowrank);
302  int * rowsize = new int[rowneighs];
303  rowsize[rowrank] = xsize;
304  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
305  int * dpls = new int[rowneighs](); // displacements (zero initialized pid)
306  std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
307  int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
308  NT * scaler = new NT[accsize];
309 
310  MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
311  DeleteAll(rowsize, dpls);
312 
313  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
314  {
315  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
316  {
317  nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
318  }
319  }
320  delete [] scaler;
321  break;
322  }
323  default:
324  {
325  cout << "Unknown scaling dimension, returning..." << endl;
326  break;
327  }
328  }
329 }
330 
331 template <class IT, class NT, class DER>
332 template <typename _BinaryOperation, typename _UnaryOperation >
333 DenseParVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
334 {
335  DenseParVec<IT,NT> parvec(commGrid, id);
336  Reduce(parvec, dim, __binary_op, id, __unary_op);
337  return parvec;
338 }
339 
340 // default template arguments don't work with function templates
341 template <class IT, class NT, class DER>
342 template <typename _BinaryOperation>
343 DenseParVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
344 {
345  DenseParVec<IT,NT> parvec(commGrid, id);
346  Reduce(parvec, dim, __binary_op, id, myidentity<NT>() );
347  return parvec;
348 }
349 
350 template <class IT, class NT, class DER>
351 template <typename VT, typename _BinaryOperation>
352 void SpParMat<IT,NT,DER>::Reduce(DenseParVec<IT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
353 {
354  Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
355 }
356 
357 template <class IT, class NT, class DER>
358 template <typename VT, typename GIT, typename _BinaryOperation>
359 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
360 {
361  Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
362 }
363 
364 template <class IT, class NT, class DER>
365 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
366 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
367 {
368  if(*rvec.commGrid != *commGrid)
369  {
370  SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails !");
371  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
372  }
373  switch(dim)
374  {
375  case Column: // pack along the columns, result is a vector of size n
376  {
377  // We can't use rvec's distribution (rows first, columns later) here
378  IT n_thiscol = getlocalcols(); // length assigned to this processor column
379  int colneighs = commGrid->GetGridRows(); // including oneself
380  int colrank = commGrid->GetRankInProcCol();
381 
382  GIT * loclens = new GIT[colneighs];
383  GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
384 
385  GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
386  if(colrank == colneighs-1)
387  loclens[colrank] = n_thiscol - (n_perproc*colrank);
388  else
389  loclens[colrank] = n_perproc;
390 
391  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
392  partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
393 
394  vector<VT> trarr;
395  typename DER::SpColIter colit = spSeq->begcol();
396  for(int i=0; i< colneighs; ++i)
397  {
398  VT * sendbuf = new VT[loclens[i]];
399  fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
400 
401  for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
402  {
403  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit) // all nonzeros in this column
404  {
405  sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
406  }
407  }
408  VT * recvbuf = NULL;
409  if(colrank == i)
410  {
411  trarr.resize(loclens[i]);
412  recvbuf = SpHelper::p2a(trarr);
413  }
414  MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetColWorld()); // root = i
415  delete [] sendbuf;
416  }
417  DeleteAll(loclens, lensums);
418 
419  GIT reallen; // Now we have to transpose the vector
420  GIT trlen = trarr.size();
421  int diagneigh = commGrid->GetComplementRank();
422  MPI_Status status;
423  MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
424 
425  rvec.arr.resize(reallen);
426  MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
427  rvec.glen = getncol(); // ABAB: Put a sanity check here
428  break;
429 
430  }
431  case Row: // pack along the rows, result is a vector of size m
432  {
433  rvec.glen = getnrow();
434  int rowneighs = commGrid->GetGridCols();
435  int rowrank = commGrid->GetRankInProcRow();
436  GIT * loclens = new GIT[rowneighs];
437  GIT * lensums = new GIT[rowneighs+1](); // begin/end points of local lengths
438  loclens[rowrank] = rvec.MyLocLength();
439  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
440  partial_sum(loclens, loclens+rowneighs, lensums+1);
441  try
442  {
443  rvec.arr.resize(loclens[rowrank], id);
444 
445  // keeping track of all nonzero iterators within columns at once is unscalable w.r.t. memory (due to sqrt(p) scaling)
446  // thus we'll do batches of column as opposed to all columns at once. 5 million columns take 80MB (two pointers per column)
447  #define MAXCOLUMNBATCH 5 * 1024 * 1024
448  typename DER::SpColIter begfinger = spSeq->begcol(); // beginning finger to columns
449 
450  // Each processor on the same processor row should execute the SAME number of reduce calls
451  int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
452  int maxreducecalls;
453  MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());
454 
455  for(int k=0; k< maxreducecalls; ++k)
456  {
457  vector<typename DER::SpColIter::NzIter> nziters;
458  typename DER::SpColIter curfinger = begfinger;
459  for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
460  {
461  nziters.push_back(spSeq->begnz(curfinger));
462  }
463  for(int i=0; i< rowneighs; ++i) // step by step to save memory
464  {
465  VT * sendbuf = new VT[loclens[i]];
466  fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
467 
468  typename DER::SpColIter colit = begfinger;
469  IT colcnt = 0; // "processed column" counter
470  for(; colit != curfinger; ++colit, ++colcnt) // iterate over this batch of columns until curfinger
471  {
472  typename DER::SpColIter::NzIter nzit = nziters[colcnt];
473  for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit) // a portion of nonzeros in this column
474  {
475  sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
476  }
477  nziters[colcnt] = nzit; // set the new finger
478  }
479 
480  VT * recvbuf = NULL;
481  if(rowrank == i)
482  {
483  for(int j=0; j< loclens[i]; ++j)
484  {
485  sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]); // rvec.arr will be overriden with MPI_Reduce, save its contents
486  }
487  recvbuf = SpHelper::p2a(rvec.arr);
488  }
489  MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetRowWorld()); // root = i
490  delete [] sendbuf;
491  }
492  begfinger = curfinger; // set the next begfilter
493  }
494  DeleteAll(loclens, lensums);
495  }
496  catch (length_error& le)
497  {
498  cerr << "Length error: " << le.what() << endl;
499  }
500  break;
501  }
502  default:
503  {
504  cout << "Unknown reduction dimension, returning empty vector" << endl;
505  break;
506  }
507  }
508 }
509 
517 template <class IT, class NT, class DER>
518 template <typename VT, typename _BinaryOperation, typename _UnaryOperation>
519 void SpParMat<IT,NT,DER>::Reduce(DenseParVec<IT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
520 {
521  if(rvec.zero != id)
522  {
523  ostringstream outs;
524  outs << "SpParMat::Reduce(): Return vector's zero is different than set id" << endl;
525  outs << "Setting rvec.zero to id (" << id << ") instead" << endl;
526  SpParHelper::Print(outs.str());
527  rvec.zero = id;
528  }
529  if(*rvec.commGrid != *commGrid)
530  {
531  SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails !");
532  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
533  }
534  switch(dim)
535  {
536  case Column: // pack along the columns, result is a vector of size n
537  {
538  VT * sendbuf = new VT[getlocalcols()];
539  fill(sendbuf, sendbuf+getlocalcols(), id); // fill with identity
540 
541  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
542  {
543  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
544  {
545  sendbuf[colit.colid()] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()]);
546  }
547  }
548  VT * recvbuf = NULL;
549  int root = commGrid->GetDiagOfProcCol();
550  if(rvec.diagonal)
551  {
552  rvec.arr.resize(getlocalcols());
553  recvbuf = SpHelper::p2a(rvec.arr);
554  }
555  MPI_Reduce(sendbuf, recvbuf, getlocalcols(), MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), root, commGrid->GetColWorld());
556  delete [] sendbuf;
557  break;
558  }
559  case Row: // pack along the rows, result is a vector of size m
560  {
561  VT * sendbuf = new VT[getlocalrows()];
562  fill(sendbuf, sendbuf+getlocalrows(), id); // fill with identity
563 
564  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
565  {
566  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
567  {
568  sendbuf[nzit.rowid()] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()]);
569  }
570  }
571  VT * recvbuf = NULL;
572  int root = commGrid->GetDiagOfProcRow();
573  if(rvec.diagonal)
574  {
575  rvec.arr.resize(getlocalrows());
576  recvbuf = SpHelper::p2a(rvec.arr);
577  }
578  MPI_Reduce(sendbuf, recvbuf, getlocalrows(), MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), root, commGrid->GetRowWorld());
579  delete [] sendbuf;
580  break;
581  }
582  default:
583  {
584  cout << "Unknown reduction dimension, returning empty vector" << endl;
585  break;
586  }
587  }
588 }
589 
590 template <class IT, class NT, class DER>
591 template <typename NNT,typename NDER>
593 {
594  NDER * convert = new NDER(*spSeq);
595  return SpParMat<IT,NNT,NDER> (convert, commGrid);
596 }
597 
599 template <class IT, class NT, class DER>
600 template <typename NIT, typename NNT,typename NDER>
602 {
603  NDER * convert = new NDER(*spSeq);
604  return SpParMat<NIT,NNT,NDER> (convert, commGrid);
605 }
606 
611 template <class IT, class NT, class DER>
613 {
614  vector<IT> ri;
615  DER * tempseq = new DER((*spSeq)(ri, ci));
616  return SpParMat<IT,NT,DER> (tempseq, commGrid);
617 }
618 
626 template <class IT, class NT, class DER>
627 template <typename PTNTBOOL, typename PTBOOLNT>
629 {
630  // infer the concrete type SpMat<IT,IT>
631  typedef typename create_trait<DER, IT, bool>::T_inferred DER_IT;
632 
633  if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
634  {
635  SpParHelper::Print("Grids are not comparable, SpRef fails !");
636  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
637  }
638 
639  // Safety check
640  IT locmax_ri = 0;
641  IT locmax_ci = 0;
642  if(!ri.arr.empty())
643  locmax_ri = *max_element(ri.arr.begin(), ri.arr.end());
644  if(!ci.arr.empty())
645  locmax_ci = *max_element(ci.arr.begin(), ci.arr.end());
646 
647  IT total_m = getnrow();
648  IT total_n = getncol();
649  if(locmax_ri > total_m || locmax_ci > total_n)
650  {
651  throw outofrangeexception();
652  }
653 
654  // The indices for FullyDistVec are offset'd to 1/p pieces
655  // The matrix indices are offset'd to 1/sqrt(p) pieces
656  // Add the corresponding offset before sending the data
657  IT roffset = ri.RowLenUntil();
658  IT rrowlen = ri.MyRowLength();
659  IT coffset = ci.RowLenUntil();
660  IT crowlen = ci.MyRowLength();
661 
662  // We create two boolean matrices P and Q
663  // Dimensions: P is size(ri) x m
664  // Q is n x size(ci)
665  // Range(ri) = {0,...,m-1}
666  // Range(ci) = {0,...,n-1}
667 
668  IT rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
669  IT totalm = getnrow(); // collective call
670  IT totaln = getncol();
671  IT m_perproccol = totalm / rowneighs;
672  IT n_perproccol = totaln / rowneighs;
673 
674  // Get the right local dimensions
675  IT diagneigh = commGrid->GetComplementRank();
676  IT mylocalrows = getlocalrows();
677  IT mylocalcols = getlocalcols();
678  IT trlocalrows;
679  MPI_Status status;
680  MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);
681  // we don't need trlocalcols because Q.Transpose() will take care of it
682 
683  vector< vector<IT> > rowid(rowneighs); // reuse for P and Q
684  vector< vector<IT> > colid(rowneighs);
685 
686  // Step 1: Create P
687  IT locvec = ri.arr.size(); // nnz in local vector
688  for(typename vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
689  {
690  // numerical values (permutation indices) are 0-based
691  // recipient alone progessor row
692  IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);
693 
694  // ri's numerical values give the colids and its local indices give rowids
695  rowid[rowrec].push_back( i + roffset);
696  colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
697  }
698 
699  int * sendcnt = new int[rowneighs]; // reuse in Q as well
700  int * recvcnt = new int[rowneighs];
701  for(IT i=0; i<rowneighs; ++i)
702  sendcnt[i] = rowid[i].size();
703 
704  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
705  int * sdispls = new int[rowneighs]();
706  int * rdispls = new int[rowneighs]();
707  partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
708  partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
709  IT p_nnz = accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
710 
711  // create space for incoming data ...
712  IT * p_rows = new IT[p_nnz];
713  IT * p_cols = new IT[p_nnz];
714  IT * senddata = new IT[locvec]; // re-used for both rows and columns
715  for(int i=0; i<rowneighs; ++i)
716  {
717  copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
718  vector<IT>().swap(rowid[i]); // clear memory of rowid
719  }
720  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
721 
722  for(int i=0; i<rowneighs; ++i)
723  {
724  copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
725  vector<IT>().swap(colid[i]); // clear memory of colid
726  }
727  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
728  delete [] senddata;
729 
730  tuple<IT,IT,bool> * p_tuples = new tuple<IT,IT,bool>[p_nnz];
731  for(IT i=0; i< p_nnz; ++i)
732  {
733  p_tuples[i] = make_tuple(p_rows[i], p_cols[i], 1);
734  }
735  DeleteAll(p_rows, p_cols);
736 
737  DER_IT * PSeq = new DER_IT();
738  PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples); // deletion of tuples[] is handled by SpMat::Create
739 
741  if(&ri == &ci) // Symmetric permutation
742  {
743  DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
744  #ifdef SPREFDEBUG
745  SpParHelper::Print("Symmetric permutation\n");
746  #endif
747  SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
748  if(inplace)
749  {
750  #ifdef SPREFDEBUG
751  SpParHelper::Print("In place multiplication\n");
752  #endif
753  *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true); // clear the memory of *this
754 
755  //ostringstream outb;
756  //outb << "P_after_" << commGrid->myrank;
757  //ofstream ofb(outb.str().c_str());
758  //P.put(ofb);
759 
760  P.Transpose();
761  *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true); // clear the memory of both *this and P
762  return SpParMat<IT,NT,DER>(); // dummy return to match signature
763  }
764  else
765  {
766  PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
767  P.Transpose();
768  return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
769  }
770  }
771  else
772  {
773  // Intermediate step (to save memory): Form PA and store it in P
774  // Distributed matrix generation (collective call)
775  SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
776 
777  // Do parallel matrix-matrix multiply
778  PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
779  } // P is destructed here
780 #ifndef NDEBUG
781  PA.PrintInfo();
782 #endif
783  // Step 2: Create Q (use the same row-wise communication and transpose at the end)
784  // This temporary to-be-transposed Q is size(ci) x n
785  locvec = ci.arr.size(); // nnz in local vector (reset variable)
786  for(typename vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
787  {
788  // numerical values (permutation indices) are 0-based
789  IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);
790 
791  // ri's numerical values give the colids and its local indices give rowids
792  rowid[rowrec].push_back( i + coffset);
793  colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
794  }
795 
796  for(IT i=0; i<rowneighs; ++i)
797  sendcnt[i] = rowid[i].size(); // update with new sizes
798 
799  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
800  fill(sdispls, sdispls+rowneighs, 0); // reset
801  fill(rdispls, rdispls+rowneighs, 0);
802  partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
803  partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
804  IT q_nnz = accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
805 
806  // create space for incoming data ...
807  IT * q_rows = new IT[q_nnz];
808  IT * q_cols = new IT[q_nnz];
809  senddata = new IT[locvec];
810  for(int i=0; i<rowneighs; ++i)
811  {
812  copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
813  vector<IT>().swap(rowid[i]); // clear memory of rowid
814  }
815  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
816 
817  for(int i=0; i<rowneighs; ++i)
818  {
819  copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
820  vector<IT>().swap(colid[i]); // clear memory of colid
821  }
822  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
823  DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
824 
825  tuple<IT,IT,bool> * q_tuples = new tuple<IT,IT,bool>[q_nnz];
826  for(IT i=0; i< q_nnz; ++i)
827  {
828  q_tuples[i] = make_tuple(q_rows[i], q_cols[i], 1);
829  }
830  DeleteAll(q_rows, q_cols);
831  DER_IT * QSeq = new DER_IT();
832  QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples); // Creating Q' instead
833 
834  // Step 3: Form PAQ
835  // Distributed matrix generation (collective call)
836  SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
837  Q.Transpose();
838  if(inplace)
839  {
840  *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true); // clear the memory of both PA and P
841  return SpParMat<IT,NT,DER>(); // dummy return to match signature
842  }
843  else
844  {
845  return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
846  }
847 }
848 
849 
850 template <class IT, class NT, class DER>
852 {
853  typedef PlusTimesSRing<NT, NT> PTRing;
854 
855  if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
856  {
857  SpParHelper::Print("Grids are not comparable, SpAsgn fails !");
858  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
859  }
860  IT total_m_A = getnrow();
861  IT total_n_A = getncol();
862  IT total_m_B = B.getnrow();
863  IT total_n_B = B.getncol();
864 
865  if(total_m_B != ri.TotalLength())
866  {
867  SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !");
868  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
869  }
870  if(total_n_B != ci.TotalLength())
871  {
872  SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !");
873  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
874  }
875  Prune(ri, ci); // make a hole
876 
877  // embed B to the size of A
879  rvec->iota(total_m_B, 0); // sparse() expects a zero based index
880 
881  SpParMat<IT,NT,DER> R(total_m_A, total_m_B, ri, *rvec, 1);
882  delete rvec; // free memory
883  SpParMat<IT,NT,DER> RB = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(R, B, true, false); // clear memory of R but not B
884 
886  qvec->iota(total_n_B, 0);
887  SpParMat<IT,NT,DER> Q(total_n_B, total_n_A, *qvec, ci, 1);
888  delete qvec; // free memory
889  SpParMat<IT,NT,DER> RBQ = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(RB, Q, true, true); // clear memory of RB and Q
890  *this += RBQ; // extend-add
891 }
892 
893 template <class IT, class NT, class DER>
895 {
896  typedef PlusTimesSRing<NT, NT> PTRing;
897 
898  if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
899  {
900  SpParHelper::Print("Grids are not comparable, Prune fails !");
901  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
902  }
903 
904  // Safety check
905  IT locmax_ri = 0;
906  IT locmax_ci = 0;
907  if(!ri.arr.empty())
908  locmax_ri = *max_element(ri.arr.begin(), ri.arr.end());
909  if(!ci.arr.empty())
910  locmax_ci = *max_element(ci.arr.begin(), ci.arr.end());
911 
912  IT total_m = getnrow();
913  IT total_n = getncol();
914  if(locmax_ri > total_m || locmax_ci > total_n)
915  {
916  throw outofrangeexception();
917  }
918 
919  SpParMat<IT,NT,DER> S(total_m, total_m, ri, ri, 1);
920  SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(S, *this, true, false); // clear memory of S but not *this
921 
922  SpParMat<IT,NT,DER> T(total_n, total_n, ci, ci, 1);
923  SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(SA, T, true, true); // clear memory of SA and T
924  EWiseMult(SAT, true); // In-place EWiseMult with not(SAT)
925 }
926 
927 
928 // In-place version where rhs type is the same (no need for type promotion)
929 template <class IT, class NT, class DER>
931 {
932  if(*commGrid == *rhs.commGrid)
933  {
934  spSeq->EWiseMult(*(rhs.spSeq), exclude); // Dimension compatibility check performed by sequential function
935  }
936  else
937  {
938  cout << "Grids are not comparable, EWiseMult() fails !" << endl;
939  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
940  }
941 }
942 
943 
944 template <class IT, class NT, class DER>
946 {
947  if(*commGrid == *rhs.commGrid)
948  {
949  spSeq->EWiseScale(rhs.array, rhs.m, rhs.n); // Dimension compatibility check performed by sequential function
950  }
951  else
952  {
953  cout << "Grids are not comparable, EWiseScale() fails !" << endl;
954  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
955  }
956 }
957 
958 template <class IT, class NT, class DER>
959 template <typename _BinaryOperation>
960 void SpParMat<IT,NT,DER>::UpdateDense(DenseParMat<IT, NT> & rhs, _BinaryOperation __binary_op) const
961 {
962  if(*commGrid == *rhs.commGrid)
963  {
964  if(getlocalrows() == rhs.m && getlocalcols() == rhs.n)
965  {
966  spSeq->UpdateDense(rhs.array, __binary_op);
967  }
968  else
969  {
970  cout << "Matrices have different dimensions, UpdateDense() fails !" << endl;
971  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
972  }
973  }
974  else
975  {
976  cout << "Grids are not comparable, UpdateDense() fails !" << endl;
977  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
978  }
979 }
980 
981 template <class IT, class NT, class DER>
983 {
984  IT mm = getnrow();
985  IT nn = getncol();
986  IT nznz = getnnz();
987 
988  if (commGrid->myrank == 0)
989  cout << "As a whole: " << mm << " rows and "<< nn <<" columns and "<< nznz << " nonzeros" << endl;
990 
991 #ifdef DEBUG
992  IT allprocs = commGrid->grrows * commGrid->grcols;
993  for(IT i=0; i< allprocs; ++i)
994  {
995  if (commGrid->myrank == i)
996  {
997  cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << endl;
998  spSeq->PrintInfo();
999  }
1000  MPI_Barrier(commGrid->GetWorld());
1001  }
1002 #endif
1003 }
1004 
1005 template <class IT, class NT, class DER>
1007 {
1008  int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
1009  int whole = 1;
1010  MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
1011  return static_cast<bool>(whole);
1012 }
1013 
1014 
1019 template <class IT, class NT, class DER>
1020 void SpParMat< IT,NT,DER >::SparseCommon(vector< vector < tuple<IT,IT,NT> > > & data, IT locsize, IT total_m, IT total_n)
1021 {
1022  int nprocs = commGrid->GetSize();
1023  int * sendcnt = new int[nprocs];
1024  int * recvcnt = new int[nprocs];
1025  for(int i=0; i<nprocs; ++i)
1026  sendcnt[i] = data[i].size(); // sizes are all the same
1027 
1028  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
1029  int * sdispls = new int[nprocs]();
1030  int * rdispls = new int[nprocs]();
1031  partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
1032  partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
1033 
1034  tuple<IT,IT,NT> * senddata = new tuple<IT,IT,NT>[locsize]; // re-used for both rows and columns
1035  for(int i=0; i<nprocs; ++i)
1036  {
1037  copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
1038  vector< tuple<IT,IT,NT> >().swap(data[i]); // clear memory
1039  }
1040  MPI_Datatype MPI_triple;
1041  MPI_Type_contiguous(sizeof(tuple<IT,IT,NT>), MPI_CHAR, &MPI_triple);
1042  MPI_Type_commit(&MPI_triple);
1043 
1044  IT totrecv = accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
1045  tuple<IT,IT,NT> * recvdata = new tuple<IT,IT,NT>[totrecv];
1046  MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
1047 
1048  DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
1049  MPI_Type_free(&MPI_triple);
1050 
1051  int r = commGrid->GetGridRows();
1052  int s = commGrid->GetGridCols();
1053  IT m_perproc = total_m / r;
1054  IT n_perproc = total_n / s;
1055  int myprocrow = commGrid->GetRankInProcCol();
1056  int myproccol = commGrid->GetRankInProcRow();
1057  IT locrows, loccols;
1058  if(myprocrow != r-1) locrows = m_perproc;
1059  else locrows = total_m - myprocrow * m_perproc;
1060  if(myproccol != s-1) loccols = n_perproc;
1061  else loccols = total_n - myproccol * n_perproc;
1062 
1063  SpTuples<IT,NT> A(totrecv, locrows, loccols, recvdata); // It is ~SpTuples's job to deallocate
1064  spSeq = new DER(A,false); // Convert SpTuples to DER
1065 }
1066 
1067 
1069 template <class IT, class NT, class DER>
1070 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
1071  const FullyDistVec<IT,IT> & distcols, const FullyDistVec<IT,NT> & distvals)
1072 {
1073  if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
1074  {
1075  SpParHelper::Print("Grids are not comparable, Sparse() fails !");
1076  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1077  }
1078  if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
1079  {
1080  SpParHelper::Print("Vectors have different sizes, Sparse() fails !");
1081  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
1082  }
1083 
1084  commGrid = distrows.commGrid;
1085  int nprocs = commGrid->GetSize();
1086  vector< vector < tuple<IT,IT,NT> > > data(nprocs);
1087 
1088  IT locsize = distrows.LocArrSize();
1089  for(IT i=0; i<locsize; ++i)
1090  {
1091  IT lrow, lcol;
1092  int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
1093  data[owner].push_back(make_tuple(lrow,lcol,distvals.arr[i]));
1094  }
1095  SparseCommon(data, locsize, total_m, total_n);
1096 }
1097 
1098 
1099 
1100 template <class IT, class NT, class DER>
1101 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
1102  const FullyDistVec<IT,IT> & distcols, const NT & val)
1103 {
1104  if((*(distrows.commGrid) != *(distcols.commGrid)) )
1105  {
1106  SpParHelper::Print("Grids are not comparable, Sparse() fails !");
1107  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1108  }
1109  if((distrows.TotalLength() != distcols.TotalLength()) )
1110  {
1111  SpParHelper::Print("Vectors have different sizes, Sparse() fails !");
1112  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
1113  }
1114  commGrid = distrows.commGrid;
1115  int nprocs = commGrid->GetSize();
1116  vector< vector < tuple<IT,IT,NT> > > data(nprocs);
1117 
1118  IT locsize = distrows.LocArrSize();
1119  for(IT i=0; i<locsize; ++i)
1120  {
1121  IT lrow, lcol;
1122  int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
1123  data[owner].push_back(make_tuple(lrow,lcol,val));
1124  }
1125  SparseCommon(data, locsize, total_m, total_n);
1126 }
1127 
1128 template <class IT, class NT, class DER>
1129 template <class DELIT>
1131 {
1132  commGrid = DEL.commGrid;
1133  typedef typename DER::LocalIT LIT;
1134 
1135  int nprocs = commGrid->GetSize();
1136  int r = commGrid->GetGridRows();
1137  int s = commGrid->GetGridCols();
1138  vector< vector<LIT> > data(nprocs); // enties are pre-converted to local indices before getting pushed into "data"
1139 
1140  LIT m_perproc = DEL.getGlobalV() / r;
1141  LIT n_perproc = DEL.getGlobalV() / s;
1142 
1143  if(sizeof(LIT) < sizeof(DELIT))
1144  {
1145  ostringstream outs;
1146  outs << "Warning: Using smaller indices for the matrix than DistEdgeList\n";
1147  outs << "Local matrices are " << m_perproc << "-by-" << n_perproc << endl;
1148  SpParHelper::Print(outs.str());
1149  }
1150 
1151  // to lower memory consumption, form sparse matrix in stages
1152  LIT stages = MEM_EFFICIENT_STAGES;
1153 
1154  // even if local indices (LIT) are 32-bits, we should work with 64-bits for global info
1155  int64_t perstage = DEL.nedges / stages;
1156  LIT totrecv = 0;
1157  vector<LIT> alledges;
1158 
1159  int maxr = r-1;
1160  int maxs = s-1;
1161  for(LIT s=0; s< stages; ++s)
1162  {
1163  int64_t n_befor = s*perstage;
1164  int64_t n_after= ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
1165 
1166  // clear the source vertex by setting it to -1
1167  int realedges = 0; // these are "local" realedges
1168 
1169  if(DEL.pedges)
1170  {
1171  for (int64_t i = n_befor; i < n_after; i++)
1172  {
1173  int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
1174  int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
1175 
1176  if(fr >= 0 && to >= 0) // otherwise skip
1177  {
1178  int rowowner = min(static_cast<int>(fr / m_perproc), maxr);
1179  int colowner = min(static_cast<int>(to / n_perproc), maxs);
1180  int owner = commGrid->GetRank(rowowner, colowner);
1181  LIT rowid = fr - (rowowner * m_perproc);
1182  LIT colid = to - (colowner * n_perproc);
1183  data[owner].push_back(rowid); // row_id
1184  data[owner].push_back(colid); // col_id
1185  ++realedges;
1186  }
1187  }
1188  }
1189  else
1190  {
1191  for (int64_t i = n_befor; i < n_after; i++)
1192  {
1193  if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0) // otherwise skip
1194  {
1195  int rowowner = min(static_cast<int>(DEL.edges[2*i+0] / m_perproc), maxr);
1196  int colowner = min(static_cast<int>(DEL.edges[2*i+1] / n_perproc), maxs);
1197  int owner = commGrid->GetRank(rowowner, colowner);
1198  LIT rowid = DEL.edges[2*i+0]- (rowowner * m_perproc);
1199  LIT colid = DEL.edges[2*i+1]- (colowner * n_perproc);
1200  data[owner].push_back(rowid);
1201  data[owner].push_back(colid);
1202  ++realedges;
1203  }
1204  }
1205  }
1206 
1207  LIT * sendbuf = new LIT[2*realedges];
1208  int * sendcnt = new int[nprocs];
1209  int * sdispls = new int[nprocs];
1210  for(int i=0; i<nprocs; ++i)
1211  sendcnt[i] = data[i].size();
1212 
1213  int * rdispls = new int[nprocs];
1214  int * recvcnt = new int[nprocs];
1215  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld()); // share the counts
1216 
1217  sdispls[0] = 0;
1218  rdispls[0] = 0;
1219  for(int i=0; i<nprocs-1; ++i)
1220  {
1221  sdispls[i+1] = sdispls[i] + sendcnt[i];
1222  rdispls[i+1] = rdispls[i] + recvcnt[i];
1223  }
1224  for(int i=0; i<nprocs; ++i)
1225  copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
1226 
1227  // clear memory
1228  for(int i=0; i<nprocs; ++i)
1229  vector<LIT>().swap(data[i]);
1230 
1231  // ABAB: Total number of edges received might not be LIT-addressible
1232  // However, each edge_id is LIT-addressible
1233  IT thisrecv = accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // thisrecv = 2*locedges
1234  LIT * recvbuf = new LIT[thisrecv];
1235  totrecv += thisrecv;
1236 
1237  MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
1238  DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
1239  copy (recvbuf,recvbuf+thisrecv,back_inserter(alledges)); // copy to all edges
1240  delete [] recvbuf;
1241  }
1242 
1243  int myprocrow = commGrid->GetRankInProcCol();
1244  int myproccol = commGrid->GetRankInProcRow();
1245  LIT locrows, loccols;
1246  if(myprocrow != r-1) locrows = m_perproc;
1247  else locrows = DEL.getGlobalV() - myprocrow * m_perproc;
1248  if(myproccol != s-1) loccols = n_perproc;
1249  else loccols = DEL.getGlobalV() - myproccol * n_perproc;
1250 
1251  SpTuples<LIT,NT> A(totrecv/2, locrows, loccols, alledges, removeloops); // alledges is empty upon return
1252  spSeq = new DER(A,false); // Convert SpTuples to DER
1253 }
1254 
1255 template <class IT, class NT, class DER>
1257 {
1258  MPI_Comm DiagWorld = commGrid->GetDiagWorld();
1259  IT totrem;
1260  IT removed = 0;
1261  if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
1262  {
1263  SpTuples<IT,NT> tuples(*spSeq);
1264  delete spSeq;
1265  removed = tuples.RemoveLoops();
1266  spSeq = new DER(tuples, false); // Convert to DER
1267  }
1268  MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
1269  return totrem;
1270 }
1271 
1272 template <class IT, class NT, class DER>
1273 template <typename LIT, typename OT>
1275 {
1276  if(spSeq->getnsplit() > 0)
1277  {
1278  SpParHelper::Print("Can not declare preallocated buffers for multithreaded execution");
1279  return;
1280  }
1281 
1282  // Set up communication buffers, one for all
1283  typename DER::LocalIT mA = spSeq->getnrow();
1284  typename DER::LocalIT p_c = commGrid->GetGridCols();
1285  vector<bool> isthere(mA, false); // perhaps the only appropriate use of this crippled data structure
1286  typename DER::LocalIT perproc = mA / p_c;
1287  vector<int> maxlens(p_c,0); // maximum data size to be sent to any neighbor along the processor row
1288 
1289  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
1290  {
1291  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
1292  {
1293  typename DER::LocalIT rowid = nzit.rowid();
1294  if(!isthere[rowid])
1295  {
1296  typename DER::LocalIT owner = min(nzit.rowid() / perproc, p_c-1);
1297  maxlens[owner]++;
1298  isthere[rowid] = true;
1299  }
1300  }
1301  }
1302  SpParHelper::Print("Optimization buffers set\n");
1303  optbuf.Set(maxlens,mA);
1304 }
1305 
1306 template <class IT, class NT, class DER>
1308 {
1309  spSeq->RowSplit(numsplits);
1310 }
1311 
1312 
1317 template <class IT, class NT, class DER>
1318 template <typename SR>
1320 {
1321  int stages, dummy; // last two parameters of productgrid are ignored for synchronous multiplication
1322  shared_ptr<CommGrid> Grid = ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
1323 
1324  IT AA_m = spSeq->getnrow();
1325  IT AA_n = spSeq->getncol();
1326 
1327  DER seqTrn = spSeq->TransposeConst(); // will be automatically discarded after going out of scope
1328 
1329  MPI_Barrier(commGrid->GetWorld());
1330 
1331  IT ** NRecvSizes = SpHelper::allocate2D<IT>(DER::esscount, stages);
1332  IT ** TRecvSizes = SpHelper::allocate2D<IT>(DER::esscount, stages);
1333 
1334  SpParHelper::GetSetSizes( *spSeq, NRecvSizes, commGrid->GetRowWorld());
1335  SpParHelper::GetSetSizes( seqTrn, TRecvSizes, commGrid->GetColWorld());
1336 
1337  // Remotely fetched matrices are stored as pointers
1338  DER * NRecv;
1339  DER * TRecv;
1340  vector< SpTuples<IT,NT> *> tomerge;
1341 
1342  int Nself = commGrid->GetRankInProcRow();
1343  int Tself = commGrid->GetRankInProcCol();
1344 
1345  for(int i = 0; i < stages; ++i)
1346  {
1347  vector<IT> ess;
1348  if(i == Nself)
1349  {
1350  NRecv = spSeq; // shallow-copy
1351  }
1352  else
1353  {
1354  ess.resize(DER::esscount);
1355  for(int j=0; j< DER::esscount; ++j)
1356  {
1357  ess[j] = NRecvSizes[j][i]; // essentials of the ith matrix in this row
1358  }
1359  NRecv = new DER(); // first, create the object
1360  }
1361 
1362  SpParHelper::BCastMatrix(Grid->GetRowWorld(), *NRecv, ess, i); // then, broadcast its elements
1363  ess.clear();
1364 
1365  if(i == Tself)
1366  {
1367  TRecv = &seqTrn; // shallow-copy
1368  }
1369  else
1370  {
1371  ess.resize(DER::esscount);
1372  for(int j=0; j< DER::esscount; ++j)
1373  {
1374  ess[j] = TRecvSizes[j][i];
1375  }
1376  TRecv = new DER();
1377  }
1378  SpParHelper::BCastMatrix(Grid->GetColWorld(), *TRecv, ess, i);
1379 
1380  SpTuples<IT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv, false, true);
1381  if(!AA_cont->isZero())
1382  tomerge.push_back(AA_cont);
1383 
1384  if(i != Nself)
1385  {
1386  delete NRecv;
1387  }
1388  if(i != Tself)
1389  {
1390  delete TRecv;
1391  }
1392  }
1393 
1394  SpHelper::deallocate2D(NRecvSizes, DER::esscount);
1395  SpHelper::deallocate2D(TRecvSizes, DER::esscount);
1396 
1397  delete spSeq;
1398  spSeq = new DER(MergeAll<SR>(tomerge, AA_m, AA_n), false); // First get the result in SpTuples, then convert to UDER
1399  for(unsigned int i=0; i<tomerge.size(); ++i)
1400  {
1401  delete tomerge[i];
1402  }
1403 }
1404 
1405 
1406 template <class IT, class NT, class DER>
1408 {
1409  if(commGrid->myproccol == commGrid->myprocrow) // Diagonal
1410  {
1411  spSeq->Transpose();
1412  }
1413  else
1414  {
1415  typedef typename DER::LocalIT LIT;
1416  SpTuples<LIT,NT> Atuples(*spSeq);
1417  LIT locnnz = Atuples.getnnz();
1418  LIT * rows = new LIT[locnnz];
1419  LIT * cols = new LIT[locnnz];
1420  NT * vals = new NT[locnnz];
1421  for(LIT i=0; i < locnnz; ++i)
1422  {
1423  rows[i] = Atuples.colindex(i); // swap (i,j) here
1424  cols[i] = Atuples.rowindex(i);
1425  vals[i] = Atuples.numvalue(i);
1426  }
1427  LIT locm = getlocalcols();
1428  LIT locn = getlocalrows();
1429  delete spSeq;
1430 
1431  LIT remotem, remoten, remotennz;
1432  swap(locm,locn);
1433  int diagneigh = commGrid->GetComplementRank();
1434 
1435  MPI_Status status;
1436  MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
1437  MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
1438  MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
1439 
1440  LIT * rowsrecv = new LIT[remotennz];
1441  MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
1442  delete [] rows;
1443 
1444  LIT * colsrecv = new LIT[remotennz];
1445  MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
1446  delete [] cols;
1447 
1448  NT * valsrecv = new NT[remotennz];
1449  MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
1450  delete [] vals;
1451 
1452  tuple<LIT,LIT,NT> * arrtuples = new tuple<LIT,LIT,NT>[remotennz];
1453  for(LIT i=0; i< remotennz; ++i)
1454  {
1455  arrtuples[i] = make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
1456  }
1457  DeleteAll(rowsrecv, colsrecv, valsrecv);
1458  ColLexiCompare<LIT,NT> collexicogcmp;
1459  sort(arrtuples , arrtuples+remotennz, collexicogcmp ); // sort w.r.t columns here
1460 
1461  spSeq = new DER();
1462  spSeq->Create( remotennz, remotem, remoten, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
1463  }
1464 }
1465 
1466 
1467 template <class IT, class NT, class DER>
1468 template <class HANDLER>
1469 void SpParMat< IT,NT,DER >::SaveGathered(string filename, HANDLER handler, bool transpose) const
1470 {
1471  int proccols = commGrid->GetGridCols();
1472  int procrows = commGrid->GetGridRows();
1473  IT totalm = getnrow();
1474  IT totaln = getncol();
1475  IT totnnz = getnnz();
1476  int flinelen = 0;
1477  ofstream out;
1478  if(commGrid->GetRank() == 0)
1479  {
1480  std::string s;
1481  std::stringstream strm;
1482  strm << totalm << " " << totaln << " " << totnnz << endl;
1483  s = strm.str();
1484  out.open(filename.c_str(),ios_base::trunc);
1485  flinelen = s.length();
1486  out.write(s.c_str(), flinelen);
1487  out.close();
1488  }
1489  int colrank = commGrid->GetRankInProcCol();
1490  int colneighs = commGrid->GetGridRows();
1491  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
1492  locnrows[colrank] = (IT) getlocalrows();
1493  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
1494  IT roffset = accumulate(locnrows, locnrows+colrank, 0);
1495  delete [] locnrows;
1496 
1497  MPI_Datatype datatype;
1498  MPI_Type_contiguous(sizeof(pair<IT,NT>), MPI_CHAR, &datatype);
1499  MPI_Type_commit(&datatype);
1500 
1501  for(int i = 0; i < procrows; i++) // for all processor row (in order)
1502  {
1503  if(commGrid->GetRankInProcCol() == i) // only the ith processor row
1504  {
1505  IT localrows = spSeq->getnrow(); // same along the processor row
1506  vector< vector< pair<IT,NT> > > csr(localrows);
1507  if(commGrid->GetRankInProcRow() == 0) // get the head of processor row
1508  {
1509  IT localcols = spSeq->getncol(); // might be different on the last processor on this processor row
1510  MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
1511  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
1512  {
1513  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
1514  {
1515  csr[nzit.rowid()].push_back( make_pair(colit.colid(), nzit.value()) );
1516  }
1517  }
1518  }
1519  else // get the rest of the processors
1520  {
1521  IT n_perproc;
1522  MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
1523  IT noffset = commGrid->GetRankInProcRow() * n_perproc;
1524  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
1525  {
1526  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
1527  {
1528  csr[nzit.rowid()].push_back( make_pair(colit.colid() + noffset, nzit.value()) );
1529  }
1530  }
1531  }
1532  pair<IT,NT> * ents = NULL;
1533  int * gsizes = NULL, * dpls = NULL;
1534  if(commGrid->GetRankInProcRow() == 0) // only the head of processor row
1535  {
1536  out.open(filename.c_str(),std::ios_base::app);
1537  gsizes = new int[proccols];
1538  dpls = new int[proccols](); // displacements (zero initialized pid)
1539  }
1540  for(int j = 0; j < localrows; ++j)
1541  {
1542  IT rowcnt = 0;
1543  sort(csr[j].begin(), csr[j].end());
1544  int mysize = csr[j].size();
1545  MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
1546  if(commGrid->GetRankInProcRow() == 0)
1547  {
1548  rowcnt = std::accumulate(gsizes, gsizes+proccols, static_cast<IT>(0));
1549  std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
1550  ents = new pair<IT,NT>[rowcnt]; // nonzero entries in the j'th local row
1551  }
1552 
1553  // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
1554  // void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
1555  MPI_Gatherv(SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
1556  if(commGrid->GetRankInProcRow() == 0)
1557  {
1558  for(int k=0; k< rowcnt; ++k)
1559  {
1560  //out << j + roffset + 1 << "\t" << ents[k].first + 1 <<"\t" << ents[k].second << endl;
1561  if (!transpose)
1562  // regular
1563  out << j + roffset + 1 << "\t" << ents[k].first + 1 << "\t";
1564  else
1565  // transpose row/column
1566  out << ents[k].first + 1 << "\t" << j + roffset + 1 << "\t";
1567  handler.save(out, ents[k].second, j + roffset, ents[k].first);
1568  out << endl;
1569  }
1570  delete [] ents;
1571  }
1572  }
1573  if(commGrid->GetRankInProcRow() == 0)
1574  {
1575  DeleteAll(gsizes, dpls);
1576  out.close();
1577  }
1578  } // end_if the ith processor row
1579  MPI_Barrier(commGrid->GetWorld()); // signal the end of ith processor row iteration (so that all processors block)
1580  }
1581 }
1582 
1583 
1584 // Prints in the following format suitable for I/O with PaToH
1585 // 1st line: <index_base(0 or 1)> <|V|> <|N|> <pins>
1586 // For row-wise sparse matrix partitioning (our case): 1 <num_rows> <num_cols> <nnz>
1587 // For the rest of the file, (i+1)th line is a list of vertices that are members of the ith net
1588 // Treats the matrix as binary (for now)
1589 template <class IT, class NT, class DER>
1590 void SpParMat< IT,NT,DER >::PrintForPatoh(string filename) const
1591 {
1592  int proccols = commGrid->GetGridCols();
1593  int procrows = commGrid->GetGridRows();
1594  IT * gsizes;
1595 
1596  // collective calls
1597  IT totalm = getnrow();
1598  IT totaln = getncol();
1599  IT totnnz = getnnz();
1600  int flinelen = 0;
1601  if(commGrid->GetRank() == 0)
1602  {
1603  std::string s;
1604  std::stringstream strm;
1605  strm << 0 << " " << totalm << " " << totaln << " " << totnnz << endl;
1606  s = strm.str();
1607  std::ofstream out(filename.c_str(),std::ios_base::app);
1608  flinelen = s.length();
1609  out.write(s.c_str(), flinelen);
1610  out.close();
1611  }
1612 
1613  IT nzc = 0; // nonempty column counts per processor column
1614  for(int i = 0; i < proccols; i++) // for all processor columns (in order)
1615  {
1616  if(commGrid->GetRankInProcRow() == i) // only the ith processor column
1617  {
1618  if(commGrid->GetRankInProcCol() == 0) // get the head of column
1619  {
1620  std::ofstream out(filename.c_str(),std::ios_base::app);
1621  std::ostream_iterator<IT> dest(out, " ");
1622 
1623  gsizes = new IT[procrows];
1624  IT localrows = spSeq->getnrow();
1625  MPI_Bcast(&localrows, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1626  IT netid = 0;
1627  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
1628  {
1629  IT mysize;
1630  IT * vertices;
1631  while(netid <= colit.colid())
1632  {
1633  if(netid < colit.colid()) // empty subcolumns
1634  {
1635  mysize = 0;
1636  }
1637  else
1638  {
1639  mysize = colit.nnz();
1640  vertices = new IT[mysize];
1641  IT j = 0;
1642  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
1643  {
1644  vertices[j] = nzit.rowid();
1645  ++j;
1646  }
1647  }
1648 
1649  MPI_Gather(&mysize, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1650  IT colcnt = std::accumulate(gsizes, gsizes+procrows, 0);
1651  IT * ents = new IT[colcnt]; // nonzero entries in the netid'th column
1652  IT * dpls = new IT[procrows](); // displacements (zero initialized pid)
1653  std::partial_sum(gsizes, gsizes+procrows-1, dpls+1);
1654 
1655  // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
1656  // void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
1657  MPI_Gatherv(vertices, mysize, MPIType<IT>(), ents, gsizes, dpls, MPIType<IT>(), 0, commGrid->GetColWorld());
1658  if(colcnt != 0)
1659  {
1660  std::copy(ents, ents+colcnt, dest);
1661  out << endl;
1662  ++nzc;
1663  }
1664  delete [] ents; delete [] dpls;
1665 
1666  if(netid == colit.colid()) delete [] vertices;
1667  ++netid;
1668  }
1669  }
1670  while(netid < spSeq->getncol())
1671  {
1672  IT mysize = 0;
1673  MPI_Gather(&mysize, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1674  IT colcnt = std::accumulate(gsizes, gsizes+procrows, 0);
1675  IT * ents = new IT[colcnt]; // nonzero entries in the netid'th column
1676  IT * dpls = new IT[procrows](); // displacements (zero initialized pid)
1677  std::partial_sum(gsizes, gsizes+procrows-1, dpls+1);
1678 
1679  MPI_Gatherv(NULL, mysize, MPIType<IT>(), ents, gsizes, dpls, MPIType<IT>(), 0, commGrid->GetColWorld());
1680  if(colcnt != 0)
1681  {
1682  std::copy(ents, ents+colcnt, dest);
1683  out << endl;
1684  ++nzc;
1685  }
1686  delete [] ents; delete [] dpls;
1687  ++netid;
1688  }
1689  delete [] gsizes;
1690  out.close();
1691  }
1692  else // get the rest of the processors
1693  {
1694  IT m_perproc;
1695  MPI_Bcast(&m_perproc, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1696  IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1697  IT netid = 0;
1698  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1699  {
1700  IT mysize;
1701  IT * vertices;
1702  while(netid <= colit.colid())
1703  {
1704  if(netid < colit.colid()) // empty subcolumns
1705  {
1706  mysize = 0;
1707  }
1708  else
1709  {
1710  mysize = colit.nnz();
1711  vertices = new IT[mysize];
1712  IT j = 0;
1713  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
1714  {
1715  vertices[j] = nzit.rowid() + moffset;
1716  ++j;
1717  }
1718  }
1719  MPI_Gather(&mysize, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1720 
1721  // rbuf, rcount, displs, rtype are only significant at the root
1722  MPI_Gatherv(vertices, mysize, MPIType<IT>(), NULL, NULL, NULL, MPIType<IT>(), 0, commGrid->GetColWorld());
1723 
1724  if(netid == colit.colid()) delete [] vertices;
1725  ++netid;
1726  }
1727  }
1728  while(netid < spSeq->getncol())
1729  {
1730  IT mysize = 0;
1731  MPI_Gather(&mysize, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetColWorld());
1732  MPI_Gatherv(NULL, mysize, MPIType<IT>(), NULL, NULL, NULL, MPIType<IT>(), 0, commGrid->GetColWorld());
1733  ++netid;
1734  }
1735  }
1736  } // end_if the ith processor column
1737 
1738  MPI_Barrier(commGrid->GetWorld()); // signal the end of ith processor column iteration (so that all processors block)
1739  if((i == proccols-1) && (commGrid->GetRankInProcCol() == 0)) // if that was the last iteration and we are the column heads
1740  {
1741  IT totalnzc = 0;
1742  MPI_Reduce(&nzc, &totalnzc, 1, MPIType<IT>(), MPI_SUM, 0, commGrid->GetRowWorld());
1743 
1744  if(commGrid->GetRank() == 0) // I am the master, hence I'll change the first line
1745  {
1746  // Don't just open with std::ios_base::app here
1747  // The std::ios::app flag causes fstream to call seekp(0, ios::end); at the start of every operator<<() inserter
1748  std::ofstream out(filename.c_str(),std::ios_base::in | std::ios_base::out | std::ios_base::binary);
1749  out.seekp(0, std::ios_base::beg);
1750 
1751  string s;
1752  std::stringstream strm;
1753  strm << 0 << " " << totalm << " " << totalnzc << " " << totnnz;
1754  s = strm.str();
1755  s.resize(flinelen-1, ' '); // in case totalnzc has fewer digits than totaln
1756 
1757  out.write(s.c_str(), flinelen-1);
1758  out.close();
1759  }
1760  }
1761  } // end_for all processor columns
1762 }
1763 
1767 template <class IT, class NT, class DER>
1768 template <class HANDLER>
1769 void SpParMat< IT,NT,DER >::ReadDistribute (const string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
1770 {
1771 #ifdef TAU_PROFILE
1772  TAU_PROFILE_TIMER(rdtimer, "ReadDistribute", "void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
1773  TAU_PROFILE_START(rdtimer);
1774 #endif
1775 
1776  ifstream infile;
1777  FILE * binfile; // points to "past header" if the file is binary
1778  int seeklength = 0;
1779  HeaderInfo hfile;
1780  if(commGrid->GetRank() == master) // 1 processor
1781  {
1782  hfile = ParseHeader(filename, binfile, seeklength);
1783  }
1784  MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
1785 
1786  IT total_m, total_n, total_nnz;
1787  IT m_perproc = 0, n_perproc = 0;
1788 
1789  int colneighs = commGrid->GetGridRows(); // number of neighbors along this processor column (including oneself)
1790  int rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
1791 
1792  IT buffpercolneigh = MEMORYINBYTES / (colneighs * (2 * sizeof(IT) + sizeof(NT)));
1793  IT buffperrowneigh = MEMORYINBYTES / (rowneighs * (2 * sizeof(IT) + sizeof(NT)));
1794  if(pario)
1795  {
1796  // since all colneighs will be reading the data at the same time
1797  // chances are they might all read the data that should go to one
1798  // in that case buffperrowneigh > colneighs * buffpercolneigh
1799  // in order not to overflow
1800  buffpercolneigh /= colneighs;
1801  if(seeklength == 0)
1802  SpParHelper::Print("COMBBLAS: Parallel I/O requested but binary header is corrupted\n");
1803  }
1804 
1805  // make sure that buffperrowneigh >= buffpercolneigh to cover for this patological case:
1806  // -- all data received by a given column head (by vertical communication) are headed to a single processor along the row
1807  // -- then making sure buffperrowneigh >= buffpercolneigh guarantees that the horizontal buffer will never overflow
1808  buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
1809  if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > numeric_limits<int>::max())
1810  {
1811  SpParHelper::Print("COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n");
1812  }
1813 
1814  int * cdispls = new int[colneighs];
1815  for (IT i=0; i<colneighs; ++i) cdispls[i] = i*buffpercolneigh;
1816  int * rdispls = new int[rowneighs];
1817  for (IT i=0; i<rowneighs; ++i) rdispls[i] = i*buffperrowneigh;
1818 
1819  int *ccurptrs = NULL, *rcurptrs = NULL;
1820  int recvcount = 0;
1821  IT * rows = NULL;
1822  IT * cols = NULL;
1823  NT * vals = NULL;
1824 
1825  // Note: all other column heads that initiate the horizontal communication has the same "rankinrow" with the master
1826  int rankincol = commGrid->GetRankInProcCol(master); // get master's rank in its processor column
1827  int rankinrow = commGrid->GetRankInProcRow(master);
1828  vector< tuple<IT, IT, NT> > localtuples;
1829 
1830  if(commGrid->GetRank() == master) // 1 processor
1831  {
1832  if( !hfile.fileexists )
1833  {
1834  SpParHelper::Print( "COMBBLAS: Input file doesn't exist\n");
1835  total_n = 0; total_m = 0;
1836  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
1837  return;
1838  }
1839  if (hfile.headerexists && hfile.format == 1)
1840  {
1841  SpParHelper::Print("COMBBLAS: Ascii input with binary headers is not supported");
1842  total_n = 0; total_m = 0;
1843  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
1844  return;
1845  }
1846  if ( !hfile.headerexists ) // no header - ascii file (at this point, file exists)
1847  {
1848  infile.open(filename.c_str());
1849  char comment[256];
1850  infile.getline(comment,256);
1851  while(comment[0] == '%')
1852  {
1853  infile.getline(comment,256);
1854  }
1855  stringstream ss;
1856  ss << string(comment);
1857  ss >> total_m >> total_n >> total_nnz;
1858  if(pario)
1859  {
1860  SpParHelper::Print("COMBBLAS: Trying to read binary headerless file in parallel, aborting\n");
1861  total_n = 0; total_m = 0;
1862  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
1863  return;
1864  }
1865  }
1866  else // hfile.headerexists && hfile.format == 0
1867  {
1868  total_m = hfile.m;
1869  total_n = hfile.n;
1870  total_nnz = hfile.nnz;
1871  }
1872  m_perproc = total_m / colneighs;
1873  n_perproc = total_n / rowneighs;
1874  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
1875  AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
1876 
1877  if(seeklength > 0 && pario) // sqrt(p) processors also do parallel binary i/o
1878  {
1879  IT entriestoread = total_nnz / colneighs;
1880  #ifdef IODEBUG
1881  ofstream oput;
1882  commGrid->OpenDebugFile("Read", oput);
1883  oput << "Total nnz: " << total_nnz << " entries to read: " << entriestoread << endl;
1884  oput.close();
1885  #endif
1886  ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
1887  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
1888  }
1889  else // only this (master) is doing I/O (text or binary)
1890  {
1891  IT temprow, tempcol;
1892  NT tempval;
1893  IT ntrow = 0, ntcol = 0; // not transposed row and column index
1894  char line[1024];
1895  bool nonumline = nonum;
1896  IT cnz = 0;
1897  for(; cnz < total_nnz; ++cnz)
1898  {
1899  int colrec;
1900  size_t commonindex;
1901  stringstream linestream;
1902  if( (!hfile.headerexists) && (!infile.eof()))
1903  {
1904  // read one line at a time so that missing numerical values can be detected
1905  infile.getline(line, 1024);
1906  linestream << line;
1907  linestream >> temprow >> tempcol;
1908  if (!nonum)
1909  {
1910  // see if this line has a value
1911  linestream >> skipws;
1912  nonumline = linestream.eof();
1913  }
1914  --temprow; // file is 1-based where C-arrays are 0-based
1915  --tempcol;
1916  ntrow = temprow;
1917  ntcol = tempcol;
1918  }
1919  else if(hfile.headerexists && (!feof(binfile)) )
1920  {
1921  handler.binaryfill(binfile, temprow , tempcol, tempval);
1922  }
1923  if (transpose)
1924  {
1925  IT swap = temprow;
1926  temprow = tempcol;
1927  tempcol = swap;
1928  }
1929  colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // precipient processor along the column
1930  commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
1931 
1932  rows[ commonindex ] = temprow;
1933  cols[ commonindex ] = tempcol;
1934  if( (!hfile.headerexists) && (!infile.eof()))
1935  {
1936  vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol); //tempval;
1937  }
1938  else if(hfile.headerexists && (!feof(binfile)) )
1939  {
1940  vals[ commonindex ] = tempval;
1941  }
1942  ++ (ccurptrs[colrec]);
1943  if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
1944  {
1945  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first, send the receive counts
1946 
1947  // generate space for own recv data ... (use arrays because vector<bool> is cripled, if NT=bool)
1948  IT * temprows = new IT[recvcount];
1949  IT * tempcols = new IT[recvcount];
1950  NT * tempvals = new NT[recvcount];
1951 
1952  // then, send all buffers that to their recipients ...
1953  MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
1954  MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
1955  MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
1956 
1957  fill_n(ccurptrs, colneighs, 0); // finally, reset current pointers !
1958  DeleteAll(rows, cols, vals);
1959 
1960  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
1961  buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
1962 
1963  if( cnz != (total_nnz-1) ) // otherwise the loop will exit with noone to claim memory back
1964  {
1965  // reuse these buffers for the next vertical communication
1966  rows = new IT [ buffpercolneigh * colneighs ];
1967  cols = new IT [ buffpercolneigh * colneighs ];
1968  vals = new NT [ buffpercolneigh * colneighs ];
1969  }
1970  } // end_if for "send buffer is full" case
1971  } // end_for for "cnz < entriestoread" case
1972  assert (cnz == total_nnz);
1973 
1974  // Signal the end of file to other processors along the column
1975  fill_n(ccurptrs, colneighs, numeric_limits<int>::max());
1976  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
1977 
1978  // And along the row ...
1979  fill_n(rcurptrs, rowneighs, numeric_limits<int>::max());
1980  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
1981  } // end of "else" (only one processor reads) block
1982  } // end_if for "master processor" case
1983  else if( commGrid->OnSameProcCol(master) ) // (r-1) processors
1984  {
1985  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
1986  m_perproc = total_m / colneighs;
1987  n_perproc = total_n / rowneighs;
1988 
1989  if(seeklength > 0 && pario) // these processors also do parallel binary i/o
1990  {
1991  binfile = fopen(filename.c_str(), "rb");
1992  IT entrysize = handler.entrylength();
1993  int myrankincol = commGrid->GetRankInProcCol();
1994  IT perreader = total_nnz / colneighs;
1995  IT read_offset = entrysize * static_cast<IT>(myrankincol) * perreader + seeklength;
1996  IT entriestoread = perreader;
1997  if (myrankincol == colneighs-1)
1998  entriestoread = total_nnz - static_cast<IT>(myrankincol) * perreader;
1999  fseek(binfile, read_offset, SEEK_SET);
2000 
2001  #ifdef IODEBUG
2002  ofstream oput;
2003  commGrid->OpenDebugFile("Read", oput);
2004  oput << "Total nnz: " << total_nnz << " OFFSET : " << read_offset << " entries to read: " << entriestoread << endl;
2005  oput.close();
2006  #endif
2007 
2008  AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
2009  ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
2010  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
2011  }
2012  else // only master does the I/O
2013  {
2014  while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
2015  {
2016  // void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
2017  // void* recvbuf, int recvcount, const MPI::Datatype & recvtype, int root) const
2018  // The outcome is as if the root executed n send operations,
2019  // MPI_Send(sendbuf + displs[i] * extent(sendtype), sendcounts[i], sendtype, i, ...)
2020  // and each process executed a receive,
2021  // MPI_Recv(recvbuf, recvcount, recvtype, root, ...)
2022  // The send buffer is ignored for all nonroot processes.
2023 
2024  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first receive the receive counts ...
2025  if( recvcount == numeric_limits<int>::max()) break;
2026 
2027  // create space for incoming data ...
2028  IT * temprows = new IT[recvcount];
2029  IT * tempcols = new IT[recvcount];
2030  NT * tempvals = new NT[recvcount];
2031 
2032  // receive actual data ... (first 4 arguments are ignored in the receiver side)
2033  MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
2034  MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
2035  MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
2036 
2037  // now, send the data along the horizontal
2038  rcurptrs = new int[rowneighs];
2039  fill_n(rcurptrs, rowneighs, 0);
2040 
2041  // HorizontalSend frees the memory of temp_xxx arrays and then creates and frees memory of all the six arrays itself
2042  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
2043  buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
2044  }
2045  }
2046 
2047  // Signal the end of file to other processors along the row
2048  fill_n(rcurptrs, rowneighs, numeric_limits<int>::max());
2049  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
2050  delete [] rcurptrs;
2051  }
2052  else // r * (s-1) processors that only participate in the horizontal communication step
2053  {
2054  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
2055  m_perproc = total_m / colneighs;
2056  n_perproc = total_n / rowneighs;
2057  while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
2058  {
2059  // receive the receive count
2060  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
2061  if( recvcount == numeric_limits<int>::max())
2062  break;
2063 
2064  // create space for incoming data ...
2065  IT * temprows = new IT[recvcount];
2066  IT * tempcols = new IT[recvcount];
2067  NT * tempvals = new NT[recvcount];
2068 
2069  MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
2070  MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
2071  MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
2072 
2073  // now push what is ours to tuples
2074  IT moffset = commGrid->myprocrow * m_perproc;
2075  IT noffset = commGrid->myproccol * n_perproc;
2076 
2077  for(IT i=0; i< recvcount; ++i)
2078  {
2079  localtuples.push_back( make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
2080  }
2081  DeleteAll(temprows,tempcols,tempvals);
2082  }
2083  }
2084  DeleteAll(cdispls, rdispls);
2085  tuple<IT,IT,NT> * arrtuples = new tuple<IT,IT,NT>[localtuples.size()]; // the vector will go out of scope, make it stick !
2086  copy(localtuples.begin(), localtuples.end(), arrtuples);
2087 
2088  IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
2089  IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
2090  spSeq->Create( localtuples.size(), localm, localn, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
2091 
2092 #ifdef TAU_PROFILE
2093  TAU_PROFILE_STOP(rdtimer);
2094 #endif
2095  return;
2096 }
2097 
2098 template <class IT, class NT, class DER>
2099 void SpParMat<IT,NT,DER>::AllocateSetBuffers(IT * & rows, IT * & cols, NT * & vals, int * & rcurptrs, int * & ccurptrs, int rowneighs, int colneighs, IT buffpercolneigh)
2100 {
2101  // allocate buffers on the heap as stack space is usually limited
2102  rows = new IT [ buffpercolneigh * colneighs ];
2103  cols = new IT [ buffpercolneigh * colneighs ];
2104  vals = new NT [ buffpercolneigh * colneighs ];
2105 
2106  ccurptrs = new int[colneighs];
2107  rcurptrs = new int[rowneighs];
2108  fill_n(ccurptrs, colneighs, 0); // fill with zero
2109  fill_n(rcurptrs, rowneighs, 0);
2110 }
2111 
2112 template <class IT, class NT, class DER>
2113 void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world, IT & total_m, IT & total_n, IT & total_nnz, int master)
2114 {
2115  MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
2116  MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
2117  MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
2118 }
2119 
2120 /*
2121  * @post {rows, cols, vals are pre-allocated on the heap after this call}
2122  * @post {ccurptrs are set to zero; so that if another call is made to this function without modifying ccurptrs, no data will be send from this procesor}
2123  */
2124 template <class IT, class NT, class DER>
2125 void SpParMat<IT,NT,DER>::VerticalSend(IT * & rows, IT * & cols, NT * & vals, vector< tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
2126  IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, int rankinrow)
2127 {
2128  // first, send/recv the counts ...
2129  int * colrecvdispls = new int[colneighs];
2130  int * colrecvcounts = new int[colneighs];
2131  MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld); // share the request counts
2132  int totrecv = accumulate(colrecvcounts,colrecvcounts+colneighs,0);
2133  colrecvdispls[0] = 0; // receive displacements are exact whereas send displacements have slack
2134  for(int i=0; i<colneighs-1; ++i)
2135  colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
2136 
2137  // generate space for own recv data ... (use arrays because vector<bool> is cripled, if NT=bool)
2138  IT * temprows = new IT[totrecv];
2139  IT * tempcols = new IT[totrecv];
2140  NT * tempvals = new NT[totrecv];
2141 
2142  // then, exchange all buffers that to their recipients ...
2143  MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
2144  MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
2145  MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
2146 
2147  // finally, reset current pointers !
2148  fill_n(ccurptrs, colneighs, 0);
2149  DeleteAll(colrecvdispls, colrecvcounts);
2150  DeleteAll(rows, cols, vals);
2151 
2152  // rcurptrs/rdispls are zero initialized scratch space
2153  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
2154 
2155  // reuse these buffers for the next vertical communication
2156  rows = new IT [ buffpercolneigh * colneighs ];
2157  cols = new IT [ buffpercolneigh * colneighs ];
2158  vals = new NT [ buffpercolneigh * colneighs ];
2159 }
2160 
2161 
2168 template <class IT, class NT, class DER>
2169 template <class HANDLER>
2170 void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile, IT * & rows, IT * & cols, NT * & vals, vector< tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
2171  IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, IT entriestoread, HANDLER handler, int rankinrow, bool transpose)
2172 {
2173  assert(entriestoread != 0);
2174  IT cnz = 0;
2175  IT temprow, tempcol;
2176  NT tempval;
2177  int finishedglobal = 1;
2178  while(cnz < entriestoread && !feof(binfile)) // this loop will execute at least once
2179  {
2180  handler.binaryfill(binfile, temprow , tempcol, tempval);
2181  if (transpose)
2182  {
2183  IT swap = temprow;
2184  temprow = tempcol;
2185  tempcol = swap;
2186  }
2187  int colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // precipient processor along the column
2188  size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
2189  rows[ commonindex ] = temprow;
2190  cols[ commonindex ] = tempcol;
2191  vals[ commonindex ] = tempval;
2192  ++ (ccurptrs[colrec]);
2193  if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) ) // one buffer is full, or this processor's share is done !
2194  {
2195  #ifdef IODEBUG
2196  ofstream oput;
2197  commGrid->OpenDebugFile("Read", oput);
2198  oput << "To column neighbors: ";
2199  copy(ccurptrs, ccurptrs+colneighs, ostream_iterator<int>(oput, " ")); oput << endl;
2200  oput.close();
2201  #endif
2202 
2203  VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
2204  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
2205 
2206  if(cnz == (entriestoread-1)) // last execution of the outer loop
2207  {
2208  int finishedlocal = 1; // I am done, but let me check others
2209  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
2210  while(!finishedglobal)
2211  {
2212  #ifdef DEBUG
2213  ofstream oput;
2214  commGrid->OpenDebugFile("Read", oput);
2215  oput << "To column neighbors: ";
2216  copy(ccurptrs, ccurptrs+colneighs, ostream_iterator<int>(oput, " ")); oput << endl;
2217  oput.close();
2218  #endif
2219 
2220  // postcondition of VerticalSend: ccurptrs are set to zero
2221  // if another call is made to this function without modifying ccurptrs, no data will be send from this procesor
2222  VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
2223  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
2224 
2225  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
2226  }
2227  }
2228  else // the other loop will continue executing
2229  {
2230  int finishedlocal = 0;
2231  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
2232  }
2233  } // end_if for "send buffer is full" case
2234  ++cnz;
2235  }
2236 
2237  // signal the end to row neighbors
2238  fill_n(rcurptrs, rowneighs, numeric_limits<int>::max());
2239  int recvcount;
2240  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
2241 }
2242 
2243 
2250 template <class IT, class NT, class DER>
2251 void SpParMat<IT,NT,DER>::HorizontalSend(IT * & rows, IT * & cols, NT * & vals, IT * & temprows, IT * & tempcols, NT * & tempvals, vector < tuple <IT,IT,NT> > & localtuples,
2252  int * rcurptrs, int * rdispls, IT buffperrowneigh, int rowneighs, int recvcount, IT m_perproc, IT n_perproc, int rankinrow)
2253 {
2254  rows = new IT [ buffperrowneigh * rowneighs ];
2255  cols = new IT [ buffperrowneigh * rowneighs ];
2256  vals = new NT [ buffperrowneigh * rowneighs ];
2257 
2258  // prepare to send the data along the horizontal
2259  for(int i=0; i< recvcount; ++i)
2260  {
2261  int rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
2262  rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
2263  cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
2264  vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
2265  ++ (rcurptrs[rowrec]);
2266  }
2267 
2268  #ifdef IODEBUG
2269  ofstream oput;
2270  commGrid->OpenDebugFile("Read", oput);
2271  oput << "To row neighbors: ";
2272  copy(rcurptrs, rcurptrs+rowneighs, ostream_iterator<int>(oput, " ")); oput << endl;
2273  oput << "Row displacements were: ";
2274  copy(rdispls, rdispls+rowneighs, ostream_iterator<int>(oput, " ")); oput << endl;
2275  oput.close();
2276  #endif
2277 
2278  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld); // Send the receive counts for horizontal communication
2279 
2280  // the data is now stored in rows/cols/vals, can reset temporaries
2281  // sets size and capacity to new recvcount
2282  DeleteAll(temprows, tempcols, tempvals);
2283  temprows = new IT[recvcount];
2284  tempcols = new IT[recvcount];
2285  tempvals = new NT[recvcount];
2286 
2287  // then, send all buffers that to their recipients ...
2288  MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
2289  MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
2290  MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
2291 
2292  // now push what is ours to tuples
2293  IT moffset = commGrid->myprocrow * m_perproc;
2294  IT noffset = commGrid->myproccol * n_perproc;
2295 
2296  for(int i=0; i< recvcount; ++i)
2297  {
2298  localtuples.push_back( make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
2299  }
2300 
2301  fill_n(rcurptrs, rowneighs, 0);
2302  DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
2303 }
2304 
2305 
2308 template <class IT, class NT, class DER>
2310 {
2311  if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
2312  {
2313  SpParHelper::Print("Grids are not comparable, Find() fails !");
2314  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2315  }
2316  IT globallen = getnnz();
2317  SpTuples<IT,NT> Atuples(*spSeq);
2318 
2319  FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
2320  FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
2321  FullyDistVec<IT,NT> nvals ( distvals.commGrid, globallen, NT());
2322 
2323  IT prelen = Atuples.getnnz();
2324  //IT postlen = nrows.MyLocLength();
2325 
2326  int rank = commGrid->GetRank();
2327  int nprocs = commGrid->GetSize();
2328  IT * prelens = new IT[nprocs];
2329  prelens[rank] = prelen;
2330  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
2331  IT prelenuntil = accumulate(prelens, prelens+rank, static_cast<IT>(0));
2332 
2333  int * sendcnt = new int[nprocs](); // zero initialize
2334  IT * rows = new IT[prelen];
2335  IT * cols = new IT[prelen];
2336  NT * vals = new NT[prelen];
2337 
2338  int rowrank = commGrid->GetRankInProcRow();
2339  int colrank = commGrid->GetRankInProcCol();
2340  int rowneighs = commGrid->GetGridCols();
2341  int colneighs = commGrid->GetGridRows();
2342  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
2343  IT * locncols = new IT[rowneighs];
2344  locnrows[colrank] = getlocalrows();
2345  locncols[rowrank] = getlocalcols();
2346 
2347  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
2348  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
2349 
2350  IT roffset = accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
2351  IT coffset = accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
2352 
2353  DeleteAll(locnrows, locncols);
2354  for(int i=0; i< prelen; ++i)
2355  {
2356  IT locid; // ignore local id, data will come in order
2357  int owner = nrows.Owner(prelenuntil+i, locid);
2358  sendcnt[owner]++;
2359 
2360  rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
2361  cols[i] = Atuples.colindex(i) + coffset; // need the global col index
2362  vals[i] = Atuples.numvalue(i);
2363  }
2364 
2365  int * recvcnt = new int[nprocs];
2366  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
2367 
2368  int * sdpls = new int[nprocs](); // displacements (zero initialized pid)
2369  int * rdpls = new int[nprocs]();
2370  partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
2371  partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
2372 
2373  MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
2374  MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
2375  MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(), SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
2376 
2377  DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
2378  DeleteAll(prelens, rows, cols, vals);
2379  distrows = nrows;
2380  distcols = ncols;
2381  distvals = nvals;
2382 }
2383 
2386 template <class IT, class NT, class DER>
2388 {
2389  if((*(distrows.commGrid) != *(distcols.commGrid)) )
2390  {
2391  SpParHelper::Print("Grids are not comparable, Find() fails !");
2392  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2393  }
2394  IT globallen = getnnz();
2395  SpTuples<IT,NT> Atuples(*spSeq);
2396 
2397  FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
2398  FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
2399 
2400  IT prelen = Atuples.getnnz();
2401 
2402  int rank = commGrid->GetRank();
2403  int nprocs = commGrid->GetSize();
2404  IT * prelens = new IT[nprocs];
2405  prelens[rank] = prelen;
2406  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
2407  IT prelenuntil = accumulate(prelens, prelens+rank, static_cast<IT>(0));
2408 
2409  int * sendcnt = new int[nprocs](); // zero initialize
2410  IT * rows = new IT[prelen];
2411  IT * cols = new IT[prelen];
2412  NT * vals = new NT[prelen];
2413 
2414  int rowrank = commGrid->GetRankInProcRow();
2415  int colrank = commGrid->GetRankInProcCol();
2416  int rowneighs = commGrid->GetGridCols();
2417  int colneighs = commGrid->GetGridRows();
2418  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
2419  IT * locncols = new IT[rowneighs];
2420  locnrows[colrank] = getlocalrows();
2421  locncols[rowrank] = getlocalcols();
2422 
2423  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
2424  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetColWorld());
2425  IT roffset = accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
2426  IT coffset = accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
2427 
2428  DeleteAll(locnrows, locncols);
2429  for(int i=0; i< prelen; ++i)
2430  {
2431  IT locid; // ignore local id, data will come in order
2432  int owner = nrows.Owner(prelenuntil+i, locid);
2433  sendcnt[owner]++;
2434 
2435  rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
2436  cols[i] = Atuples.colindex(i) + coffset; // need the global col index
2437  }
2438 
2439  int * recvcnt = new int[nprocs];
2440  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
2441 
2442  int * sdpls = new int[nprocs](); // displacements (zero initialized pid)
2443  int * rdpls = new int[nprocs]();
2444  partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
2445  partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
2446 
2447  MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
2448  MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
2449 
2450  DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
2451  DeleteAll(prelens, rows, cols, vals);
2452  distrows = nrows;
2453  distcols = ncols;
2454 }
2455 
2456 template <class IT, class NT, class DER>
2457 ofstream& SpParMat<IT,NT,DER>::put(ofstream& outfile) const
2458 {
2459  outfile << (*spSeq) << endl;
2460  return outfile;
2461 }
2462 
2463 template <class IU, class NU, class UDER>
2464 ofstream& operator<<(ofstream& outfile, const SpParMat<IU, NU, UDER> & s)
2465 {
2466  return s.put(outfile) ; // use the right put() function
2467 
2468 }
2469 
2477 template <class IT, class NT,class DER>
2478 int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, IT & lrow, IT & lcol) const
2479 {
2480  int procrows = commGrid->GetGridRows();
2481  int proccols = commGrid->GetGridCols();
2482  IT m_perproc = total_m / procrows;
2483  IT n_perproc = total_n / proccols;
2484 
2485  int own_procrow; // owner's processor row
2486  if(m_perproc != 0)
2487  {
2488  own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1); // owner's processor row
2489  }
2490  else // all owned by the last processor row
2491  {
2492  own_procrow = procrows -1;
2493  }
2494  int own_proccol;
2495  if(n_perproc != 0)
2496  {
2497  own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
2498  }
2499  else
2500  {
2501  own_proccol = proccols-1;
2502  }
2503  lrow = grow - (own_procrow * m_perproc);
2504  lcol = gcol - (own_proccol * n_perproc);
2505  return commGrid->GetRank(own_procrow, own_proccol);
2506 }