source: CIVL/text/include/mpi.cvl@ 4fc1b8d

1.23 2.0 main test-branch
Last change on this file since 4fc1b8d was 1fbac22, checked in by Ziqing Luo <ziqing@…>, 12 years ago

the new cleaned up mpi.cvl

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@1179 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 12.2 KB
Line 
1#ifndef __MPI_CVL__
2#define __MPI_CVL__
3
4//TODO make a Datatype struct, which has a field "int size;" Define one of these objects for MPI_INT, MPI_DOUBLE, etc.
5//TODO Then provide methods like MPI provides for creating new ones.
6//TODO then support MPI_Type_contig(datatype, int n).
7
8#define BCAST_TAG 999
9#define REDUCE_TAG 998
10
11/* Completed definition for mpi-common.h */
12struct MPI_Status{
13 int MPI_SOURCE;
14 int MPI_TAG;
15 int MPI_ERROR;
16 int size;
17};
18
19/* Definition of CIVL-MPI */
20typedef enum {
21 __UNINIT,
22 __INIT,
23 __FINALIZED
24}__MPI_Sys_status__;
25
26struct MPI_Request{
27 int id;
28};
29
30/* Definition of CMPI_Gcomm and MPI_Comm */
31typedef struct CMPI_Gcomm {
32 $gcomm p2p; // point-to-point communication
33 $gcomm col; // collective communication
34 $gbarrier gbarrier;
35} CMPI_Gcomm;
36
37struct MPI_Comm {
38 $comm p2p; // point-to-point communication
39 $comm col; // collective communication
40 $barrier barrier;
41 __MPI_Sys_status__ status;
42};
43
44/****************************** Helper Functions **********************************/
45int sizeofDatatype(MPI_Datatype datatype) {
46 switch (datatype) {
47 case MPI_INT:
48 return sizeof(int);
49 case MPI_FLOAT:
50 return sizeof(float);
51 case MPI_DOUBLE:
52 return sizeof(double);
53 case MPI_CHAR:
54 return sizeof(char);
55 default:
56 $assert(0, "Unreachable");
57 }
58}
59
60/* Helpers for MPI_Reduce and MPI_Allreduce */
61void sumInt(int result[], int buf[], int real_count, int nprocs){
62 //init result array
63 $atomic{
64 for(int i=0; i<real_count; i++){
65 result[i] = 0;
66 }
67 //sum up
68 for(int i=0; i<nprocs; i++)
69 for(int j=0; j<real_count; j++)
70 result[j] = result[j] + buf[i * real_count + j];
71 }
72}
73
74void sumFloat(float result[], float buf[], int real_count, int nprocs){
75 $atomic{
76 for(int i=0; i<real_count; i++){
77 result[i] = 0;
78 }
79 //sum up
80 for(int i=0; i<nprocs; i++)
81 for(int j=0; j<real_count; j++)
82 result[j] = result[j] + buf[i * real_count + j];
83 }
84}
85
86void sumDouble(double result[], double buf[], int real_count, int nprocs){
87 $atomic{
88 for(int i=0; i<real_count; i++){
89 result[i] = 0;
90 }
91 //sum up
92 for(int i=0; i<nprocs; i++)
93 for(int j=0; j<real_count; j++)
94 result[j] = result[j] + buf[i * real_count + j];
95 }
96}
97
98
99void maxInt(int result[], int buf[], int real_count, int nprocs){
100 //init result arra
101 $atomic{
102 for(int i=0; i<real_count; i++){
103 result[i] = buf[i * real_count];
104 }
105
106 for(int i=0; i<nprocs; i++)
107 for(int j=0; j<real_count; j++){
108 if(buf[i * real_count + j] > result[j])
109 result[j] = buf[i * real_count + j];
110 }
111 }
112}
113
114void maxFloat(float result[], float buf[], int real_count, int nprocs){
115 //init result array
116 $atom{
117 for(int i=0; i<real_count; i++){
118 result[i] = buf[i * real_count];
119 }
120
121 for(int i=0; i<nprocs; i++)
122 for(int j=0; j<real_count; j++){
123 if(buf[i * real_count + j] > result[j])
124 result[j] = buf[i * real_count + j];
125 }
126 }
127}
128
129void maxDouble(double result[], double buf[], int real_count, int nprocs){
130 //init result array
131 $atom{
132 for(int i=0; i<real_count; i++){
133 result[i] = buf[i * real_count];
134 }
135
136 for(int i=0; i<nprocs; i++)
137 for(int j=0; j<real_count; j++){
138 if(buf[i * real_count + j] > result[j])
139 result[j] = buf[i * real_count + j];
140 }
141 }
142}
143
144void minInt(int result[], int buf[], int real_count, int nprocs){
145 //init result array
146 $atom{
147 for(int i=0; i<real_count; i++){
148 result[i] = buf[i * real_count];
149 }
150
151 for(int i=0; i<nprocs; i++)
152 for(int j=0; j<real_count; j++){
153 if(buf[i * real_count + j] < result[j])
154 result[j] = buf[i * real_count + j];
155 }
156 }
157}
158
159void minFloat(float result[], float buf[], int real_count, int nprocs){
160 //init result array
161 $atom{
162 for(int i=0; i<real_count; i++){
163 result[i] = buf[i * real_count];
164 }
165
166 for(int i=0; i<nprocs; i++)
167 for(int j=0; j<real_count; j++){
168 if(buf[i * real_count + j] < result[j])
169 result[j] = buf[i * real_count + j];
170 }
171 }
172}
173
174void minDouble(double result[], double buf[], int real_count, int nprocs){
175 //init result array
176 $atom{
177 for(int i=0; i<real_count; i++){
178 result[i] = buf[i * real_count];
179 }
180
181 for(int i=0; i<nprocs; i++)
182 for(int j=0; j<real_count; j++){
183 if(buf[i * real_count + j] < result[j])
184 result[j] = buf[i * real_count + j];
185 }
186 }
187}
188
189/************************** MPI LIB Implementations *******************************/
190CMPI_Gcomm CMPI_Gcomm_create($scope scope, int size) {
191 CMPI_Gcomm result;
192
193 result.p2p = $gcomm_create(scope, size);
194 result.col = $gcomm_create(scope, size);
195 result.gbarrier = $gbarrier_create(scope, size);
196 return result;
197}
198
199void CMPI_Gcomm_destroy(CMPI_Gcomm gc) {
200 $gcomm_destroy(gc.p2p);
201 $gcomm_destroy(gc.col);
202 $gbarrier_destroy(gc.gbarrier);
203}
204
205MPI_Comm CMPI_Comm_create($scope scope, CMPI_Gcomm gc, int rank) {
206 MPI_Comm result;
207
208 result.p2p = $comm_create(scope, gc.p2p, rank);
209 result.col = $comm_create(scope, gc.col, rank);
210 result.barrier = $barrier_create(scope, gc.gbarrier, rank);
211 result.status = __UNINIT;
212 return result;
213}
214
215void CMPI_Comm_destroy(MPI_Comm comm) {
216 $comm_destroy(comm.p2p);
217 $comm_destroy(comm.col);
218 $barrier_destroy(comm.barrier);
219}
220
221int __MPI_Init(MPI_Comm *comm) {
222 comm->status = __INIT;
223 return 0;
224}
225
226int __MPI_Finalize(MPI_Comm *comm) {
227 comm->status = __FINALIZED;
228 return 0;
229}
230
231int MPI_Comm_size(MPI_Comm comm, int *size) {
232 $assert(comm.status == __INIT, "MPI_Comm_size() cannot be invoked without MPI_Init() being called before.\n");
233 *size = $comm_size(comm.p2p);
234 return 0;
235}
236
237int MPI_Comm_rank(MPI_Comm comm, int *rank) {
238 $assert(comm.status == __INIT, "MPI_Comm_rank() cannot be invoked without MPI_Init() being called before.\n");
239 *rank = $comm_place(comm.p2p);
240 return 0;
241}
242
243
244int CMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
245 int tag, $comm comm) {
246 if (dest >= 0) {
247 int size = count*sizeofDatatype(datatype);
248 int place = $comm_place(comm);
249 $message out = $message_pack(place, dest, tag, buf, size);
250
251 $comm_enqueue(comm, out);
252 }
253 return 0;
254}
255
256int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
257 int tag, MPI_Comm comm) {
258 $assert(comm.status == __INIT, "MPI_Send() cannot be invoked without MPI_Init() being called before.\n");
259 return CMPI_Send(buf, count, datatype, dest, tag, comm.p2p);
260}
261
262
263int CMPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
264 int tag, $comm comm, MPI_Status *status) {
265 if (source >= 0) {
266 $message in = $comm_dequeue(comm, source, tag);
267 int size = count*sizeofDatatype(datatype);
268
269 $message_unpack(in, buf, size);
270 if (status != MPI_STATUS_IGNORE) {
271 status->size = $message_size(in);
272 status->MPI_SOURCE = $message_source(in);
273 status->MPI_TAG = $message_tag(in);
274 status->MPI_ERROR = 0;
275 }
276 }
277 return 0;
278}
279
280int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
281 int tag, MPI_Comm comm, MPI_Status *status) {
282 $assert(comm.status == __INIT, "MPI_Recv() cannot be invoked without MPI_Init() being called before.\n");
283 return CMPI_Recv(buf, count, datatype, source, tag, comm.p2p, status);
284}
285
286int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
287 //$assert(__my_status == __INIT, "MPI status is not INIT.\n");
288 *count = status->size/sizeofDatatype(datatype);
289 return 0;
290}
291
292int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
293 int dest, int sendtag,
294 void *recvbuf, int recvcount, MPI_Datatype recvtype,
295 int source, int recvtag,
296 MPI_Comm comm, MPI_Status *status) {
297 $assert(comm.status == __INIT, "MPI_Sendrecv() cannot be invoked without MPI_Init() being called before.\n");
298 MPI_Send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
299 MPI_Recv(recvbuf, recvcount, recvtype, source, recvtag, comm, status);
300 return 0;
301}
302
303/* Broadcasts a message from root to everyone else.
304 * Need to use a differnt comm.
305 */
306int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
307 MPI_Comm comm) {
308 int place = $comm_place(comm.col);
309
310 $assert(comm.status == __INIT, "MPI_Bcast() cannot be invoked without MPI_Init() being called before.\n");
311 place = $comm_place(comm.col);
312 if (place == root) {
313 int nprocs = $comm_size(comm.col);
314
315 for (int i=0; i<nprocs; i++)
316 if (i != root)
317 CMPI_Send(buf, count, datatype, i, BCAST_TAG, comm.col);
318 } else {
319 CMPI_Recv(buf, count, datatype, root, BCAST_TAG, comm.col,
320 MPI_STATUS_IGNORE);
321 }
322 return 0;
323}
324
325/* Reduces values on all processes to a single value */
326int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
327 MPI_Op op, int root, MPI_Comm comm){
328 int place;
329 MPI_Status status;
330
331 $assert(comm.status == __INIT, "MPI_Reduce() cannot be invoked without MPI_Init() being called before.\n");
332 place = $comm_place(comm.col);
333 //non-root
334 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
335 if(place != root)
336 return 0;
337 else{
338 //root
339 int nprocs = $comm_size(comm.col);
340 int real_count = -1; //the real count gotton from MPI_Status
341 //void ** buf; //Buffer stores data from other processes
342 void * recv; //Buffer used in MPI_Recv()
343 void * results; //Array of results
344 $scope here = $root;
345 struct Bundle_buf{
346 int I[nprocs * count];
347 float F[nprocs * count];
348 double D[nprocs * count];
349 } buf;
350
351 switch(datatype){
352 case MPI_INT :
353 recv = (int *)$malloc(here, count * sizeof(int));
354 results = (int *)$malloc(here, count * sizeof(int));
355 break;
356 case MPI_FLOAT :
357 recv = (float *)$malloc(here, count * sizeof(float));
358 results = (float *)$malloc(here, count * sizeof(float));
359 break;
360 case MPI_DOUBLE :
361 recv = (double *)$malloc(here, count * sizeof(double));
362 results = (double *)$malloc(here, count * sizeof(double));
363 break;
364 }
365 for(int i=0; i<nprocs; i++){
366 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
367 &status);
368 MPI_Get_count(&status, datatype, &real_count);
369 $assert(real_count == count);
370
371 switch(datatype){
372 case MPI_INT :
373 for(int j=0; j<real_count; j++)
374 buf.I[i * real_count + j] = ((int *)recv)[j];
375 break;
376 case MPI_FLOAT :
377 for(int j=0; j<real_count; j++)
378 buf.F[i * real_count + j] = ((float *)recv)[j];
379 break;
380 case MPI_DOUBLE :
381 for(int j=0; j<real_count; j++)
382 buf.D[i * real_count + j] = ((double *)recv)[j];
383 break;
384 }
385 }
386
387 //operations
388 switch(datatype){
389 case MPI_INT:
390 if(op == MPI_SUM)
391 sumInt(results, buf.I, real_count, nprocs);
392 if(op == MPI_MAX)
393 maxInt(results, buf.I, real_count, nprocs);
394 if(op == MPI_MIN)
395 minInt(results, buf.I, real_count, nprocs);
396 break;
397 case MPI_FLOAT:
398 if(op == MPI_SUM)
399 sumFloat(results, buf.F, real_count, nprocs);
400 if(op == MPI_MAX)
401 maxFloat(results, buf.F, real_count, nprocs);
402 if(op == MPI_MIN)
403 minFloat(results, buf.F, real_count, nprocs);
404 break;
405 case MPI_DOUBLE:
406 if(op == MPI_SUM)
407 sumDouble(results, buf.D, real_count, nprocs);
408 if(op == MPI_MAX)
409 maxDouble(results, buf.D, real_count, nprocs);
410 if(op == MPI_MIN)
411 minDouble(results, buf.D, real_count, nprocs);
412 break;
413 }
414 MPI_Sendrecv(results, real_count, datatype,
415 place, 0,
416 recvbuf, real_count, datatype,
417 place, 0,
418 comm, &status);
419 $free(recv);
420 $free(results);
421 }
422 return 0;
423}
424
425/* Combines values from all processes and distributes the result back to all processes */
426/* default root is 0 */
427int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
428 MPI_Op op, MPI_Comm comm){
429 int place;
430 int root;
431 MPI_Status status;
432
433 $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
434 place = $comm_place(comm.col);
435 root = 0;
436 MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm);
437 if(place == root)
438 MPI_Sendrecv(recvbuf, count, datatype, place, 0,
439 recvbuf, count, datatype, place, 0,
440 comm, &status);
441 MPI_Bcast(recvbuf, count, datatype, root, comm);
442 return 0;
443}
444
445int MPI_Barrier(MPI_Comm comm){
446
447 $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
448 $barrier_call(comm.barrier);
449}
450#endif
Note: See TracBrowser for help on using the repository browser.