source: CIVL/text/include/mpi.cvl@ 4b4bbb3

1.23 2.0 main test-branch
Last change on this file since 4b4bbb3 was c574547, checked in by Manchun Zheng <zmanchun@…>, 12 years ago

added assertions to report error if MPI functions are called without calling MPI_Init() beforehand.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@951 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 13.3 KB
Line 
1#ifdef __MPI__
2#else
3 #include <mpi-common.h>
4 #ifdef __CIVLC__
5 #else
6 #include <civlc.h>
7 #endif
8#endif
9
10/****************************** Helper Functions **********************************/
11int sizeofDatatype(MPI_Datatype datatype) {
12 switch (datatype) {
13 case MPI_INT:
14 return sizeof(int);
15 case MPI_FLOAT:
16 return sizeof(float);
17 case MPI_DOUBLE:
18 return sizeof(double);
19 case MPI_CHAR:
20 return sizeof(char);
21 default:
22 $assert(0, "Unreachable");
23 }
24}
25
26/* Helpers for MPI_Reduce and MPI_Allreduce */
27void sumInt(int result[], int buf[], int real_count, int nprocs){
28 //init result array
29 $atomic{
30 for(int i=0; i<real_count; i++){
31 result[i] = 0;
32 }
33 //sum up
34 for(int i=0; i<nprocs; i++)
35 for(int j=0; j<real_count; j++)
36 result[j] = result[j] + buf[i * real_count + j];
37 }
38}
39
40void sumFloat(float result[], float buf[], int real_count, int nprocs){
41 $atomic{
42 for(int i=0; i<real_count; i++){
43 result[i] = 0;
44 }
45 //sum up
46 for(int i=0; i<nprocs; i++)
47 for(int j=0; j<real_count; j++)
48 result[j] = result[j] + buf[i * real_count + j];
49 }
50}
51
52void sumDouble(double result[], double buf[], int real_count, int nprocs){
53 $atomic{
54 for(int i=0; i<real_count; i++){
55 result[i] = 0;
56 }
57 //sum up
58 for(int i=0; i<nprocs; i++)
59 for(int j=0; j<real_count; j++)
60 result[j] = result[j] + buf[i * real_count + j];
61 }
62}
63
64
65void maxInt(int result[], int buf[], int real_count, int nprocs){
66 //init result arra
67 $atomic{
68 for(int i=0; i<real_count; i++){
69 result[i] = buf[i * real_count];
70 }
71
72 for(int i=0; i<nprocs; i++)
73 for(int j=0; j<real_count; j++){
74 if(buf[i * real_count + j] > result[j])
75 result[j] = buf[i * real_count + j];
76 }
77 }
78}
79
80void maxFloat(float result[], float buf[], int real_count, int nprocs){
81 //init result array
82 $atom{
83 for(int i=0; i<real_count; i++){
84 result[i] = buf[i * real_count];
85 }
86
87 for(int i=0; i<nprocs; i++)
88 for(int j=0; j<real_count; j++){
89 if(buf[i * real_count + j] > result[j])
90 result[j] = buf[i * real_count + j];
91 }
92 }
93}
94
95void maxDouble(double result[], double buf[], int real_count, int nprocs){
96 //init result array
97 $atom{
98 for(int i=0; i<real_count; i++){
99 result[i] = buf[i * real_count];
100 }
101
102 for(int i=0; i<nprocs; i++)
103 for(int j=0; j<real_count; j++){
104 if(buf[i * real_count + j] > result[j])
105 result[j] = buf[i * real_count + j];
106 }
107 }
108}
109
110void minInt(int result[], int buf[], int real_count, int nprocs){
111 //init result array
112 $atom{
113 for(int i=0; i<real_count; i++){
114 result[i] = buf[i * real_count];
115 }
116
117 for(int i=0; i<nprocs; i++)
118 for(int j=0; j<real_count; j++){
119 if(buf[i * real_count + j] < result[j])
120 result[j] = buf[i * real_count + j];
121 }
122 }
123}
124
125void minFloat(float result[], float buf[], int real_count, int nprocs){
126 //init result array
127 $atom{
128 for(int i=0; i<real_count; i++){
129 result[i] = buf[i * real_count];
130 }
131
132 for(int i=0; i<nprocs; i++)
133 for(int j=0; j<real_count; j++){
134 if(buf[i * real_count + j] < result[j])
135 result[j] = buf[i * real_count + j];
136 }
137 }
138}
139
140void minDouble(double result[], double buf[], int real_count, int nprocs){
141 //init result array
142 $atom{
143 for(int i=0; i<real_count; i++){
144 result[i] = buf[i * real_count];
145 }
146
147 for(int i=0; i<nprocs; i++)
148 for(int j=0; j<real_count; j++){
149 if(buf[i * real_count + j] < result[j])
150 result[j] = buf[i * real_count + j];
151 }
152 }
153}
154
155/************************** MPI LIB Implementations *******************************/
156CMPI_Gcomm CMPI_Gcomm_create($scope scope, int size) {
157 CMPI_Gcomm result;
158
159 result.p2p = $gcomm_create(scope, size);
160 result.col = $gcomm_create(scope, size);
161 return result;
162}
163
164void CMPI_Gcomm_destroy(CMPI_Gcomm gc) {
165 $gcomm_destroy(gc.p2p);
166 $gcomm_destroy(gc.col);
167}
168
169MPI_Comm MPI_Comm_create($scope scope, CMPI_Gcomm gc, int rank) {
170 MPI_Comm result;
171
172 result.p2p = $comm_create(scope, gc.p2p, rank);
173 result.col = $comm_create(scope, gc.col, rank);
174 result.status = __UNINIT;
175 return result;
176}
177
178void MPI_Comm_destroy(MPI_Comm comm) {
179 $comm_destroy(comm.p2p);
180 $comm_destroy(comm.col);
181}
182
183int __MPI_Init(MPI_Comm *comm) {
184 comm->status = __INIT;
185 return 0;
186}
187
188int __MPI_Finalize(MPI_Comm *comm) {
189 comm->status = __FINALIZED;
190 return 0;
191}
192
193int MPI_Comm_size(MPI_Comm comm, int *size) {
194 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
195 *size = $comm_size(comm.p2p);
196 return 0;
197}
198
199int MPI_Comm_rank(MPI_Comm comm, int *rank) {
200 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
201 *rank = $comm_place(comm.p2p);
202 return 0;
203}
204
205
206int CMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
207 int tag, $comm comm) {
208 if (dest >= 0) {
209 int size = count*sizeofDatatype(datatype);
210 int place = $comm_place(comm);
211 $message out = $message_pack(place, dest, tag, buf, size);
212
213 $comm_enqueue(comm, out);
214 }
215 return 0;
216}
217
218int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
219 int tag, MPI_Comm comm) {
220 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
221 return CMPI_Send(buf, count, datatype, dest, tag, comm.p2p);
222}
223
224
225int CMPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
226 int tag, $comm comm, MPI_Status *status) {
227 if (source >= 0) {
228 $message in = $comm_dequeue(comm, source, tag);
229 int size = count*sizeofDatatype(datatype);
230
231 $message_unpack(in, buf, size);
232 if (status != MPI_STATUS_IGNORE) {
233 status->size = $message_size(in);
234 status->MPI_SOURCE = $message_source(in);
235 status->MPI_TAG = $message_tag(in);
236 status->MPI_ERROR = 0;
237 }
238 }
239 return 0;
240}
241
242int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
243 int tag, MPI_Comm comm, MPI_Status *status) {
244 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
245 return CMPI_Recv(buf, count, datatype, source, tag, comm.p2p, status);
246}
247
248int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
249 //$assert(__my_status == __INIT, "MPI status is not INIT.\n");
250 *count = status->size/sizeofDatatype(datatype);
251 return 0;
252}
253
254int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
255 int dest, int sendtag,
256 void *recvbuf, int recvcount, MPI_Datatype recvtype,
257 int source, int recvtag,
258 MPI_Comm comm, MPI_Status *status) {
259 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
260 MPI_Send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
261 MPI_Recv(recvbuf, recvcount, recvtype, source, recvtag, comm, status);
262 return 0;
263}
264
265/* Broadcasts a message from root to everyone else.
266 * Need to use a differnt comm.
267 */
268int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
269 MPI_Comm comm) {
270 int place = $comm_place(comm.col);
271
272 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
273 place = $comm_place(comm.col);
274 if (place == root) {
275 int nprocs = $comm_size(comm.col);
276
277 for (int i=0; i<nprocs; i++)
278 if (i != root)
279 CMPI_Send(buf, count, datatype, i, BCAST_TAG, comm.col);
280 } else {
281 CMPI_Recv(buf, count, datatype, root, BCAST_TAG, comm.col,
282 MPI_STATUS_IGNORE);
283 }
284 return 0;
285}
286
287/* Reduces values on all processes to a single value */
288int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
289 MPI_Op op, int root, MPI_Comm comm){
290 int place;
291 MPI_Status status;
292
293 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
294 place = $comm_place(comm.col);
295 //non-root
296 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
297 if(place != root)
298 return 0;
299 else{
300 //root
301 int nprocs = $comm_size(comm.col);
302 int real_count = -1; //the real count gotton from MPI_Status
303 //void ** buf; //Buffer stores data from other processes
304 void * recv; //Buffer used in MPI_Recv()
305 void * results; //Array of results
306 $scope here = $root;
307 struct Bundle_buf{
308 int I[nprocs * count];
309 float F[nprocs * count];
310 double D[nprocs * count];
311 } buf;
312
313 switch(datatype){
314 case MPI_INT :
315 recv = (int *)$malloc(here, count * sizeof(int));
316 results = (int *)$malloc(here, count * sizeof(int));
317 break;
318 case MPI_FLOAT :
319 recv = (float *)$malloc(here, count * sizeof(float));
320 results = (float *)$malloc(here, count * sizeof(float));
321 break;
322 case MPI_DOUBLE :
323 recv = (double *)$malloc(here, count * sizeof(double));
324 results = (double *)$malloc(here, count * sizeof(double));
325 break;
326 }
327 for(int i=0; i<nprocs; i++){
328 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
329 &status);
330 MPI_Get_count(&status, datatype, &real_count);
331 $assert(real_count == count);
332
333 switch(datatype){
334 case MPI_INT :
335 for(int j=0; j<real_count; j++)
336 buf.I[i * real_count + j] = ((int *)recv)[j];
337 break;
338 case MPI_FLOAT :
339 for(int j=0; j<real_count; j++)
340 buf.F[i * real_count + j] = ((float *)recv)[j];
341 break;
342 case MPI_DOUBLE :
343 for(int j=0; j<real_count; j++)
344 buf.D[i * real_count + j] = ((double *)recv)[j];
345 break;
346 }
347 }
348
349 //operations
350 switch(datatype){
351 case MPI_INT:
352 if(op == MPI_SUM)
353 sumInt(results, buf.I, real_count, nprocs);
354 if(op == MPI_MAX)
355 maxInt(results, buf.I, real_count, nprocs);
356 if(op == MPI_MIN)
357 minInt(results, buf.I, real_count, nprocs);
358 break;
359 case MPI_FLOAT:
360 if(op == MPI_SUM)
361 sumFloat(results, buf.F, real_count, nprocs);
362 if(op == MPI_MAX)
363 maxFloat(results, buf.F, real_count, nprocs);
364 if(op == MPI_MIN)
365 minFloat(results, buf.F, real_count, nprocs);
366 break;
367 case MPI_DOUBLE:
368 if(op == MPI_SUM)
369 sumDouble(results, buf.D, real_count, nprocs);
370 if(op == MPI_MAX)
371 maxDouble(results, buf.D, real_count, nprocs);
372 if(op == MPI_MIN)
373 minDouble(results, buf.D, real_count, nprocs);
374 break;
375 }
376 MPI_Sendrecv(results, real_count, MPI_INT,
377 place, 0,
378 recvbuf, real_count, MPI_INT,
379 place, 0,
380 comm, &status);
381 $free(recv);
382 $free(results);
383 }
384 return 0;
385}
386
387/* Combines values from all processes and distributes the result back to all processes */
388/* default root is 0 */
389int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
390 MPI_Op op, MPI_Comm comm){
391 int place;
392 int root;
393 MPI_Status status;
394
395 $assert(comm.status == __INIT, "MPI status is not INIT.\n");
396 place = $comm_place(comm.col);
397 root = 0;
398 //non-root
399 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
400 if(place != root){
401 MPI_Bcast(recvbuf, count, datatype, root,
402 comm);
403 return 0;
404 }
405 else{
406 //root
407 int nprocs = $comm_size(comm.col);
408 int real_count = -1; //the real count gotton from MPI_Status
409 //void ** buf; //Buffer stores data from other processes
410 void * recv; //Buffer used in MPI_Recv()
411 void * results; //Array of results
412 $scope here = $root;
413 struct Bundle_buf{
414 int I[nprocs * count];
415 float F[nprocs * count];
416 double D[nprocs * count];
417 } buf;
418
419 switch(datatype){
420 case MPI_INT :
421 recv = (int *)$malloc(here, count * sizeof(int));
422 results = (int *)$malloc(here, count * sizeof(int));
423 break;
424 case MPI_FLOAT :
425 recv = (float *)$malloc(here, count * sizeof(float));
426 results = (float *)$malloc(here, count * sizeof(float));
427 break;
428 case MPI_DOUBLE :
429 recv = (double *)$malloc(here, count * sizeof(double));
430 results = (double *)$malloc(here, count * sizeof(double));
431 break;
432 }
433 //recv = (void *)$malloc(here, count * sizeofDatatype(datatype));
434 //results = (void *)$malloc(here, count * sizeofDatatype(datatype));
435
436 for(int i=0; i<nprocs; i++){
437 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
438 &status);
439 MPI_Get_count(&status, datatype, &real_count);
440 $assert(real_count == count);
441
442 switch(datatype){
443 case MPI_INT :
444 for(int j=0; j<real_count; j++)
445 buf.I[i * real_count + j] = ((int *)recv)[j];
446 break;
447 case MPI_FLOAT :
448 for(int j=0; j<real_count; j++)
449 buf.F[i * real_count + j] = ((float *)recv)[j];
450 break;
451 case MPI_DOUBLE :
452 for(int j=0; j<real_count; j++)
453 buf.D[i * real_count + j] = ((double *)recv)[j];
454 break;
455 }
456 }
457
458 //operations
459 switch(datatype){
460 case MPI_INT:
461 if(op == MPI_SUM)
462 sumInt(results, buf.I, real_count, nprocs);
463 if(op == MPI_MAX)
464 maxInt(results, buf.I, real_count, nprocs);
465 if(op == MPI_MIN)
466 minInt(results, buf.I, real_count, nprocs);
467 break;
468 case MPI_FLOAT:
469 if(op == MPI_SUM)
470 sumFloat(results, buf.F, real_count, nprocs);
471 if(op == MPI_MAX)
472 maxFloat(results, buf.F, real_count, nprocs);
473 if(op == MPI_MIN)
474 minFloat(results, buf.F, real_count, nprocs);
475 break;
476 case MPI_DOUBLE:
477 if(op == MPI_SUM)
478 sumDouble(results, buf.D, real_count, nprocs);
479 if(op == MPI_MAX)
480 maxDouble(results, buf.D, real_count, nprocs);
481 if(op == MPI_MIN)
482 minDouble(results, buf.D, real_count, nprocs);
483 break;
484 }
485 MPI_Sendrecv(results, real_count, MPI_INT,
486 place, 0,
487 recvbuf, real_count, MPI_INT,
488 place, 0,
489 comm, &status);
490 MPI_Bcast(results, real_count, datatype, root,
491 comm);
492 $free(recv);
493 $free(results);
494 }
495 return 0;
496}
Note: See TracBrowser for help on using the repository browser.