source: CIVL/text/include/mpi.cvl@ 59da0e2

1.23 2.0 main test-branch
Last change on this file since 59da0e2 was 20e1243, checked in by Stephen Siegel <siegel@…>, 12 years ago

Factored out some MPI type definitions from ABC to CIVL.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@1160 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 14.2 KB
Line 
1#ifdef __MPI_CVL__
2#else
3#define __MPI_CVL__
4#include <mpi-common.h>
5#include <civlc.h>
6#endif
7
// TODO: Make a Datatype struct that has a field "int size;" and define one such object for each of MPI_INT, MPI_DOUBLE, etc.
// TODO: Then provide functions, like those MPI provides, for creating new datatypes.
// TODO: Then support MPI_Type_contiguous (a new datatype made of n contiguous copies of an existing datatype).
11
12/* Definition of CMPI_Gcomm and MPI_Comm */
13struct CMPI_Gcomm {
14 $gcomm p2p; // point-to-point communication
15 $gcomm col; // collective communication
16};
17
18struct MPI_Comm {
19 $comm p2p; // point-to-point communication
20 $comm col; // collective communication
21 __MPI_Status__ status;
22};
23
24/****************************** Helper Functions **********************************/
25int sizeofDatatype(MPI_Datatype datatype) {
26 switch (datatype) {
27 case MPI_INT:
28 return sizeof(int);
29 case MPI_FLOAT:
30 return sizeof(float);
31 case MPI_DOUBLE:
32 return sizeof(double);
33 case MPI_CHAR:
34 return sizeof(char);
35 default:
36 $assert(0, "Unreachable");
37 }
38}
39
40/* Helpers for MPI_Reduce and MPI_Allreduce */
41void sumInt(int result[], int buf[], int real_count, int nprocs){
42 //init result array
43 $atomic{
44 for(int i=0; i<real_count; i++){
45 result[i] = 0;
46 }
47 //sum up
48 for(int i=0; i<nprocs; i++)
49 for(int j=0; j<real_count; j++)
50 result[j] = result[j] + buf[i * real_count + j];
51 }
52}
53
54void sumFloat(float result[], float buf[], int real_count, int nprocs){
55 $atomic{
56 for(int i=0; i<real_count; i++){
57 result[i] = 0;
58 }
59 //sum up
60 for(int i=0; i<nprocs; i++)
61 for(int j=0; j<real_count; j++)
62 result[j] = result[j] + buf[i * real_count + j];
63 }
64}
65
66void sumDouble(double result[], double buf[], int real_count, int nprocs){
67 $atomic{
68 for(int i=0; i<real_count; i++){
69 result[i] = 0;
70 }
71 //sum up
72 for(int i=0; i<nprocs; i++)
73 for(int j=0; j<real_count; j++)
74 result[j] = result[j] + buf[i * real_count + j];
75 }
76}
77
78
79void maxInt(int result[], int buf[], int real_count, int nprocs){
80 //init result arra
81 $atomic{
82 for(int i=0; i<real_count; i++){
83 result[i] = buf[i * real_count];
84 }
85
86 for(int i=0; i<nprocs; i++)
87 for(int j=0; j<real_count; j++){
88 if(buf[i * real_count + j] > result[j])
89 result[j] = buf[i * real_count + j];
90 }
91 }
92}
93
94void maxFloat(float result[], float buf[], int real_count, int nprocs){
95 //init result array
96 $atom{
97 for(int i=0; i<real_count; i++){
98 result[i] = buf[i * real_count];
99 }
100
101 for(int i=0; i<nprocs; i++)
102 for(int j=0; j<real_count; j++){
103 if(buf[i * real_count + j] > result[j])
104 result[j] = buf[i * real_count + j];
105 }
106 }
107}
108
109void maxDouble(double result[], double buf[], int real_count, int nprocs){
110 //init result array
111 $atom{
112 for(int i=0; i<real_count; i++){
113 result[i] = buf[i * real_count];
114 }
115
116 for(int i=0; i<nprocs; i++)
117 for(int j=0; j<real_count; j++){
118 if(buf[i * real_count + j] > result[j])
119 result[j] = buf[i * real_count + j];
120 }
121 }
122}
123
124void minInt(int result[], int buf[], int real_count, int nprocs){
125 //init result array
126 $atom{
127 for(int i=0; i<real_count; i++){
128 result[i] = buf[i * real_count];
129 }
130
131 for(int i=0; i<nprocs; i++)
132 for(int j=0; j<real_count; j++){
133 if(buf[i * real_count + j] < result[j])
134 result[j] = buf[i * real_count + j];
135 }
136 }
137}
138
139void minFloat(float result[], float buf[], int real_count, int nprocs){
140 //init result array
141 $atom{
142 for(int i=0; i<real_count; i++){
143 result[i] = buf[i * real_count];
144 }
145
146 for(int i=0; i<nprocs; i++)
147 for(int j=0; j<real_count; j++){
148 if(buf[i * real_count + j] < result[j])
149 result[j] = buf[i * real_count + j];
150 }
151 }
152}
153
154void minDouble(double result[], double buf[], int real_count, int nprocs){
155 //init result array
156 $atom{
157 for(int i=0; i<real_count; i++){
158 result[i] = buf[i * real_count];
159 }
160
161 for(int i=0; i<nprocs; i++)
162 for(int j=0; j<real_count; j++){
163 if(buf[i * real_count + j] < result[j])
164 result[j] = buf[i * real_count + j];
165 }
166 }
167}
168
169/************************** MPI LIB Implementations *******************************/
170CMPI_Gcomm CMPI_Gcomm_create($scope scope, int size) {
171 CMPI_Gcomm result;
172
173 result.p2p = $gcomm_create(scope, size);
174 result.col = $gcomm_create(scope, size);
175 return result;
176}
177
178void CMPI_Gcomm_destroy(CMPI_Gcomm gc) {
179 $gcomm_destroy(gc.p2p);
180 $gcomm_destroy(gc.col);
181}
182
183MPI_Comm MPI_Comm_create($scope scope, CMPI_Gcomm gc, int rank) {
184 MPI_Comm result;
185
186 result.p2p = $comm_create(scope, gc.p2p, rank);
187 result.col = $comm_create(scope, gc.col, rank);
188 result.status = __UNINIT;
189 return result;
190}
191
192void MPI_Comm_destroy(MPI_Comm comm) {
193 $comm_destroy(comm.p2p);
194 $comm_destroy(comm.col);
195}
196
197int __MPI_Init(MPI_Comm *comm) {
198 comm->status = __INIT;
199 return 0;
200}
201
202int __MPI_Finalize(MPI_Comm *comm) {
203 comm->status = __FINALIZED;
204 return 0;
205}
206
207int MPI_Comm_size(MPI_Comm comm, int *size) {
208 $assert(comm.status == __INIT, "MPI_Comm_size() cannot be invoked without MPI_Init() being called before.\n");
209 *size = $comm_size(comm.p2p);
210 return 0;
211}
212
213int MPI_Comm_rank(MPI_Comm comm, int *rank) {
214 $assert(comm.status == __INIT, "MPI_Comm_rank() cannot be invoked without MPI_Init() being called before.\n");
215 *rank = $comm_place(comm.p2p);
216 return 0;
217}
218
219
220int CMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
221 int tag, $comm comm) {
222 if (dest >= 0) {
223 int size = count*sizeofDatatype(datatype);
224 int place = $comm_place(comm);
225 $message out = $message_pack(place, dest, tag, buf, size);
226
227 $comm_enqueue(comm, out);
228 }
229 return 0;
230}
231
232int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
233 int tag, MPI_Comm comm) {
234 $assert(comm.status == __INIT, "MPI_Send() cannot be invoked without MPI_Init() being called before.\n");
235 return CMPI_Send(buf, count, datatype, dest, tag, comm.p2p);
236}
237
238
239int CMPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
240 int tag, $comm comm, MPI_Status *status) {
241 if (source >= 0) {
242 $message in = $comm_dequeue(comm, source, tag);
243 int size = count*sizeofDatatype(datatype);
244
245 $message_unpack(in, buf, size);
246 if (status != MPI_STATUS_IGNORE) {
247 status->size = $message_size(in);
248 status->MPI_SOURCE = $message_source(in);
249 status->MPI_TAG = $message_tag(in);
250 status->MPI_ERROR = 0;
251 }
252 }
253 return 0;
254}
255
256int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
257 int tag, MPI_Comm comm, MPI_Status *status) {
258 $assert(comm.status == __INIT, "MPI_Recv() cannot be invoked without MPI_Init() being called before.\n");
259 return CMPI_Recv(buf, count, datatype, source, tag, comm.p2p, status);
260}
261
262int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
263 //$assert(__my_status == __INIT, "MPI status is not INIT.\n");
264 *count = status->size/sizeofDatatype(datatype);
265 return 0;
266}
267
268int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
269 int dest, int sendtag,
270 void *recvbuf, int recvcount, MPI_Datatype recvtype,
271 int source, int recvtag,
272 MPI_Comm comm, MPI_Status *status) {
273 $assert(comm.status == __INIT, "MPI_Sendrecv() cannot be invoked without MPI_Init() being called before.\n");
274 MPI_Send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
275 MPI_Recv(recvbuf, recvcount, recvtype, source, recvtag, comm, status);
276 return 0;
277}
278
279/* Broadcasts a message from root to everyone else.
280 * Need to use a differnt comm.
281 */
282int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
283 MPI_Comm comm) {
284 int place = $comm_place(comm.col);
285
286 $assert(comm.status == __INIT, "MPI_Bcast() cannot be invoked without MPI_Init() being called before.\n");
287 place = $comm_place(comm.col);
288 if (place == root) {
289 int nprocs = $comm_size(comm.col);
290
291 for (int i=0; i<nprocs; i++)
292 if (i != root)
293 CMPI_Send(buf, count, datatype, i, BCAST_TAG, comm.col);
294 } else {
295 CMPI_Recv(buf, count, datatype, root, BCAST_TAG, comm.col,
296 MPI_STATUS_IGNORE);
297 }
298 return 0;
299}
300
301/* Reduces values on all processes to a single value */
302int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
303 MPI_Op op, int root, MPI_Comm comm){
304 int place;
305 MPI_Status status;
306
307 $assert(comm.status == __INIT, "MPI_Reduce() cannot be invoked without MPI_Init() being called before.\n");
308 place = $comm_place(comm.col);
309 //non-root
310 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
311 if(place != root)
312 return 0;
313 else{
314 //root
315 int nprocs = $comm_size(comm.col);
316 int real_count = -1; //the real count gotton from MPI_Status
317 //void ** buf; //Buffer stores data from other processes
318 void * recv; //Buffer used in MPI_Recv()
319 void * results; //Array of results
320 $scope here = $root;
321 struct Bundle_buf{
322 int I[nprocs * count];
323 float F[nprocs * count];
324 double D[nprocs * count];
325 } buf;
326
327 switch(datatype){
328 case MPI_INT :
329 recv = (int *)$malloc(here, count * sizeof(int));
330 results = (int *)$malloc(here, count * sizeof(int));
331 break;
332 case MPI_FLOAT :
333 recv = (float *)$malloc(here, count * sizeof(float));
334 results = (float *)$malloc(here, count * sizeof(float));
335 break;
336 case MPI_DOUBLE :
337 recv = (double *)$malloc(here, count * sizeof(double));
338 results = (double *)$malloc(here, count * sizeof(double));
339 break;
340 }
341 for(int i=0; i<nprocs; i++){
342 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
343 &status);
344 MPI_Get_count(&status, datatype, &real_count);
345 $assert(real_count == count);
346
347 switch(datatype){
348 case MPI_INT :
349 for(int j=0; j<real_count; j++)
350 buf.I[i * real_count + j] = ((int *)recv)[j];
351 break;
352 case MPI_FLOAT :
353 for(int j=0; j<real_count; j++)
354 buf.F[i * real_count + j] = ((float *)recv)[j];
355 break;
356 case MPI_DOUBLE :
357 for(int j=0; j<real_count; j++)
358 buf.D[i * real_count + j] = ((double *)recv)[j];
359 break;
360 }
361 }
362
363 //operations
364 switch(datatype){
365 case MPI_INT:
366 if(op == MPI_SUM)
367 sumInt(results, buf.I, real_count, nprocs);
368 if(op == MPI_MAX)
369 maxInt(results, buf.I, real_count, nprocs);
370 if(op == MPI_MIN)
371 minInt(results, buf.I, real_count, nprocs);
372 break;
373 case MPI_FLOAT:
374 if(op == MPI_SUM)
375 sumFloat(results, buf.F, real_count, nprocs);
376 if(op == MPI_MAX)
377 maxFloat(results, buf.F, real_count, nprocs);
378 if(op == MPI_MIN)
379 minFloat(results, buf.F, real_count, nprocs);
380 break;
381 case MPI_DOUBLE:
382 if(op == MPI_SUM)
383 sumDouble(results, buf.D, real_count, nprocs);
384 if(op == MPI_MAX)
385 maxDouble(results, buf.D, real_count, nprocs);
386 if(op == MPI_MIN)
387 minDouble(results, buf.D, real_count, nprocs);
388 break;
389 }
390 MPI_Sendrecv(results, real_count, datatype,
391 place, 0,
392 recvbuf, real_count, datatype,
393 place, 0,
394 comm, &status);
395 $free(recv);
396 $free(results);
397 }
398 return 0;
399}
400
401/* Combines values from all processes and distributes the result back to all processes */
402/* default root is 0 */
403int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
404 MPI_Op op, MPI_Comm comm){
405 int place;
406 int root;
407 MPI_Status status;
408
409 $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
410 place = $comm_place(comm.col);
411 root = 0;
412 //non-root
413 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
414 if(place != root){
415 MPI_Bcast(recvbuf, count, datatype, root,
416 comm);
417 return 0;
418 }
419 else{
420 //root
421 int nprocs = $comm_size(comm.col);
422 int real_count = -1; //the real count gotton from MPI_Status
423 //void ** buf; //Buffer stores data from other processes
424 void * recv; //Buffer used in MPI_Recv()
425 void * results; //Array of results
426 $scope here = $root;
427 struct Bundle_buf{
428 int I[nprocs * count];
429 float F[nprocs * count];
430 double D[nprocs * count];
431 } buf;
432
433 switch(datatype){
434 case MPI_INT :
435 recv = (int *)$malloc(here, count * sizeof(int));
436 results = (int *)$malloc(here, count * sizeof(int));
437 break;
438 case MPI_FLOAT :
439 recv = (float *)$malloc(here, count * sizeof(float));
440 results = (float *)$malloc(here, count * sizeof(float));
441 break;
442 case MPI_DOUBLE :
443 recv = (double *)$malloc(here, count * sizeof(double));
444 results = (double *)$malloc(here, count * sizeof(double));
445 break;
446 }
447 //recv = (void *)$malloc(here, count * sizeofDatatype(datatype));
448 //results = (void *)$malloc(here, count * sizeofDatatype(datatype));
449
450 for(int i=0; i<nprocs; i++){
451 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
452 &status);
453 MPI_Get_count(&status, datatype, &real_count);
454 $assert(real_count == count);
455
456 switch(datatype){
457 case MPI_INT :
458 for(int j=0; j<real_count; j++)
459 buf.I[i * real_count + j] = ((int *)recv)[j];
460 break;
461 case MPI_FLOAT :
462 for(int j=0; j<real_count; j++)
463 buf.F[i * real_count + j] = ((float *)recv)[j];
464 break;
465 case MPI_DOUBLE :
466 for(int j=0; j<real_count; j++)
467 buf.D[i * real_count + j] = ((double *)recv)[j];
468 break;
469 }
470 }
471
472 //operations
473 switch(datatype){
474 case MPI_INT:
475 if(op == MPI_SUM)
476 sumInt(results, buf.I, real_count, nprocs);
477 if(op == MPI_MAX)
478 maxInt(results, buf.I, real_count, nprocs);
479 if(op == MPI_MIN)
480 minInt(results, buf.I, real_count, nprocs);
481 break;
482 case MPI_FLOAT:
483 if(op == MPI_SUM)
484 sumFloat(results, buf.F, real_count, nprocs);
485 if(op == MPI_MAX)
486 maxFloat(results, buf.F, real_count, nprocs);
487 if(op == MPI_MIN)
488 minFloat(results, buf.F, real_count, nprocs);
489 break;
490 case MPI_DOUBLE:
491 if(op == MPI_SUM)
492 sumDouble(results, buf.D, real_count, nprocs);
493 if(op == MPI_MAX)
494 maxDouble(results, buf.D, real_count, nprocs);
495 if(op == MPI_MIN)
496 minDouble(results, buf.D, real_count, nprocs);
497 break;
498 }
499 MPI_Sendrecv(results, real_count, datatype,
500 place, 0,
501 recvbuf, real_count, datatype,
502 place, 0,
503 comm, &status);
504 MPI_Bcast(results, real_count, datatype, root,
505 comm);
506 $free(recv);
507 $free(results);
508 }
509 return 0;
510}
511
512#endif
Note: See TracBrowser for help on using the repository browser.