source: CIVL/include/impls/civl-mpi-blocking.cvl@1aaefd4

Last change on this file since 1aaefd4 was ea777aa, checked in by Alex Wilton <awilton@…>, 3 years ago

Moved examples, include, build_default.properties, common.xml, and README out from dev.civl.com into the root of the repo.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@5704 fb995dde-84ed-4084-dfe6-e5aef3e2452c

#ifndef __CIVL_CIVLMPI__
#define __CIVL_CIVLMPI__

#include <civlc.cvh>
#include <concurrency.cvh>
#include <collate.cvh>
#include <comm.cvh>
#include <bundle.cvh>
#include <mpi.h>
#include <civl-mpi.cvh>
#include <string.h>
#include <pointer.cvh>
#include <seq.cvh>
#include <stdlib.h>
#include <stdio.h>

struct CIVL_MPI_Comm {
  $comm p2p;      // point-to-point communication
  $comm col;      // collective communication
  $collator collator;
  $barrier barrier;
  int gcommIndex; // the index of the corresponding global communicator
};

/* Library-private helper function declaration */
char * $mpi_coroutine_name(int tag);

/**************************** Duplicated Part *************************************/
/* This definition duplicates the struct of the same name in mpi.h;
   the duplication keeps civlmpi.cvl independent of mpi.cvl. */


/* Definition of CMPI_Gcomm (CMPI_Gcomm has type __CMPI_Gcomm)
   and MPI_Comm */
struct $mpi_gcomm {
  $gcomm p2p; // point-to-point communication
  $gcomm col; // collective communication
  $gcollator gcollator;
  $gbarrier gbarrier;
};

/****************************** Helper Functions **********************************/
$state_f size_t $mpi_extentof(MPI_Datatype datatype) {
  $abstract size_t $mpi_sizeof(MPI_Datatype datatype);

  return $mpi_sizeof(datatype);
}

int sizeofDatatype(MPI_Datatype datatype) {
  size_t result;

#ifdef _MPI_CONTRACT
  /* In MPI contract mode, the datatype may be non-concrete.  In that
     case, use an abstract function to represent the extent of the
     datatype. */
  if (!$is_concrete_int(datatype))
    return $mpi_extentof(datatype);
#endif

  switch (datatype) {
    case MPI_INT:
      result = sizeof(int); break;
    case MPI_2INT:
      result = (sizeof(int)*2); break;
    case MPI_FLOAT:
      result = sizeof(float); break;
    case MPI_DOUBLE:
      result = sizeof(double); break;
    case MPI_CHAR:
      result = sizeof(char); break;
    case MPI_BYTE:
      result = sizeof(char); break; // sizeof(char) is 1 by definition
    case MPI_SHORT:
      result = sizeof(short); break;
    case MPI_LONG:
      result = sizeof(long); break;
    case MPI_LONG_DOUBLE:
      result = sizeof(long double); break;
    case MPI_LONG_LONG_INT:
      result = sizeof(long long int); break;
    case MPI_LONG_LONG:
      result = sizeof(long long); break;
    case MPI_UNSIGNED_LONG_LONG:
      result = sizeof(unsigned long long); break;
    default:
      $assert(0, "Unreachable");
  }
#ifdef _MPI_CONTRACT
  /* A datatype X may previously have been non-concrete, so that its
     abstract extent $mpi_sizeof(X) was used.  If X is later
     simplified to a concrete value representing, say, 'int', we must
     associate sizeof(int) with $mpi_sizeof(X) by assuming the
     following predicate. */
  $assume($mpi_extentof(datatype) == result);
#endif
  return result;
}
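
/* For example (an illustrative sketch, not part of the original
   code): let T be a symbolic MPI_Datatype under _MPI_CONTRACT.  A
   first call sizeofDatatype(T) returns the abstract value
   $mpi_extentof(T).  If T is later concretized to MPI_INT, the next
   call returns sizeof(int) and assumes
   $mpi_extentof(T) == sizeof(int), keeping both results consistent. */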

void * $mpi_malloc(int count, MPI_Datatype datatype) {
  switch (datatype) {
    case MPI_INT:
    case MPI_SHORT:
    case MPI_LONG:
    case MPI_LONG_LONG_INT:
    case MPI_LONG_LONG:
    case MPI_UNSIGNED_LONG_LONG:
      return (int *)malloc(sizeof(int) * count);
    case MPI_2INT:
      return (int *)malloc(2 * count * sizeof(int));
    case MPI_FLOAT:
    case MPI_DOUBLE:
    case MPI_LONG_DOUBLE:
      return ($real *)malloc(count * sizeof($real));
    case MPI_CHAR:
    case MPI_BYTE:
      return (char *)malloc(count * sizeof(char));
    default:
      $assert(0, "Unreachable");
  }
  return (void *)0;
}
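
/* Usage sketch (illustrative): $mpi_reduce below allocates its
   scratch buffer this way, so that the buffer's element kind matches
   CIVL's value model -- all integer datatypes map to int cells and
   all floating-point datatypes to $real cells:

     void * applybuf = $mpi_malloc(count, datatype);
     ...
     free(applybuf);
*/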

/************************** MPI LIB Implementations *******************************/
$mpi_gcomm $mpi_gcomm_create($scope scope, int size) {
  $mpi_gcomm result;

  result.p2p = $gcomm_create(scope, size);
  result.col = $gcomm_create(scope, size);
  result.gcollator = $gcollator_create(scope, size);
  result.gbarrier = $gbarrier_create(scope, size);
  return result;
}
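
/* Illustrative sketch (not part of this file): a global communicator
   is created once, at a scope visible to all processes, and each
   process then creates its local handle at its own scope -- the same
   pairing that $mpi_comm_dup below performs for duplicated
   communicators.  The names rootScope, procScope, nprocs, and r are
   hypothetical:

     $mpi_gcomm gc = $mpi_gcomm_create(rootScope, nprocs);
     // in each MPI process, with rank r:
     MPI_Comm comm = $mpi_comm_create(procScope, gc, r);
*/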

void $mpi_gcomm_destroy($mpi_gcomm gc) {
  /* This function reports an error for any message remaining in the
     $mpi_gcomm.  Such messages are junk messages. */
  int numJunkRecord;
  int numJunkMsg;
  $message junkMsgs[]; // A CIVL-C sequence for junk messages.

  $seq_init(&junkMsgs, 0, NULL);
  numJunkMsg = $gcomm_destroy(gc.p2p, &junkMsgs);
  /* Junk messages are reported differently for the point-to-point
     communicator and the collective communicator: */
  for (int i = 0; i < numJunkMsg; i++) {
    int src, dest, tag;

    src = $message_source(junkMsgs[i]);
    dest = $message_dest(junkMsgs[i]);
    tag = $message_tag(junkMsgs[i]);
    $assert($false, "MPI message leak: a message from rank %d to rank %d with tag %d "
            "was sent but never received in point-to-point communication.",
            src, dest, tag);
  }
  numJunkMsg = $gcomm_destroy(gc.col, &junkMsgs);
  for (int i = 0; i < numJunkMsg; i++) {
    int src, tag;
    char * routine;

    src = $message_source(junkMsgs[i]);
    tag = $message_tag(junkMsgs[i]);
    routine = $mpi_coroutine_name(tag);
    $assert($false, "MPI message leak: a message sent by rank %d for collective routine %s"
            " was never received.",
            src, routine);
  }
  $gcollator_destroy(gc.gcollator);
  $gbarrier_destroy(gc.gbarrier);
}

MPI_Comm $mpi_comm_create($scope scope, $mpi_gcomm gc, int rank) {
  MPI_Comm result = (MPI_Comm)$malloc(scope, sizeof(*result));

  result->p2p = $comm_create(scope, gc.p2p, rank);
  result->col = $comm_create(scope, gc.col, rank);
  result->collator = $collator_create(gc.gcollator, scope, rank);
  result->barrier = $barrier_create(scope, gc.gbarrier, rank);
  result->gcommIndex = 0;
  return result;
}

void $mpi_comm_destroy(MPI_Comm comm, $mpi_state mpi_state) {
#ifndef _MPI_CONTRACT
  if (comm->gcommIndex == 0)
    $assert(mpi_state == _MPI_FINALIZED, "Process terminates without "
            "calling MPI_Finalize() first.");
#endif
  $comm_destroy(comm->p2p);
  $comm_destroy(comm->col);
  $free(comm->collator);
  $barrier_destroy(comm->barrier);
  $free(comm);
}

void * $mpi_pointer_add(const void * ptr, int offset, MPI_Datatype datatype) {
#ifdef _MPI_CONTRACT
  if (!$is_concrete_int(datatype)) {
    int datatypeExtent = $mpi_extentof(datatype);

    return $pointer_add(ptr, offset * datatypeExtent, sizeof(char));
  }
#endif

  int type_size = sizeofDatatype(datatype);

  return $pointer_add(ptr, offset, type_size);
}

/********************* Lower level MPI routines *********************/
/* $mpi_send and $mpi_recv are a send/receive pair of functions that
   help implement the MPI routines.  They should never block, which
   means there are no potential deadlocks related to these
   functions. */
int $mpi_send(const void *buf, int count, MPI_Datatype datatype, int dest,
              int tag, MPI_Comm comm) {
  if (dest >= 0) {
    int size = count*sizeofDatatype(datatype);
    int place = $comm_place(comm->p2p);
    $message out;

    $elaborate(dest);
    out = $message_pack(place, dest, tag, buf, size);
    $comm_enqueue(comm->p2p, out);
  }
  return 0;
}

int $mpi_recv(void *buf, int count, MPI_Datatype datatype, int source,
              int tag, MPI_Comm comm, MPI_Status *status) {
  if ((source >= 0 || source == MPI_ANY_SOURCE)) {
    $message in;
    int place = $comm_place(comm->p2p);
    int deterministicTag;

    $assert(tag == -2 || tag >= 0, "Illegal MPI message receive tag %d.\n", tag);
    deterministicTag = tag < 0 ? -2 : tag;
    $elaborate(source);
    in = $comm_dequeue(comm->p2p, source, deterministicTag);

    int size = count*sizeofDatatype(datatype);

    $message_unpack(in, buf, size);
    if (status != MPI_STATUS_IGNORE) {
      status->size = $message_size(in);
      status->MPI_SOURCE = $message_source(in);
      status->MPI_TAG = $message_tag(in);
      status->MPI_ERROR = 0;
    }
  }
  return 0;
}
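
/* Illustrative sketch (not part of this file): the blocking MPI
   point-to-point routines can delegate directly to this pair, e.g.:

     int MPI_Send(const void *buf, int count, MPI_Datatype datatype,
                  int dest, int tag, MPI_Comm comm) {
       return $mpi_send(buf, count, datatype, dest, tag, comm);
     }

   The actual wrappers live elsewhere (e.g., mpi.cvl) and may add
   state checks before delegating. */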


int $mpi_sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                  int dest, int sendtag, void *recvbuf, int recvcount,
                  MPI_Datatype recvtype, int source, int recvtag,
                  MPI_Comm comm, MPI_Status *status) {
  int deterministicRecvTag;

  $assert(sendtag >= 0, "MPI sendtag should be greater than or equal to zero");
  $assert(recvtag == -2 || recvtag >= 0, "Illegal MPI message receive tag %d.\n", recvtag);

  deterministicRecvTag = recvtag < 0 ? -2 : recvtag;
  if ((dest >= 0) && ((source >= 0 || source == MPI_ANY_SOURCE))) {
    $message out, in;
    int size = sendcount*sizeofDatatype(sendtype);
    int place = $comm_place(comm->p2p);

    out = $message_pack(place, dest, sendtag, sendbuf, size);
    $elaborate(source);
    $elaborate(dest);
    $choose {
      $when($true) {
        $comm_enqueue(comm->p2p, out);
        in = $comm_dequeue(comm->p2p, source, deterministicRecvTag);
      }
      $when($false) {
        /* This $choose branch is a trick that gives sendrecv() the
           correct semantics: it ensures there is no potential
           deadlock when all processes collectively do a send followed
           by a receive.  In practice, however, this branch is not
           needed at run time and will never be executed. */
        in = $comm_dequeue(comm->p2p, source, deterministicRecvTag);
        $comm_enqueue(comm->p2p, out);
      }
    }
    size = recvcount*sizeofDatatype(recvtype);
    $message_unpack(in, recvbuf, size);
    if (status != MPI_STATUS_IGNORE) {
      status->size = $message_size(in);
      status->MPI_SOURCE = $message_source(in);
      status->MPI_TAG = $message_tag(in);
      status->MPI_ERROR = 0;
    }
  }
  else if (dest >= 0)
    $mpi_send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
  else if (source >= 0 || source == MPI_ANY_SOURCE)
    $mpi_recv(recvbuf, recvcount, recvtype, source, deterministicRecvTag, comm, status);
  return 0;
}

/********************* Collective helper functions ********************/
/* Note: collective helper functions have the same behavior as MPI
   collective functions and can be reused as part of the
   implementation of different MPI routines.  For example,
   MPI_Allreduce calls $mpi_reduce and $mpi_bcast, both of which
   should report any errors they encounter as if the errors were
   raised from MPI_Allreduce itself.
*/
int $mpi_collective_send(const void *buf, int count, MPI_Datatype datatype, int dest,
                         int tag, MPI_Comm comm) {
  if (dest >= 0) {
    int size = count*sizeofDatatype(datatype);
    int place = $comm_place(comm->col);
    $message out = $message_pack(place, dest, tag, buf, size);

    $elaborate(dest);
    $comm_enqueue(comm->col, out);
  }
  return 0;
}

int $mpi_collective_recv(void *buf, int count, MPI_Datatype datatype,
                         int source, int tag, MPI_Comm comm,
                         MPI_Status * status, char * routName) {
  if (source >= 0 || source == MPI_ANY_SOURCE) {
    $elaborate(source);
    $message in = $comm_dequeue(comm->col, source, MPI_ANY_TAG);
    int size = count*sizeofDatatype(datatype);
    int recvTag;

    /* This routine should only be used by collective routines, and
       collective routines never use non-deterministic tags. */
    recvTag = $message_tag(in);
    $assert(recvTag == tag, "Collective routine %s receives a "
            "message with a mismatched tag\n", routName);
    $message_unpack(in, buf, size);
    if (status != MPI_STATUS_IGNORE) {
      status->size = $message_size(in);
      status->MPI_SOURCE = $message_source(in);
      status->MPI_TAG = recvTag;
      status->MPI_ERROR = 0;
    }
  }
  return 0;
}

/* Broadcast helper function that uses any specified message tag */
int $mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, int tag,
               MPI_Comm comm, char * routName) {
  if ($comm_place(comm->col) == root) {
    int nprocs = $comm_size(comm->col);

    for (int i = 0; i < nprocs; i++)
      if (i != root)
        $mpi_collective_send(buf, count, datatype, i, tag, comm);
  } else
    $mpi_collective_recv(buf, count, datatype, root, tag, comm,
                         MPI_STATUS_IGNORE, routName);
  return 0;
}
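
/* Illustrative sketch (not part of this file): an MPI_Bcast wrapper
   can call this helper with the broadcast tag (9999 according to
   $mpi_coroutine_name below); a BCAST_TAG constant is assumed here to
   be defined in a header such as civl-mpi.cvh:

     int MPI_Bcast(void *buf, int count, MPI_Datatype datatype,
                   int root, MPI_Comm comm) {
       return $mpi_bcast(buf, count, datatype, root, BCAST_TAG,
                         comm, "MPI_Bcast");
     }
*/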

/* Reduction helper function that uses any specified message tag */
int $mpi_reduce(const void* sendbuf, void* recvbuf, int count,
                MPI_Datatype datatype, MPI_Op op, int root, int tag,
                MPI_Comm comm, char * routName) {
  int rank;

  rank = $comm_place(comm->col);
  if (rank != root)
    $mpi_collective_send(sendbuf, count, datatype, root, tag, comm);
  else {
    int nprocs = $comm_size(comm->col);
    int size;

    size = count * sizeofDatatype(datatype);
    memcpy(recvbuf, sendbuf, size);
    for (int i = 0; i < nprocs; i++) {
      if (i != root) {
        int colTag;
        void * applybuf;
        $message in = $comm_dequeue(comm->col, i, MPI_ANY_TAG);

        /* Collective routines have no non-deterministic tags. */
        colTag = $message_tag(in);
        $assert(colTag == tag, "Collective routine %s receives a "
                "message with a mismatched tag\n", routName);
        /* The third argument "count" is the number of cells to which
           the operation is applied. */
        applybuf = $mpi_malloc(count, datatype);
        $bundle_unpack_apply(in.data, recvbuf, op, count, applybuf);
        memcpy(recvbuf, applybuf, size);
        free(applybuf);
        $assert(in.size <= size,
                "Message of size %d exceeds the specified size %d.", in.size, size);
      }
    }
  }
  return 0;
}
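
/* Illustrative sketch (not part of this file): following the note
   above, MPI_Allreduce can be composed from the two helpers -- reduce
   to rank 0, then broadcast the result.  An ALLREDUCE_TAG constant
   (9997 according to $mpi_coroutine_name below) is assumed to be
   defined elsewhere:

     int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
                       MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
       $mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0,
                   ALLREDUCE_TAG, comm, "MPI_Allreduce");
       $mpi_bcast(recvbuf, count, datatype, 0, ALLREDUCE_TAG,
                  comm, "MPI_Allreduce");
       return 0;
     }
*/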

/* Gathering helper function that uses any specified message tag */
int $mpi_gather(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
                void* recvbuf, int recvcount, MPI_Datatype recvtype,
                int root, int tag, MPI_Comm comm, char * routName) {
  int rank, nprocs;
  MPI_Status status;

  rank = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* MPI standard requirement:
   * for the root process, sendtype must be equal to recvtype. */
  if (rank == root)
    $assert(sendtype == recvtype,
            "%s asks for equality "
            "between 'sendtype' and 'recvtype'.", routName);
  /* MPI standard requirement:
   * only the root process can use MPI_IN_PLACE. */
  if (sendbuf == MPI_IN_PLACE) {
    $assert(root == rank,
            "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'.");
  } else if (root == rank) {
    void * ptr;

    $assert(sendcount == recvcount, "The root process of routine %s without using"
            " MPI_IN_PLACE should give the same value for recvcount and sendcount",
            routName);
    ptr = $mpi_pointer_add(recvbuf, root * recvcount, recvtype);
    memcpy(ptr, sendbuf, recvcount * sizeofDatatype(recvtype));
  } else
    $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm);
  /* The root process receives the messages and puts them in the
     right places. */
  if (rank == root) {
    int real_recvcount;
    int offset;

    for (int i = 0; i < nprocs; i++) {
      if (i != root) {
        void * ptr;

        offset = i * recvcount;
        ptr = $mpi_pointer_add(recvbuf, offset, recvtype);
        $mpi_collective_recv(ptr, recvcount, recvtype,
                             i, tag, comm, &status, routName);
        real_recvcount = status.size/sizeofDatatype(recvtype);
        $assert(real_recvcount == recvcount,
                "%s asks for equality between"
                " the amount of data sent and the "
                "amount of data received.", routName);
      }
    }
  }
  return 0;
}

int $mpi_gatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
                 void* recvbuf, const int recvcounts[], const int displs[],
                 MPI_Datatype recvtype, int root, int tag,
                 MPI_Comm comm, char * routName) {
  int rank, nprocs;

  rank = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* MPI standard requirement:
   * for the root process, sendtype must be equal to recvtype. */
  if (rank == root)
    $assert(sendtype == recvtype, "%s asks for equality "
            "between 'sendtype' and 'recvtype'.", routName);
  /* MPI standard requirement:
   * only the root process can use MPI_IN_PLACE. */
  if (sendbuf == MPI_IN_PLACE) {
    $assert(root == rank, "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'.");
  } else if (root == rank) {
    void * ptr;

    $assert(sendcount == recvcounts[root], "For routine %s, recvcounts[%d] "
            "should be the same as the sendcount of the process with rank %d.\n",
            routName, root, root);
    ptr = $mpi_pointer_add(recvbuf, displs[rank], recvtype);
    memcpy(ptr, sendbuf, sendcount * sizeofDatatype(recvtype));
  } else {
    $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm);
  }
  /* The root process receives the messages and puts them in the
     right places. */
  if (rank == root) {
    int real_recvcount;
    MPI_Status status;

    for (int i = 0; i < nprocs; i++) {
      if (i != root) {
        void * ptr = $mpi_pointer_add(recvbuf, displs[i], recvtype);

        $mpi_collective_recv(ptr, recvcounts[i],
                             recvtype, i, tag, comm, &status, routName);
        real_recvcount = status.size/sizeofDatatype(recvtype);
        $assert(real_recvcount == recvcounts[i], "%s asks for equality between"
                " the amount of data sent and the "
                "amount of data received.", routName);
      }
    }
  }
  return 0;
}

/* Scatter helper function that uses any specified message tag */
int $mpi_scatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
                 void* recvbuf, int recvcount, MPI_Datatype recvtype, int root,
                 int tag, MPI_Comm comm, char * routName) {
  int rank, nprocs;

  rank = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* MPI standard requirement:
   * for the root process, sendtype must be equal to recvtype. */
  if (rank == root)
    $assert(sendtype == recvtype, "MPI_Scatter() asks for equality "
            "between 'sendtype' and 'recvtype'.");
  /* MPI standard requirement:
   * only the root process can use MPI_IN_PLACE. */
  if (recvbuf == MPI_IN_PLACE) {
    $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'.");
  } else if (rank == root) {
    void * ptr;

    $assert(sendcount == recvcount, "The root process of routine %s without using"
            " MPI_IN_PLACE should give the same value for recvcount and sendcount",
            routName);
    ptr = $mpi_pointer_add(sendbuf, root*recvcount, sendtype);
    memcpy(recvbuf, ptr, sizeofDatatype(recvtype)*recvcount);
  }
  /* The root process scatters data to the other processes. */
  if (rank == root) {
    int offset;

    for (int i = 0; i < nprocs; i++) {
      if (i != root) {
        void * ptr;

        offset = i * sendcount;
        ptr = $mpi_pointer_add(sendbuf, offset, sendtype);
        $mpi_collective_send(ptr, sendcount, sendtype, i, tag, comm);
      }
    }
  }
  /* Non-root processes receive data. */
  if (!(root == rank)) {
    int real_recvcount;
    MPI_Status status;

    $mpi_collective_recv(recvbuf, recvcount, recvtype,
                         root, tag, comm, &status, routName);
    real_recvcount = status.size/sizeofDatatype(recvtype);
    $assert(real_recvcount == recvcount,
            "%s asks for equality between"
            " the amount of data sent and the "
            "amount of data received.", routName);
  }
  return 0;
}

/* Scatterv helper function that uses any specified message tag */
int $mpi_scatterv(const void* sendbuf, const int sendcounts[], const
                  int displs[], MPI_Datatype sendtype, void* recvbuf,
                  int recvcount, MPI_Datatype recvtype, int root, int tag,
                  MPI_Comm comm, char * routName) {
  int rank, nprocs;

  rank = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* MPI standard requirement:
   * for the root process, sendtype must be equal to recvtype. */
  if (rank == root)
    $assert(sendtype == recvtype, "%s asks for equality "
            "between 'sendtype' and 'recvtype'.", routName);
  /* MPI standard requirement:
   * only the root process can use MPI_IN_PLACE. */
  if (recvbuf == MPI_IN_PLACE) {
    $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'.");
  } else if (rank == root) {
    void * ptr;

    $assert(sendcounts[root] == recvcount, "For routine %s, sendcounts[%d] "
            "should be the same as the recvcount of the process with rank %d.\n",
            routName, root, root);
    ptr = $mpi_pointer_add(sendbuf, displs[root], sendtype);
    memcpy(recvbuf, ptr, recvcount*sizeofDatatype(recvtype));
  }
  /* The root process scatters data to the other processes. */
  if (rank == root) {
    for (int i = 0; i < nprocs; i++) {
      if (i != root) {
        void * ptr = $mpi_pointer_add(sendbuf, displs[i], sendtype);

        $mpi_collective_send(ptr, sendcounts[i], sendtype, i,
                             tag, comm);
      }
    }
  }
  if (!(root == rank)) {
    MPI_Status status;
    int real_recvcount;

    $mpi_collective_recv(recvbuf, recvcount, recvtype,
                         root, tag, comm, &status, routName);
    real_recvcount = status.size/sizeofDatatype(recvtype);
    $assert(real_recvcount == recvcount, "Process rank:%d\n%s asks for equality between"
            " the amount of data sent (%d) and the "
            "amount of data received (%d).", rank, routName, real_recvcount, recvcount);
  }
  return 0;
}

/* The helper function for the (inclusive) MPI_Scan */
int $mpi_scan(const void* sendbuf, void* recvbuf, int count,
              MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
  int place, nprocs;
  int size = count * sizeofDatatype(datatype);
  void * tmp_space = $mpi_malloc(count, datatype);
  void * tmp_ptr, * tmp_space_const = tmp_space;

  place = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* Each process does a reduction over ranks 0 .. rank (inclusive):
   * send to processes rank + 1 .. nprocs - 1 (inclusive),
   * receive from processes 0 .. rank (exclusive).
   */
  for (int i = place + 1; i < nprocs; i++)
    $mpi_collective_send(sendbuf, count, datatype, i, SCAN_TAG, comm);
  if (sendbuf != MPI_IN_PLACE)
    memcpy(recvbuf, sendbuf, size);
  for (int i = 0; i < place; i++) {
    $message in = $comm_dequeue(comm->col, i, SCAN_TAG);

    // So far, unpack_apply requires that 'recvbuf' does not alias
    // 'tmp_space' (this can be improved in the future):
    $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space);
    // swap tmp_space with recvbuf
    tmp_ptr = tmp_space;
    tmp_space = recvbuf;
    recvbuf = tmp_ptr;
    $assert(in.size <= size,
            "Message of size %d exceeds the specified size %d.", in.size, size);
  }
  if (recvbuf == tmp_space_const)
    memcpy(tmp_space, recvbuf, size);
  free(tmp_space_const);
  return 0;
}
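
/* Worked example (a sketch): with nprocs = 3, count = 1, and
   op = MPI_SUM, where rank r contributes x_r, $mpi_scan leaves in
   each rank's recvbuf:
     rank 0: x_0
     rank 1: x_0 + x_1
     rank 2: x_0 + x_1 + x_2
   Rank r enqueues its contribution to ranks r+1 .. nprocs-1 and then
   folds in the messages from ranks 0 .. r-1. */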

/* The helper function for the (exclusive) MPI_Exscan */
int $mpi_exscan(const void* sendbuf, void* recvbuf, int count,
                MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
  int place, nprocs;
  int size = count * sizeofDatatype(datatype);
  void * tmp_space = $mpi_malloc(count, datatype);
  void * tmp_ptr, * tmp_space_const = tmp_space;

  place = $comm_place(comm->col);
  nprocs = $comm_size(comm->col);
  /* The "in place" option for intracommunicators is specified by
   * passing MPI_IN_PLACE in the sendbuf argument. In this case, the
   * input data is taken from the receive buffer, and replaced by the
   * output data. The receive buffer on rank 0 is not changed by this
   * operation. */
  if (sendbuf == MPI_IN_PLACE)
    sendbuf = recvbuf;
  /* Each process does a reduction over ranks 0 .. rank (exclusive):
   * send to processes rank + 1 .. nprocs - 1 (inclusive),
   * receive from processes 0 .. rank (exclusive).
   */
  for (int i = place + 1; i < nprocs; i++)
    $mpi_collective_send(sendbuf, count, datatype, i, EXSCAN_TAG, comm);
  // no-op for rank 0
  if (place != 0) {
    $message in = $comm_dequeue(comm->col, 0, EXSCAN_TAG);
    $bundle_unpack(in.data, recvbuf);
    for (int i = 1; i < place; i++) {
      in = $comm_dequeue(comm->col, i, EXSCAN_TAG);
      // So far, unpack_apply requires that 'recvbuf' does not alias
      // 'tmp_space' (this can be improved in the future):
      $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space);
      // swap tmp_space with recvbuf
      tmp_ptr = tmp_space;
      tmp_space = recvbuf;
      recvbuf = tmp_ptr;
      $assert(in.size <= size,
              "Message of size %d exceeds the specified size %d.", in.size, size);
    }
  }
  if (recvbuf == tmp_space_const)
    memcpy(tmp_space, recvbuf, size);
  free(tmp_space_const);
  return 0;
}
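
/* Worked example (a sketch): with the same setup as the $mpi_scan
   example above (nprocs = 3, count = 1, op = MPI_SUM), $mpi_exscan
   leaves:
     rank 0: recvbuf unchanged
     rank 1: x_0
     rank 2: x_0 + x_1
*/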


/* ******************** End of collective routines ********************* */

int $mpi_comm_dup($scope scope, MPI_Comm comm, MPI_Comm * newcomm, char * routName) {
  int place = $comm_place(comm->col);
  $mpi_gcomm newgcomm;
  int idx;
  $scope CMPI_ROOT_SCOPE = $mpi_root_scope(comm);

  if (place == 0) {
    int size = $comm_size(comm->col);

    newgcomm = $mpi_gcomm_create(CMPI_ROOT_SCOPE, size);
    idx = $mpi_new_gcomm(CMPI_ROOT_SCOPE, newgcomm);
  }
  $mpi_bcast(&idx, 1, MPI_INT, 0, COMMDUP_TAG,
             comm, routName);
  newgcomm = $mpi_get_gcomm(CMPI_ROOT_SCOPE, idx);
  (*newcomm) = $mpi_comm_create(scope, newgcomm, place);
  (*newcomm)->gcommIndex = idx;
  $barrier_call(comm->barrier);
  $gcomm_dup(comm->p2p, (*newcomm)->p2p);
  $gcomm_dup(comm->col, (*newcomm)->col);
  $barrier_call(comm->barrier);
  return 0;
}

int $mpi_comm_free(MPI_Comm *comm, $mpi_state mpi_state) {
  int place = $comm_place((*comm)->col);
  int size = $comm_size((*comm)->col);
  int buf[size];
  int gcommIndex = (*comm)->gcommIndex;
  $scope CMPI_ROOT_SCOPE = $mpi_root_scope(*comm);

  // TODO: the $mpi_gather here is just an ugly synchronization
  $mpi_gather(&place, 1, MPI_INT, buf, 1, MPI_INT, 0,
              COMMFREE_TAG, (*comm), "MPI_Comm_free synchronization.");
  $mpi_comm_destroy(*comm, mpi_state);
  if (place == 0) {
    $mpi_gcomm temp = $mpi_get_gcomm(CMPI_ROOT_SCOPE, gcommIndex);

    $mpi_gcomm_destroy(temp);
  }
  return 0;
}

$bundle $mpi_create_coroutine_entry(int routineTag, int root,
                                    int op, int numDatatypes, int * datatypes) {
  int zero = 0;
  $bundle bundledEntry;
  struct Entry {
    int routine_tag;
    int root;
    int op;
    int numTypes;
    int datatypes[];
  } entry;

  entry.routine_tag = routineTag;
  entry.root = root;
  entry.op = op;
  entry.numTypes = numDatatypes;
  $seq_init(&entry.datatypes, numDatatypes, &zero);
  for (int i = 0; i < numDatatypes; i++)
    entry.datatypes[i] = datatypes[i];
  bundledEntry = $bundle_pack(&entry, sizeof(struct Entry));
  return bundledEntry;
}
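
/* Illustrative sketch (not part of this file): a collective routine
   can check call consistency by packing its own entry, handing it to
   the collator via $mpi_check_collective (defined near the end of
   this file), and diffing the result.  The tag 9999 is MPI_Bcast's
   (see $mpi_coroutine_name); using -1 for "no MPI_Op" is an assumed
   convention here:

     void check_bcast(MPI_Comm comm, int root, MPI_Datatype datatype,
                      int rank) {
       int types[1] = {(int)datatype};
       $bundle mine = $mpi_create_coroutine_entry(9999, root, -1, 1, types);
       $bundle spec = $mpi_check_collective(comm, mine);

       $mpi_diff_coroutine_entries(spec, mine, rank);
     }
*/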

void $mpi_diff_coroutine_entries($bundle specEntry, $bundle mineEntry, int rank) {
  struct Entry {
    int routine_tag;
    int root;
    int op;
    int numTypes;
    int datatypes[];
  } spec, mine;
  char * routine;
  int numTypes;

  $bundle_unpack(specEntry, &spec);
  $bundle_unpack(mineEntry, &mine);
  routine = $mpi_coroutine_name(spec.routine_tag);
  if (spec.routine_tag != mine.routine_tag) {
    char * mineRoutine = $mpi_coroutine_name(mine.routine_tag);

    $assert($false, "Process with rank %d reaches the MPI collective routine "
            "%s while at least one of the others is collectively reaching %s.",
            rank, mineRoutine, routine);
  }
  else if (spec.root != mine.root) {
    $assert($false, "Process with rank %d reaches the MPI collective routine "
            "%s with a root different from at least one of the others.", rank, routine);
  } else if (spec.op != mine.op) {
    $assert($false, "Process with rank %d reaches the MPI collective routine "
            "%s with an MPI_Op different from at least one of the others.", rank, routine);
  } else if (spec.numTypes != mine.numTypes) {
    $assert($false, "Process with rank %d reaches the MPI collective routine "
            "%s with a datatype specification inconsistent with at least"
            " one of the others.",
            rank, routine);
  }
  numTypes = spec.numTypes;
  for (int i = 0; i < numTypes; i++)
    if (spec.datatypes[i] != mine.datatypes[i]) {
      $assert($false, "Process with rank %d reaches the MPI collective routine "
              "%s with a datatype specification inconsistent with at "
              "least one of the others.",
              rank, routine);
      break;
    }
}

int $mpi_barrier(MPI_Comm comm) {
  $barrier_call(comm->barrier);
  return 0;
}

#ifdef _MPI_CONTRACT

$collate_state $mpi_snapshot(MPI_Comm comm, $scope scope) {
  return $collate_arrives(comm->collator, scope);
}

void $mpi_unsnapshot(MPI_Comm comm, $collate_state cs) {
  $collate_departs(comm->collator, cs);
}

void $mpi_assigns(void * buf, int count, MPI_Datatype datatype) {
  if ($is_concrete_int(datatype)) {
    size_t size = sizeofDatatype(datatype);
    int _int[count];
    int _2int[count * 2];
    char _char[count];
    $real _real[count];

    switch (datatype) {
      case MPI_INT:
      case MPI_SHORT:
      case MPI_LONG:
      case MPI_LONG_LONG_INT:
      case MPI_LONG_LONG:
      case MPI_UNSIGNED_LONG_LONG:
        memcpy(buf, _int, count * size);
        break;
      case MPI_2INT:
        memcpy(buf, _2int, 2 * count * size);
        break;
      case MPI_FLOAT:
      case MPI_DOUBLE:
      case MPI_LONG_DOUBLE:
        memcpy(buf, _real, count * size);
        break;
      case MPI_CHAR:
      case MPI_BYTE:
        memcpy(buf, _char, count * size);
        break;
      default:
        $assert(0, "Unreachable");
    }
  } else {
    size_t realCount = count * $mpi_extentof(datatype);
    char newValues[realCount];

    memcpy(buf, newValues, count * sizeofDatatype(datatype));
  }
}
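
/* Note (an interpretation, not stated in the original source): the
   local arrays above are intentionally left uninitialized, so the
   memcpy calls overwrite 'buf' with fresh, unconstrained values -- in
   effect havocking the buffer, which models the effect of an
   "assigns" clause in an MPI contract. */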

$atomic_f void $mpi_comm_empty(MPI_Comm comm, MPI_COMM_MODE mode) {
  _Bool empty;

  if (mode == P2P) {
    empty = $comm_empty_in(comm->p2p);
    empty &= $comm_empty_out(comm->p2p);
  } else {
    empty = $comm_empty_in(comm->col);
    empty &= $comm_empty_out(comm->col);
  }
  $assert(empty, "Messages remain in the MPI communicator\n");
}

#endif

/********************* Private helper functions *********************/
/* Returns the string literal of an MPI collective routine's name,
 * given its unique message tag. */
char * $mpi_coroutine_name(int tag) {
  switch (tag) {
    case 9999: return "MPI_Bcast";
    case 9998: return "MPI_Reduce";
    case 9997: return "MPI_Allreduce";
    case 9996: return "MPI_Gather";
    case 9995: return "MPI_Scatter";
    case 9994: return "MPI_Gatherv";
    case 9993: return "MPI_Scatterv";
    case 9992: return "MPI_Allgather";
    case 9991: return "MPI_Reduce_scatter";
    case 9990: return "MPI_Alltoall";
    case 9989: return "MPI_Alltoallv";
    case 9988: return "MPI_Alltoallw";
    case 9987: return "MPI_Barrier";
    case 9986: return "MPI_Commdup";
    case 9985: return "MPI_Commfree";
    case 9984: return "MPI_Scan";
    case 9983: return "MPI_Exscan";
    default: $assert($false, "Internal Error: Unexpected MPI routine tag:%d.\n", tag);
  }
}

/**************** Bridging MPI and the comm library *****************/

$atomic_f $state_f $comm $mpi_get_p2p_channel(MPI_Comm comm) {
  return comm->p2p;
}

$atomic_f $state_f $comm $mpi_get_col_channel(MPI_Comm comm) {
  return comm->col;
}

$atomic_f $state_f int $mpi_comm_size(MPI_Comm comm) {
  return $comm_size(comm->p2p);
}

$atomic_f $state_f int $mpi_comm_place(MPI_Comm comm) {
  return $comm_place(comm->p2p);
}

$atomic_f $state_f $scope $mpi_root_scope(MPI_Comm comm) {
  return $mpi_root_scope_system(comm->p2p);
}

$atomic_f $state_f $scope $mpi_proc_scope(MPI_Comm comm) {
  return $mpi_proc_scope_system(comm->p2p);
}

/**************** Bridging MPI and the collate library ****************/
$state_f $bundle $mpi_check_collective(MPI_Comm comm,
                                       $bundle checking) {
  return $collate_check(comm->collator, checking);
}

#endif