source: CIVL/text/include/mpi.cvl@ 2da7ec1

1.23 2.0 main test-branch
Last change on this file since 2da7ec1 was c75401d, checked in by Ziqing Luo <ziqing@…>, 12 years ago

A little change for wild card receive inside MPI_Recv() function

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@1249 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 12.5 KB
Line 
1#ifndef __MPI_CVL__
2#define __MPI_CVL__
3
4//TODO make a Datatype struct, which has a field "int size;" Define one of these objects for MPI_INT, MPI_DOUBLE, etc.
5//TODO Then provide methods like MPI provides for creating new ones.
6//TODO then support MPI_Type_contig(datatype, int n).
7
8#define BCAST_TAG 999
9#define REDUCE_TAG 998
10
11/* Completed definition for mpi-common.h */
12struct MPI_Status{
13 int MPI_SOURCE;
14 int MPI_TAG;
15 int MPI_ERROR;
16 int size;
17};
18
19/* Definition of CIVL-MPI */
20typedef enum {
21 __UNINIT,
22 __INIT,
23 __FINALIZED
24}__MPI_Sys_status__;
25
26struct MPI_Request{
27 int id;
28};
29
30/* Definition of CMPI_Gcomm and MPI_Comm */
31typedef struct CMPI_Gcomm {
32 $gcomm p2p; // point-to-point communication
33 $gcomm col; // collective communication
34 $gbarrier gbarrier;
35} CMPI_Gcomm;
36
37struct MPI_Comm {
38 $comm p2p; // point-to-point communication
39 $comm col; // collective communication
40 $barrier barrier;
41 __MPI_Sys_status__ status;
42};
43
44/********************************** State **************************************/
45
46/* The number of times the MPI_Wtime function has been called */
47int CMPI_time_count = 0;
48
49/****************************** Helper Functions **********************************/
50int sizeofDatatype(MPI_Datatype datatype) {
51 switch (datatype) {
52 case MPI_INT:
53 return sizeof(int);
54 case MPI_FLOAT:
55 return sizeof(float);
56 case MPI_DOUBLE:
57 return sizeof(double);
58 case MPI_CHAR:
59 return sizeof(char);
60 default:
61 $assert(0, "Unreachable");
62 }
63}
64
65/* Helpers for MPI_Reduce and MPI_Allreduce */
66void sumInt(int result[], int buf[], int real_count, int nprocs){
67 //init result array
68 $atomic{
69 for(int i=0; i<real_count; i++){
70 result[i] = 0;
71 }
72 //sum up
73 for(int i=0; i<nprocs; i++)
74 for(int j=0; j<real_count; j++)
75 result[j] = result[j] + buf[i * real_count + j];
76 }
77}
78
79void sumFloat(float result[], float buf[], int real_count, int nprocs){
80 $atomic{
81 for(int i=0; i<real_count; i++){
82 result[i] = 0;
83 }
84 //sum up
85 for(int i=0; i<nprocs; i++)
86 for(int j=0; j<real_count; j++)
87 result[j] = result[j] + buf[i * real_count + j];
88 }
89}
90
91void sumDouble(double result[], double buf[], int real_count, int nprocs){
92 $atomic{
93 for(int i=0; i<real_count; i++){
94 result[i] = 0;
95 }
96 //sum up
97 for(int i=0; i<nprocs; i++)
98 for(int j=0; j<real_count; j++)
99 result[j] = result[j] + buf[i * real_count + j];
100 }
101}
102
103
104void maxInt(int result[], int buf[], int real_count, int nprocs){
105 //init result arra
106 $atomic{
107 for(int i=0; i<real_count; i++){
108 result[i] = buf[i * real_count];
109 }
110
111 for(int i=0; i<nprocs; i++)
112 for(int j=0; j<real_count; j++){
113 if(buf[i * real_count + j] > result[j])
114 result[j] = buf[i * real_count + j];
115 }
116 }
117}
118
119void maxFloat(float result[], float buf[], int real_count, int nprocs){
120 //init result array
121 $atom{
122 for(int i=0; i<real_count; i++){
123 result[i] = buf[i * real_count];
124 }
125
126 for(int i=0; i<nprocs; i++)
127 for(int j=0; j<real_count; j++){
128 if(buf[i * real_count + j] > result[j])
129 result[j] = buf[i * real_count + j];
130 }
131 }
132}
133
134void maxDouble(double result[], double buf[], int real_count, int nprocs){
135 //init result array
136 $atom{
137 for(int i=0; i<real_count; i++){
138 result[i] = buf[i * real_count];
139 }
140
141 for(int i=0; i<nprocs; i++)
142 for(int j=0; j<real_count; j++){
143 if(buf[i * real_count + j] > result[j])
144 result[j] = buf[i * real_count + j];
145 }
146 }
147}
148
149void minInt(int result[], int buf[], int real_count, int nprocs){
150 //init result array
151 $atom{
152 for(int i=0; i<real_count; i++){
153 result[i] = buf[i * real_count];
154 }
155
156 for(int i=0; i<nprocs; i++)
157 for(int j=0; j<real_count; j++){
158 if(buf[i * real_count + j] < result[j])
159 result[j] = buf[i * real_count + j];
160 }
161 }
162}
163
164void minFloat(float result[], float buf[], int real_count, int nprocs){
165 //init result array
166 $atom{
167 for(int i=0; i<real_count; i++){
168 result[i] = buf[i * real_count];
169 }
170
171 for(int i=0; i<nprocs; i++)
172 for(int j=0; j<real_count; j++){
173 if(buf[i * real_count + j] < result[j])
174 result[j] = buf[i * real_count + j];
175 }
176 }
177}
178
179void minDouble(double result[], double buf[], int real_count, int nprocs){
180 //init result array
181 $atom{
182 for(int i=0; i<real_count; i++){
183 result[i] = buf[i * real_count];
184 }
185
186 for(int i=0; i<nprocs; i++)
187 for(int j=0; j<real_count; j++){
188 if(buf[i * real_count + j] < result[j])
189 result[j] = buf[i * real_count + j];
190 }
191 }
192}
193
194/************************** MPI LIB Implementations *******************************/
195
196$abstract double CMPI_time(int count);
197
198double MPI_Wtime() {
199 double result = CMPI_time(CMPI_time_count);
200
201 CMPI_time_count++;
202 return result;
203}
204
205CMPI_Gcomm CMPI_Gcomm_create($scope scope, int size) {
206 CMPI_Gcomm result;
207
208 result.p2p = $gcomm_create(scope, size);
209 result.col = $gcomm_create(scope, size);
210 result.gbarrier = $gbarrier_create(scope, size);
211 return result;
212}
213
214void CMPI_Gcomm_destroy(CMPI_Gcomm gc) {
215 $gcomm_destroy(gc.p2p);
216 $gcomm_destroy(gc.col);
217 $gbarrier_destroy(gc.gbarrier);
218}
219
220MPI_Comm CMPI_Comm_create($scope scope, CMPI_Gcomm gc, int rank) {
221 MPI_Comm result;
222
223 result.p2p = $comm_create(scope, gc.p2p, rank);
224 result.col = $comm_create(scope, gc.col, rank);
225 result.barrier = $barrier_create(scope, gc.gbarrier, rank);
226 result.status = __UNINIT;
227 return result;
228}
229
230void CMPI_Comm_destroy(MPI_Comm comm) {
231 $comm_destroy(comm.p2p);
232 $comm_destroy(comm.col);
233 $barrier_destroy(comm.barrier);
234}
235
236int __MPI_Init(MPI_Comm *comm) {
237 comm->status = __INIT;
238 return 0;
239}
240
241int __MPI_Finalize(MPI_Comm *comm) {
242 comm->status = __FINALIZED;
243 return 0;
244}
245
246int MPI_Comm_size(MPI_Comm comm, int *size) {
247 $assert(comm.status == __INIT, "MPI_Comm_size() cannot be invoked without MPI_Init() being called before.\n");
248 *size = $comm_size(comm.p2p);
249 return 0;
250}
251
252int MPI_Comm_rank(MPI_Comm comm, int *rank) {
253 $assert(comm.status == __INIT, "MPI_Comm_rank() cannot be invoked without MPI_Init() being called before.\n");
254 *rank = $comm_place(comm.p2p);
255 return 0;
256}
257
258
259int CMPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
260 int tag, $comm comm) {
261 if (dest >= 0) {
262 int size = count*sizeofDatatype(datatype);
263 int place = $comm_place(comm);
264 $message out = $message_pack(place, dest, tag, buf, size);
265
266 $comm_enqueue(comm, out);
267 }
268 return 0;
269}
270
271int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
272 int tag, MPI_Comm comm) {
273 $assert(comm.status == __INIT, "MPI_Send() cannot be invoked without MPI_Init() being called before.\n");
274 return CMPI_Send(buf, count, datatype, dest, tag, comm.p2p);
275}
276
277
278int CMPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
279 int tag, $comm comm, MPI_Status *status) {
280
281 if (source >= 0 || source == -1) {
282 $message in = $comm_dequeue(comm, source, tag);
283 int size = count*sizeofDatatype(datatype);
284
285 $message_unpack(in, buf, size);
286 if (status != MPI_STATUS_IGNORE) {
287 status->size = $message_size(in);
288 status->MPI_SOURCE = $message_source(in);
289 status->MPI_TAG = $message_tag(in);
290 status->MPI_ERROR = 0;
291 }
292 }
293 return 0;
294}
295
296int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
297 int tag, MPI_Comm comm, MPI_Status *status) {
298 $assert(comm.status == __INIT, "MPI_Recv() cannot be invoked without MPI_Init() being called before.\n");
299 return CMPI_Recv(buf, count, datatype, source, tag, comm.p2p, status);
300}
301
302int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
303 //$assert(__my_status == __INIT, "MPI status is not INIT.\n");
304 *count = status->size/sizeofDatatype(datatype);
305 return 0;
306}
307
308int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
309 int dest, int sendtag,
310 void *recvbuf, int recvcount, MPI_Datatype recvtype,
311 int source, int recvtag,
312 MPI_Comm comm, MPI_Status *status) {
313 $assert(comm.status == __INIT, "MPI_Sendrecv() cannot be invoked without MPI_Init() being called before.\n");
314 MPI_Send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
315 MPI_Recv(recvbuf, recvcount, recvtype, source, recvtag, comm, status);
316 return 0;
317}
318
319/* Broadcasts a message from root to everyone else.
320 * Need to use a differnt comm.
321 */
322int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
323 MPI_Comm comm) {
324 int place = $comm_place(comm.col);
325
326 $assert(comm.status == __INIT, "MPI_Bcast() cannot be invoked without MPI_Init() being called before.\n");
327 place = $comm_place(comm.col);
328 if (place == root) {
329 int nprocs = $comm_size(comm.col);
330
331 for (int i=0; i<nprocs; i++)
332 if (i != root)
333 CMPI_Send(buf, count, datatype, i, BCAST_TAG, comm.col);
334 } else {
335 CMPI_Recv(buf, count, datatype, root, BCAST_TAG, comm.col,
336 MPI_STATUS_IGNORE);
337 }
338 return 0;
339}
340
341/* Reduces values on all processes to a single value */
342int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
343 MPI_Op op, int root, MPI_Comm comm){
344 int place;
345 MPI_Status status;
346
347 $assert(comm.status == __INIT, "MPI_Reduce() cannot be invoked without MPI_Init() being called before.\n");
348 place = $comm_place(comm.col);
349 //non-root
350 CMPI_Send(sendbuf, count, datatype, root, REDUCE_TAG, comm.col);
351 if(place != root)
352 return 0;
353 else{
354 //root
355 int nprocs = $comm_size(comm.col);
356 int real_count = -1; //the real count gotton from MPI_Status
357 //void ** buf; //Buffer stores data from other processes
358 void * recv; //Buffer used in MPI_Recv()
359 void * results; //Array of results
360 $scope here = $root;
361 struct Bundle_buf{
362 int I[nprocs * count];
363 float F[nprocs * count];
364 double D[nprocs * count];
365 } buf;
366
367 switch(datatype){
368 case MPI_INT :
369 recv = (int *)$malloc(here, count * sizeof(int));
370 results = (int *)$malloc(here, count * sizeof(int));
371 break;
372 case MPI_FLOAT :
373 recv = (float *)$malloc(here, count * sizeof(float));
374 results = (float *)$malloc(here, count * sizeof(float));
375 break;
376 case MPI_DOUBLE :
377 recv = (double *)$malloc(here, count * sizeof(double));
378 results = (double *)$malloc(here, count * sizeof(double));
379 break;
380 }
381 for(int i=0; i<nprocs; i++){
382 CMPI_Recv(recv, count, datatype, i, REDUCE_TAG, comm.col,
383 &status);
384 MPI_Get_count(&status, datatype, &real_count);
385 $assert(real_count == count);
386
387 switch(datatype){
388 case MPI_INT :
389 for(int j=0; j<real_count; j++)
390 buf.I[i * real_count + j] = ((int *)recv)[j];
391 break;
392 case MPI_FLOAT :
393 for(int j=0; j<real_count; j++)
394 buf.F[i * real_count + j] = ((float *)recv)[j];
395 break;
396 case MPI_DOUBLE :
397 for(int j=0; j<real_count; j++)
398 buf.D[i * real_count + j] = ((double *)recv)[j];
399 break;
400 }
401 }
402
403 //operations
404 switch(datatype){
405 case MPI_INT:
406 if(op == MPI_SUM)
407 sumInt(results, buf.I, real_count, nprocs);
408 if(op == MPI_MAX)
409 maxInt(results, buf.I, real_count, nprocs);
410 if(op == MPI_MIN)
411 minInt(results, buf.I, real_count, nprocs);
412 break;
413 case MPI_FLOAT:
414 if(op == MPI_SUM)
415 sumFloat(results, buf.F, real_count, nprocs);
416 if(op == MPI_MAX)
417 maxFloat(results, buf.F, real_count, nprocs);
418 if(op == MPI_MIN)
419 minFloat(results, buf.F, real_count, nprocs);
420 break;
421 case MPI_DOUBLE:
422 if(op == MPI_SUM)
423 sumDouble(results, buf.D, real_count, nprocs);
424 if(op == MPI_MAX)
425 maxDouble(results, buf.D, real_count, nprocs);
426 if(op == MPI_MIN)
427 minDouble(results, buf.D, real_count, nprocs);
428 break;
429 }
430 MPI_Sendrecv(results, real_count, datatype,
431 place, 0,
432 recvbuf, real_count, datatype,
433 place, 0,
434 comm, &status);
435 $free(recv);
436 $free(results);
437 }
438 return 0;
439}
440
441/* Combines values from all processes and distributes the result back to all processes */
442/* default root is 0 */
443int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
444 MPI_Op op, MPI_Comm comm){
445 int place;
446 int root;
447 MPI_Status status;
448
449 $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
450 place = $comm_place(comm.col);
451 root = 0;
452 MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm);
453 if(place == root)
454 MPI_Sendrecv(recvbuf, count, datatype, place, 0,
455 recvbuf, count, datatype, place, 0,
456 comm, &status);
457 MPI_Bcast(recvbuf, count, datatype, root, comm);
458 return 0;
459}
460
461int MPI_Barrier(MPI_Comm comm){
462
463 $assert(comm.status == __INIT, "MPI_Allreduce() cannot be invoked without MPI_Init() being called before.\n");
464 $barrier_call(comm.barrier);
465}
466#endif
Note: See TracBrowser for help on using the repository browser.