source: CIVL/text/include/mpi.cvl@ 325d439

1.23 2.0 main test-branch
Last change on this file since 325d439 was 411e0b8, checked in by Manchun Zheng <zmanchun@…>, 12 years ago

cleaned up usage cycle; improved javadocs; improved error message in mpi.cvl.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@1051 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 13.9 KB
Line 
1#ifdef __MPI__
2#else
3 #include <mpi-common.h>
4 #ifdef __CIVLC__
5 #else
6 #include <civlc.h>
7 #endif
8#endif
9
//TODO make a Datatype struct that has a field "int size;" and define one such object for each of MPI_INT, MPI_DOUBLE, etc.
//TODO then provide functions, like the ones MPI provides, for creating new datatypes.
//TODO then support MPI_Type_contiguous(), i.e. a new datatype made of n contiguous elements of an existing datatype.
13
14/****************************** Helper Functions **********************************/
/**
 * Returns the size, as given by sizeof, of one element of an MPI datatype.
 *
 * Only MPI_INT, MPI_FLOAT, MPI_DOUBLE and MPI_CHAR are handled. Any other
 * datatype triggers an assertion violation.
 *
 * @param datatype the MPI datatype whose element size is wanted
 * @return sizeof the corresponding C type
 */
int sizeofDatatype(MPI_Datatype datatype) {
  switch (datatype) {
  case MPI_INT:
    return sizeof(int);
  case MPI_FLOAT:
    return sizeof(float);
  case MPI_DOUBLE:
    return sizeof(double);
  case MPI_CHAR:
    return sizeof(char);
  default:
    // This branch IS reachable: a caller may pass a datatype not listed above.
    $assert(0, "Unsupported MPI datatype");
    // Explicit return so that no path falls off the end of a non-void function.
    return 0;
  }
}
29
30/* Helpers for MPI_Reduce and MPI_Allreduce */
/**
 * Element-wise sum over the rows of buf (int version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[k] holds the sum of element k over all rows.
 * The whole computation is wrapped in a single $atomic block.
 */
void sumInt(int result[], int buf[], int real_count, int nprocs){
  $atomic{
    // start every accumulator at zero
    for(int k=0; k<real_count; k++)
      result[k] = 0;
    // then add the rows one after another
    for(int row=0; row<nprocs; row++){
      int base = row * real_count;

      for(int k=0; k<real_count; k++)
        result[k] += buf[base + k];
    }
  }
}
43
/**
 * Element-wise sum over the rows of buf (float version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[k] holds the sum of element k over all rows, added in
 * row order starting from zero.
 * The whole computation is wrapped in a single $atomic block.
 */
void sumFloat(float result[], float buf[], int real_count, int nprocs){
  $atomic{
    // start every accumulator at zero
    for(int k=0; k<real_count; k++)
      result[k] = 0;
    // then add the rows one after another
    for(int row=0; row<nprocs; row++){
      int base = row * real_count;

      for(int k=0; k<real_count; k++)
        result[k] += buf[base + k];
    }
  }
}
55
/**
 * Element-wise sum over the rows of buf (double version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[k] holds the sum of element k over all rows, added in
 * row order starting from zero.
 * The whole computation is wrapped in a single $atomic block.
 */
void sumDouble(double result[], double buf[], int real_count, int nprocs){
  $atomic{
    // start every accumulator at zero
    for(int k=0; k<real_count; k++)
      result[k] = 0;
    // then add the rows one after another
    for(int row=0; row<nprocs; row++){
      int base = row * real_count;

      for(int k=0; k<real_count; k++)
        result[k] += buf[base + k];
    }
  }
}
67
68
/**
 * Element-wise maximum over the rows of buf (int version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the largest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atomic block.
 */
void maxInt(int result[], int buf[], int real_count, int nprocs){
  $atomic{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] > result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
83
/**
 * Element-wise maximum over the rows of buf (float version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the largest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atom block.
 */
void maxFloat(float result[], float buf[], int real_count, int nprocs){
  $atom{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] > result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
98
/**
 * Element-wise maximum over the rows of buf (double version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the largest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atom block.
 */
void maxDouble(double result[], double buf[], int real_count, int nprocs){
  $atom{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] > result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
113
/**
 * Element-wise minimum over the rows of buf (int version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the smallest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atom block.
 */
void minInt(int result[], int buf[], int real_count, int nprocs){
  $atom{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] < result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
128
/**
 * Element-wise minimum over the rows of buf (float version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the smallest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atom block.
 */
void minFloat(float result[], float buf[], int real_count, int nprocs){
  $atom{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] < result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
143
/**
 * Element-wise minimum over the rows of buf (double version).
 *
 * buf is read as nprocs consecutive rows of real_count elements each.
 * On return, result[j] holds the smallest element j found in any row.
 * Requires nprocs >= 1, because the first row is used as the seed.
 * The whole computation is wrapped in a single $atom block.
 */
void minDouble(double result[], double buf[], int real_count, int nprocs){
  $atom{
    // Seed with row 0. Element j of row 0 is buf[j]; indexing with
    // buf[j * real_count] would read a different column (or run past the
    // end of buf) and corrupt the result.
    for(int j=0; j<real_count; j++){
      result[j] = buf[j];
    }

    // fold in the remaining rows
    for(int i=1; i<nprocs; i++)
      for(int j=0; j<real_count; j++){
        if(buf[i * real_count + j] < result[j])
          result[j] = buf[i * real_count + j];
      }
  }
}
158
159/************************** MPI LIB Implementations *******************************/
/**
 * Builds a CMPI_Gcomm made of two separate $gcomm objects, both created in
 * the given scope with the given size: p2p (used by the point-to-point
 * routines in this file) and col (used by the collective routines).
 */
CMPI_Gcomm CMPI_Gcomm_create($scope scope, int size) {
  CMPI_Gcomm gcomm;

  gcomm.p2p = $gcomm_create(scope, size);
  gcomm.col = $gcomm_create(scope, size);
  return gcomm;
}
167
/**
 * Destroys both $gcomm objects held by gc: first the point-to-point one,
 * then the collective one.
 */
void CMPI_Gcomm_destroy(CMPI_Gcomm gc) {
  // point-to-point channel
  $gcomm_destroy(gc.p2p);
  // collective channel
  $gcomm_destroy(gc.col);
}
172
/**
 * Builds the MPI_Comm handle for one rank: a local $comm on each of the two
 * global communicators in gc (p2p and col), created in the given scope.
 * The handle's status starts as __UNINIT.
 */
MPI_Comm MPI_Comm_create($scope scope, CMPI_Gcomm gc, int rank) {
  MPI_Comm handle;

  handle.p2p = $comm_create(scope, gc.p2p, rank);
  handle.col = $comm_create(scope, gc.col, rank);
  // not usable by the MPI_* routines until __MPI_Init() marks it __INIT
  handle.status = __UNINIT;
  return handle;
}
181
/**
 * Destroys both local $comm objects held by comm: first the point-to-point
 * one, then the collective one.
 */
void MPI_Comm_destroy(MPI_Comm comm) {
  // point-to-point channel
  $comm_destroy(comm.p2p);
  // collective channel
  $comm_destroy(comm.col);
}
186
/**
 * Marks the communicator as initialized by setting its status to __INIT.
 * Always returns 0.
 */
int __MPI_Init(MPI_Comm *comm) {
  (*comm).status = __INIT;
  return 0;
}
191
/**
 * Marks the communicator as finalized by setting its status to __FINALIZED.
 * Always returns 0.
 */
int __MPI_Finalize(MPI_Comm *comm) {
  (*comm).status = __FINALIZED;
  return 0;
}
196
/**
 * Stores in *size the size reported by $comm_size for the point-to-point
 * communicator of comm. Asserts that comm has been initialized.
 * Always returns 0.
 */
int MPI_Comm_size(MPI_Comm comm, int *size) {
  $assert(comm.status == __INIT, "MPI_Comm_size() cannot be invoked without MPI_Init() being called before.\n");

  int nplaces = $comm_size(comm.p2p);

  *size = nplaces;
  return 0;
}
202
/**
 * Stores in *rank the place reported by $comm_place for the point-to-point
 * communicator of comm. Asserts that comm has been initialized.
 * Always returns 0.
 */
int MPI_Comm_rank(MPI_Comm comm, int *rank) {
  $assert(comm.status == __INIT, "MPI_Comm_rank() cannot be invoked without MPI_Init() being called before.\n");

  int my_place = $comm_place(comm.p2p);

  *rank = my_place;
  return 0;
}
208
209
/**
 * Low-level send on a raw $comm.
 *
 * Packs count elements of datatype from buf into a $message addressed from
 * this place to dest with the given tag, and enqueues it on comm.
 * A negative dest makes the call a no-op. Always returns 0.
 */
int CMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
              int tag, $comm comm) {
  // negative destination: nothing to send
  if (dest < 0)
    return 0;

  int nbytes = count*sizeofDatatype(datatype);
  int sender = $comm_place(comm);
  $message packed = $message_pack(sender, dest, tag, buf, nbytes);

  $comm_enqueue(comm, packed);
  return 0;
}
221
/**
 * MPI_Send: asserts that comm has been initialized, then forwards to
 * CMPI_Send on the point-to-point communicator and returns its result.
 */
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
             int tag, MPI_Comm comm) {
  $assert(comm.status == __INIT, "MPI_Send() cannot be invoked without MPI_Init() being called before.\n");

  int rc = CMPI_Send(buf, count, datatype, dest, tag, comm.p2p);

  return rc;
}
227
228
/**
 * Low-level receive on a raw $comm.
 *
 * Dequeues a message matching source and tag from comm and unpacks it into
 * buf, using count elements of datatype as the size passed to the unpack.
 * Unless status is MPI_STATUS_IGNORE, the message's size, source and tag
 * are recorded in *status and MPI_ERROR is set to 0.
 * A negative source makes the call a no-op. Always returns 0.
 */
int CMPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
              int tag, $comm comm, MPI_Status *status) {
  // negative source: nothing to receive
  if (source < 0)
    return 0;

  $message incoming = $comm_dequeue(comm, source, tag);
  int nbytes = count*sizeofDatatype(datatype);

  $message_unpack(incoming, buf, nbytes);
  // caller does not want the envelope information
  if (status == MPI_STATUS_IGNORE)
    return 0;

  status->size = $message_size(incoming);
  status->MPI_SOURCE = $message_source(incoming);
  status->MPI_TAG = $message_tag(incoming);
  status->MPI_ERROR = 0;
  return 0;
}
245
/**
 * MPI_Recv: asserts that comm has been initialized, then forwards to
 * CMPI_Recv on the point-to-point communicator and returns its result.
 */
int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
             int tag, MPI_Comm comm, MPI_Status *status) {
  $assert(comm.status == __INIT, "MPI_Recv() cannot be invoked without MPI_Init() being called before.\n");

  int rc = CMPI_Recv(buf, count, datatype, source, tag, comm.p2p, status);

  return rc;
}
251
/**
 * Stores in *count the number of elements of datatype described by status,
 * computed as status->size divided by the element size of datatype.
 * No initialization check is made here (the function receives no
 * communicator). Always returns 0.
 */
int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
  int elem_size = sizeofDatatype(datatype);

  *count = status->size / elem_size;
  return 0;
}
257
/**
 * MPI_Sendrecv: asserts that comm has been initialized, then performs the
 * send with MPI_Send followed by the receive with MPI_Recv, both on comm.
 * The results of the two calls are ignored. Always returns 0.
 */
int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                 int dest, int sendtag,
                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
                 int source, int recvtag,
                 MPI_Comm comm, MPI_Status *status) {
  $assert(comm.status == __INIT, "MPI_Sendrecv() cannot be invoked without MPI_Init() being called before.\n");

  // outgoing half first ...
  MPI_Send(sendbuf, sendcount, sendtype, dest, sendtag, comm);

  // ... then the incoming half
  MPI_Recv(recvbuf, recvcount, recvtype, source, recvtag, comm, status);

  return 0;
}
268
/* Broadcasts a message from root to everyone else.
 *
 * Uses the collective communicator (comm.col) rather than the
 * point-to-point one, with tag BCAST_TAG.
 *
 * The root sends buf to every other place in turn; every other place
 * receives from root into buf. Asserts that comm has been initialized.
 * Always returns 0.
 */
int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
              MPI_Comm comm) {
  int place;

  // Check initialization before touching the communicator, like the other
  // MPI_* wrappers, and query the place only once.
  $assert(comm.status == __INIT, "MPI_Bcast() cannot be invoked without MPI_Init() being called before.\n");
  place = $comm_place(comm.col);
  if (place == root) {
    int nprocs = $comm_size(comm.col);

    for (int i=0; i<nprocs; i++)
      if (i != root)
        CMPI_Send(buf, count, datatype, i, BCAST_TAG, comm.col);
  } else {
    CMPI_Recv(buf, count, datatype, root, BCAST_TAG, comm.col,
              MPI_STATUS_IGNORE);
  }
  return 0;
}
290
/* Reduces values on all processes to a single value on root.
 *
 * Every process, root included, sends count elements from sendbuf to root
 * on the collective communicator with tag REDUCE_TAG. Non-root processes
 * then return. The root receives one contribution from each place in place
 * order, checks that each has exactly count elements, combines them
 * element-wise with op, and copies the result into recvbuf by sending it
 * to itself with MPI_Sendrecv (tag 0).
 *
 * Supported datatypes: MPI_INT, MPI_FLOAT, MPI_DOUBLE.
 * Supported operations: MPI_SUM, MPI_MAX, MPI_MIN.
 * Anything else triggers an assertion violation at the root.
 * Asserts that comm has been initialized. Returns 0.
 */
int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
               MPI_Op op, int root, MPI_Comm comm){
  int place;
  MPI_Status status;

  $assert(comm.status == __INIT, "MPI_Reduce() cannot be invoked without MPI_Init() being called before.\n");
  place = $comm_place(comm.col);
  // every process (the root too) contributes its data to the root
  CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
  if(place != root)
    return 0;
  else{
    //root
    int nprocs = $comm_size(comm.col);
    int real_count = -1; //the real count obtained from MPI_Status
    void * recv; //Buffer used in CMPI_Recv()
    void * results; //Array of results
    $scope here = $root;
    // one row of count elements per place; only the member matching
    // datatype is used
    struct Bundle_buf{
      int I[nprocs * count];
      float F[nprocs * count];
      double D[nprocs * count];
    } buf;

    // Reject unsupported operations up front instead of silently handing
    // back an uninitialized result.
    $assert(op == MPI_SUM || op == MPI_MAX || op == MPI_MIN,
            "MPI_Reduce() supports only MPI_SUM, MPI_MAX and MPI_MIN.\n");

    switch(datatype){
    case MPI_INT :
      recv = (int *)$malloc(here, count * sizeof(int));
      results = (int *)$malloc(here, count * sizeof(int));
      break;
    case MPI_FLOAT :
      recv = (float *)$malloc(here, count * sizeof(float));
      results = (float *)$malloc(here, count * sizeof(float));
      break;
    case MPI_DOUBLE :
      recv = (double *)$malloc(here, count * sizeof(double));
      results = (double *)$malloc(here, count * sizeof(double));
      break;
    default:
      // Without this, recv and results would be used and freed while
      // never having been allocated.
      $assert(0, "MPI_Reduce() supports only MPI_INT, MPI_FLOAT and MPI_DOUBLE.\n");
    }
    // collect the contribution of every place, in place order
    for(int i=0; i<nprocs; i++){
      CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
                &status);
      MPI_Get_count(&status, datatype, &real_count);
      $assert(real_count == count);

      // store the contribution as row i of the typed buffer
      switch(datatype){
      case MPI_INT :
        for(int j=0; j<real_count; j++)
          buf.I[i * real_count + j] = ((int *)recv)[j];
        break;
      case MPI_FLOAT :
        for(int j=0; j<real_count; j++)
          buf.F[i * real_count + j] = ((float *)recv)[j];
        break;
      case MPI_DOUBLE :
        for(int j=0; j<real_count; j++)
          buf.D[i * real_count + j] = ((double *)recv)[j];
        break;
      default:
        break; // unsupported datatypes were already rejected above
      }
    }

    //operations
    switch(datatype){
    case MPI_INT:
      if(op == MPI_SUM)
        sumInt(results, buf.I, real_count, nprocs);
      else if(op == MPI_MAX)
        maxInt(results, buf.I, real_count, nprocs);
      else if(op == MPI_MIN)
        minInt(results, buf.I, real_count, nprocs);
      break;
    case MPI_FLOAT:
      if(op == MPI_SUM)
        sumFloat(results, buf.F, real_count, nprocs);
      else if(op == MPI_MAX)
        maxFloat(results, buf.F, real_count, nprocs);
      else if(op == MPI_MIN)
        minFloat(results, buf.F, real_count, nprocs);
      break;
    case MPI_DOUBLE:
      if(op == MPI_SUM)
        sumDouble(results, buf.D, real_count, nprocs);
      else if(op == MPI_MAX)
        maxDouble(results, buf.D, real_count, nprocs);
      else if(op == MPI_MIN)
        minDouble(results, buf.D, real_count, nprocs);
      break;
    default:
      break; // unsupported datatypes were already rejected above
    }
    // copy results into recvbuf by sending them to ourselves
    MPI_Sendrecv(results, real_count, datatype,
                 place, 0,
                 recvbuf, real_count, datatype,
                 place, 0,
                 comm, &status);
    $free(recv);
    $free(results);
  }
  return 0;
}
390
/* Combines values from all processes and distributes the result back to
 * all processes. Place 0 always acts as the root.
 *
 * Every process, root included, sends count elements from sendbuf to the
 * root on the collective communicator with tag REDUCE_TAG. Non-root
 * processes then obtain the result into recvbuf through MPI_Bcast. The root
 * receives one contribution from each place in place order, checks that
 * each has exactly count elements, combines them element-wise with op,
 * copies the result into its own recvbuf by sending it to itself with
 * MPI_Sendrecv (tag 0), and finally broadcasts the result.
 *
 * Supported datatypes: MPI_INT, MPI_FLOAT, MPI_DOUBLE.
 * Supported operations: MPI_SUM, MPI_MAX, MPI_MIN.
 * Anything else triggers an assertion violation at the root.
 * Asserts that comm has been initialized. Returns 0.
 */
int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
                  MPI_Op op, MPI_Comm comm){
  int place;
  int root;
  MPI_Status status;

  $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
  place = $comm_place(comm.col);
  root = 0;
  // every process (the root too) contributes its data to the root
  CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
  if(place != root){
    // non-root: wait for the combined result
    MPI_Bcast(recvbuf, count, datatype, root,
              comm);
    return 0;
  }
  else{
    //root
    int nprocs = $comm_size(comm.col);
    int real_count = -1; //the real count obtained from MPI_Status
    void * recv; //Buffer used in CMPI_Recv()
    void * results; //Array of results
    $scope here = $root;
    // one row of count elements per place; only the member matching
    // datatype is used
    struct Bundle_buf{
      int I[nprocs * count];
      float F[nprocs * count];
      double D[nprocs * count];
    } buf;

    // Reject unsupported operations up front instead of silently
    // broadcasting an uninitialized result.
    $assert(op == MPI_SUM || op == MPI_MAX || op == MPI_MIN,
            "MPI_Allreduce() supports only MPI_SUM, MPI_MAX and MPI_MIN.\n");

    switch(datatype){
    case MPI_INT :
      recv = (int *)$malloc(here, count * sizeof(int));
      results = (int *)$malloc(here, count * sizeof(int));
      break;
    case MPI_FLOAT :
      recv = (float *)$malloc(here, count * sizeof(float));
      results = (float *)$malloc(here, count * sizeof(float));
      break;
    case MPI_DOUBLE :
      recv = (double *)$malloc(here, count * sizeof(double));
      results = (double *)$malloc(here, count * sizeof(double));
      break;
    default:
      // Without this, recv and results would be used and freed while
      // never having been allocated.
      $assert(0, "MPI_Allreduce() supports only MPI_INT, MPI_FLOAT and MPI_DOUBLE.\n");
    }

    // collect the contribution of every place, in place order
    for(int i=0; i<nprocs; i++){
      CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
                &status);
      MPI_Get_count(&status, datatype, &real_count);
      $assert(real_count == count);

      // store the contribution as row i of the typed buffer
      switch(datatype){
      case MPI_INT :
        for(int j=0; j<real_count; j++)
          buf.I[i * real_count + j] = ((int *)recv)[j];
        break;
      case MPI_FLOAT :
        for(int j=0; j<real_count; j++)
          buf.F[i * real_count + j] = ((float *)recv)[j];
        break;
      case MPI_DOUBLE :
        for(int j=0; j<real_count; j++)
          buf.D[i * real_count + j] = ((double *)recv)[j];
        break;
      default:
        break; // unsupported datatypes were already rejected above
      }
    }

    //operations
    switch(datatype){
    case MPI_INT:
      if(op == MPI_SUM)
        sumInt(results, buf.I, real_count, nprocs);
      else if(op == MPI_MAX)
        maxInt(results, buf.I, real_count, nprocs);
      else if(op == MPI_MIN)
        minInt(results, buf.I, real_count, nprocs);
      break;
    case MPI_FLOAT:
      if(op == MPI_SUM)
        sumFloat(results, buf.F, real_count, nprocs);
      else if(op == MPI_MAX)
        maxFloat(results, buf.F, real_count, nprocs);
      else if(op == MPI_MIN)
        minFloat(results, buf.F, real_count, nprocs);
      break;
    case MPI_DOUBLE:
      if(op == MPI_SUM)
        sumDouble(results, buf.D, real_count, nprocs);
      else if(op == MPI_MAX)
        maxDouble(results, buf.D, real_count, nprocs);
      else if(op == MPI_MIN)
        minDouble(results, buf.D, real_count, nprocs);
      break;
    default:
      break; // unsupported datatypes were already rejected above
    }
    // copy results into the root's own recvbuf by sending them to ourselves
    MPI_Sendrecv(results, real_count, datatype,
                 place, 0,
                 recvbuf, real_count, datatype,
                 place, 0,
                 comm, &status);
    // hand the combined result to every other place
    MPI_Bcast(results, real_count, datatype, root,
              comm);
    $free(recv);
    $free(results);
  }
  return 0;
}
Note: See TracBrowser for help on using the repository browser.