| 1 | #ifndef __CIVL_CIVLMPI__
|
|---|
| 2 | #define __CIVL_CIVLMPI__
|
|---|
| 3 |
|
|---|
| 4 | #include <civlc.cvh>
|
|---|
| 5 | #include <concurrency.cvh>
|
|---|
| 6 | #include <collate.cvh>
|
|---|
| 7 | #include <comm.cvh>
|
|---|
| 8 | #include <bundle.cvh>
|
|---|
| 9 | #include <mpi.h>
|
|---|
| 10 | #include <civl-mpi.cvh>
|
|---|
| 11 | #include <string.h>
|
|---|
| 12 | #include <pointer.cvh>
|
|---|
| 13 | #include <seq.cvh>
|
|---|
| 14 | #include <stdlib.h>
|
|---|
| 15 | #include <stdio.h>
|
|---|
| 16 |
|
|---|
| 17 | typedef struct _mpi_nb_record_t {
|
|---|
| 18 | void *buf;
|
|---|
| 19 | int count;
|
|---|
| 20 | MPI_Datatype datatype;
|
|---|
| 21 | int src_or_dest;
|
|---|
| 22 | int tag;
|
|---|
| 23 | _Bool completed;
|
|---|
| 24 | } _mpi_nb_record_t;
|
|---|
| 25 |
|
|---|
| 26 | struct CIVL_MPI_Comm {
|
|---|
| 27 | $comm p2p; // point-to-point communication
|
|---|
| 28 | $comm col; // collective communication
|
|---|
| 29 | $collator collator;
|
|---|
| 30 | $barrier barrier;
|
|---|
| 31 | int gcommIndex; //the index of the corresponding global communicator.
|
|---|
| 32 | _mpi_nb_record_t * send_records[];
|
|---|
| 33 | _mpi_nb_record_t * recv_records[];
|
|---|
| 34 | };
|
|---|
| 35 |
|
|---|
| 36 | /* Library private helper function declaration */
|
|---|
| 37 | char * $mpi_coroutine_name(int tag);
|
|---|
| 38 |
|
|---|
| 39 | /**************************** Duplicated Part *************************************/
|
|---|
| 40 | /* Duplicated definition with the same struct in mpi.h.
|
|---|
| 41 | The reason of this duplication is to make civlmpi.cvl
|
|---|
| 42 | independent with mpi.cvl. */
|
|---|
| 43 |
|
|---|
| 44 |
|
|---|
| 45 | /* Definition of CMPI_Gcomm (CMPI_Gcomm has a type of __CMPI_Gcomm)
|
|---|
| 46 | and MPI_Comm */
|
|---|
| 47 | struct $mpi_gcomm {
|
|---|
| 48 | $gcomm p2p; // point-to-point communication
|
|---|
| 49 | $gcomm col; // collective communication
|
|---|
| 50 | $gcollator gcollator;
|
|---|
| 51 | $gbarrier gbarrier;
|
|---|
| 52 | };
|
|---|
| 53 |
|
|---|
| 54 | // the definition of "struct MPI_Request":
|
|---|
| 55 | struct MPI_Request{
|
|---|
| 56 | MPI_Comm comm;
|
|---|
| 57 | MPI_Status status;
|
|---|
| 58 | _Bool isSend; // true if this request is associated to a send; otherwise, it is associated to a receive.
|
|---|
| 59 | _mpi_nb_record_t * record; // points to the record
|
|---|
| 60 | };
|
|---|
| 61 | // Note that the type "MPI_Request" is defined as "pointer-to-struct MPI_Request".
|
|---|
| 62 |
|
|---|
| 63 | /****************************** Helper Functions **********************************/
|
|---|
| 64 | $state_f size_t $mpi_extentof(MPI_Datatype datatype) {
|
|---|
| 65 | $abstract size_t $mpi_sizeof(MPI_Datatype datatype);
|
|---|
| 66 |
|
|---|
| 67 | return $mpi_sizeof(datatype);
|
|---|
| 68 | }
|
|---|
| 69 |
|
|---|
| 70 | int sizeofDatatype(MPI_Datatype datatype) {
|
|---|
| 71 | size_t result;
|
|---|
| 72 |
|
|---|
| 73 | #ifdef _MPI_CONTRACT
|
|---|
| 74 | /* In MPI contract mode, it's possible that datatype is
|
|---|
| 75 | non-concrete. Then use an abstract function to represent the
|
|---|
| 76 | extent of the datatype. */
|
|---|
| 77 | if (!$is_concrete_int(datatype))
|
|---|
| 78 | return $mpi_extentof(datatype);
|
|---|
| 79 | #endif
|
|---|
| 80 |
|
|---|
| 81 | switch (datatype) {
|
|---|
| 82 | case MPI_INT:
|
|---|
| 83 | result = sizeof(int); break;
|
|---|
| 84 | case MPI_2INT:
|
|---|
| 85 | result = (sizeof(int)*2); break;
|
|---|
| 86 | case MPI_FLOAT:
|
|---|
| 87 | result = sizeof(float); break;
|
|---|
| 88 | case MPI_DOUBLE:
|
|---|
| 89 | result = sizeof(double); break;
|
|---|
| 90 | case MPI_CHAR:
|
|---|
| 91 | result = sizeof(char); break;
|
|---|
| 92 | case MPI_BYTE:
|
|---|
| 93 | result = sizeof(char); break; // char is always one byte ?
|
|---|
| 94 | case MPI_SHORT:
|
|---|
| 95 | result = sizeof(short); break;
|
|---|
| 96 | case MPI_LONG:
|
|---|
| 97 | result = sizeof(long); break;
|
|---|
| 98 | case MPI_LONG_DOUBLE:
|
|---|
| 99 | result = sizeof(long double); break;
|
|---|
| 100 | case MPI_LONG_LONG_INT:
|
|---|
| 101 | result = sizeof(long long int); break;
|
|---|
| 102 | case MPI_LONG_LONG:
|
|---|
| 103 | result = sizeof(long long); break;
|
|---|
| 104 | case MPI_UNSIGNED_LONG_LONG:
|
|---|
| 105 | result = sizeof(unsigned long long); break;
|
|---|
| 106 | default:
|
|---|
| 107 | $assert(0, "Unreachable");
|
|---|
| 108 | }
|
|---|
| 109 | #ifdef _MPI_CONTRACT
|
|---|
| 110 | /* In the case that a datatype X which is previously non-concrete
|
|---|
| 111 | and whose extent $mpi_sizeof(X) was used. Later X is simplified
|
|---|
| 112 | to a concrete number representing 'int', we need somehow
|
|---|
| 113 | associate SIZEOF_INT to $mpi_sizeof(X) by assume the following
|
|---|
| 114 | predicate. */
|
|---|
| 115 | $assume($mpi_extentof(datatype) == result);
|
|---|
| 116 | #endif
|
|---|
| 117 | return result;
|
|---|
| 118 | }
|
|---|
| 119 |
|
|---|
| 120 | void * $mpi_malloc(int count, MPI_Datatype datatype) {
|
|---|
| 121 | switch (datatype) {
|
|---|
| 122 | case MPI_INT:
|
|---|
| 123 | case MPI_SHORT:
|
|---|
| 124 | case MPI_LONG:
|
|---|
| 125 | case MPI_LONG_LONG_INT:
|
|---|
| 126 | case MPI_LONG_LONG:
|
|---|
| 127 | case MPI_UNSIGNED_LONG_LONG:
|
|---|
| 128 | return (int *)malloc(sizeof(int) * count);
|
|---|
| 129 | case MPI_2INT:
|
|---|
| 130 | return (int *)malloc(2 * count * sizeof(int));
|
|---|
| 131 | case MPI_FLOAT:
|
|---|
| 132 | case MPI_DOUBLE:
|
|---|
| 133 | case MPI_LONG_DOUBLE:
|
|---|
| 134 | return ($real *)malloc(count * sizeof($real));
|
|---|
| 135 | case MPI_CHAR:
|
|---|
| 136 | case MPI_BYTE:
|
|---|
| 137 | return (char *)malloc(count * sizeof(char));
|
|---|
| 138 | default:
|
|---|
| 139 | $assert(0, "Unreachable");
|
|---|
| 140 | }
|
|---|
| 141 | return (void *)0;
|
|---|
| 142 | }
|
|---|
| 143 |
|
|---|
| 144 | /************************** MPI LIB Implementations *******************************/
|
|---|
| 145 | $mpi_gcomm $mpi_gcomm_create($scope scope, int size) {
|
|---|
| 146 | $mpi_gcomm result;
|
|---|
| 147 |
|
|---|
| 148 | result.p2p = $gcomm_create(scope, size);
|
|---|
| 149 | result.col = $gcomm_create(scope, size);
|
|---|
| 150 | result.gcollator = $gcollator_create(scope, size);
|
|---|
| 151 | result.gbarrier = $gbarrier_create(scope, size);
|
|---|
| 152 | return result;
|
|---|
| 153 | }
|
|---|
| 154 |
|
|---|
| 155 | void $mpi_gcomm_destroy($mpi_gcomm gc) {
|
|---|
| 156 | /* This function will report errors for any messages remaining the
|
|---|
| 157 | $mpi_gcomm. Those messages are junk messages. */
|
|---|
| 158 | int numJunkRecord;
|
|---|
| 159 | int numJunkMsg;
|
|---|
| 160 | $message junkMsgs[]; // A CIVL-C sequence for junk messages.
|
|---|
| 161 |
|
|---|
| 162 | $seq_init(&junkMsgs, 0, NULL);
|
|---|
| 163 | numJunkMsg = $gcomm_destroy(gc.p2p, &junkMsgs);
|
|---|
| 164 | /* Informations of reporting junk messages in p2p communicator and
|
|---|
| 165 | collective communicator are different: */
|
|---|
| 166 | for(int i = 0; i < numJunkMsg; i++) {
|
|---|
| 167 | int src, dest, tag;
|
|---|
| 168 |
|
|---|
| 169 | src = $message_source(junkMsgs[i]);
|
|---|
| 170 | dest = $message_dest(junkMsgs[i]);
|
|---|
| 171 | tag = $message_tag(junkMsgs[i]);
|
|---|
| 172 | $assert($false, "MPI message leak: There is a message from rank %d to rank %d with tag %d "
|
|---|
| 173 | "has been sent but is never received in point-to-point communication.",
|
|---|
| 174 | src, dest, tag);
|
|---|
| 175 | }
|
|---|
| 176 | numJunkMsg = $gcomm_destroy(gc.col, &junkMsgs);
|
|---|
| 177 | for(int i = 0; i < numJunkMsg; i++) {
|
|---|
| 178 | int src, tag;
|
|---|
| 179 | char * routine;
|
|---|
| 180 |
|
|---|
| 181 | src = $message_source(junkMsgs[i]);
|
|---|
| 182 | tag = $message_tag(junkMsgs[i]);
|
|---|
| 183 | routine = $mpi_coroutine_name(tag);
|
|---|
| 184 | $assert($false, "MPI message leak: There is a message sent by rank %d for collective routine %s"
|
|---|
| 185 | " that is never received.",
|
|---|
| 186 | src, routine);
|
|---|
| 187 | }
|
|---|
| 188 | $gcollator_destroy(gc.gcollator);
|
|---|
| 189 | $gbarrier_destroy(gc.gbarrier);
|
|---|
| 190 | }
|
|---|
| 191 |
|
|---|
| 192 | MPI_Comm $mpi_comm_create($scope scope, $mpi_gcomm gc, int rank) {
|
|---|
| 193 | MPI_Comm result = (MPI_Comm)$malloc(scope, sizeof(*result));;
|
|---|
| 194 |
|
|---|
| 195 | result->p2p = $comm_create(scope, gc.p2p, rank);
|
|---|
| 196 | result->col = $comm_create(scope, gc.col, rank);
|
|---|
| 197 | result->collator = $collator_create(gc.gcollator, scope, rank);
|
|---|
| 198 | result->barrier = $barrier_create(scope, gc.gbarrier, rank);
|
|---|
| 199 | result->gcommIndex = 0;
|
|---|
| 200 | $seq_init(&result->send_records, 0, NULL);
|
|---|
| 201 | $seq_init(&result->recv_records, 0, NULL);
|
|---|
| 202 | return result;
|
|---|
| 203 | }
|
|---|
| 204 |
|
|---|
| 205 | void $mpi_comm_destroy(MPI_Comm comm, $mpi_state mpi_state) {
|
|---|
| 206 | #ifndef _MPI_CONTRACT
|
|---|
| 207 | if(comm->gcommIndex == 0)
|
|---|
| 208 | $assert(mpi_state == _MPI_FINALIZED, "Process terminates without "
|
|---|
| 209 | "calling MPI_Finalize() first.");
|
|---|
| 210 | #endif
|
|---|
| 211 | $comm_destroy(comm->p2p);
|
|---|
| 212 | $comm_destroy(comm->col);
|
|---|
| 213 | $free(comm->collator);
|
|---|
| 214 | $barrier_destroy(comm->barrier);
|
|---|
| 215 | $free(comm);
|
|---|
| 216 | }
|
|---|
| 217 |
|
|---|
| 218 | void * $mpi_pointer_add(const void * ptr, int offset, MPI_Datatype datatype) {
|
|---|
| 219 | #ifdef _MPI_CONTRACT
|
|---|
| 220 | if (!$is_concrete_int(datatype)) {
|
|---|
| 221 | int datatypeExtent = $mpi_extentof(datatype);
|
|---|
| 222 |
|
|---|
| 223 | return $pointer_add(ptr, offset * datatypeExtent, sizeof(char));
|
|---|
| 224 | }
|
|---|
| 225 | #endif
|
|---|
| 226 |
|
|---|
| 227 | int type_size = sizeofDatatype(datatype);
|
|---|
| 228 |
|
|---|
| 229 | return $pointer_add(ptr, offset, type_size);
|
|---|
| 230 | }
|
|---|
| 231 |
|
|---|
| 232 | /********************* Lower level MPI routines *********************/
|
|---|
| 233 | /* CMPI_Send and CMPI_Recv are a pair of send receives functions that
|
|---|
| 234 | help implementing MPI routines. They should never be block which
|
|---|
| 235 | means no potential deadlocks related to these functions */
|
|---|
| 236 | int $mpi_send(const void *buf, int count, MPI_Datatype datatype, int dest,
|
|---|
| 237 | int tag, MPI_Comm comm) {
|
|---|
| 238 | if (dest >= 0) {
|
|---|
| 239 | int size = count*sizeofDatatype(datatype);
|
|---|
| 240 | int place = $comm_place(comm->p2p);
|
|---|
| 241 | $message out;
|
|---|
| 242 |
|
|---|
| 243 | $elaborate(dest);
|
|---|
| 244 | out = $message_pack(place, dest, tag, buf, size);
|
|---|
| 245 | $comm_enqueue(comm->p2p, out);
|
|---|
| 246 | }
|
|---|
| 247 | return 0;
|
|---|
| 248 | }
|
|---|
| 249 |
|
|---|
| 250 | /* This function
|
|---|
| 251 |
|
|---|
| 252 | 1) allocates an MPI Request object and assigns the address of the
|
|---|
| 253 | object to the handle "*request"; and
|
|---|
| 254 |
|
|---|
| 255 | 2) creates a record carrying all the information of this send
|
|---|
| 256 | operation and enqueues the record into the send record queue
|
|---|
| 257 | associated to "comm"; and
|
|---|
| 258 |
|
|---|
| 259 | 3) enqueues the message into the message channel as if it is a
|
|---|
| 260 | standard $mpi_send.
|
|---|
| 261 |
|
|---|
| 262 | A pointer to the created record is saved in the allocated MPI
|
|---|
| 263 | Request object.
|
|---|
| 264 |
|
|---|
| 265 | Step 2) may be skipped if only checking absolute deadlock.
|
|---|
| 266 | */
|
|---|
| 267 | int $mpi_isend(const void *buf, int count, MPI_Datatype datatype, int dest,
|
|---|
| 268 | int tag, MPI_Comm comm, MPI_Request * request) {
|
|---|
| 269 | if (dest >= 0) {
|
|---|
| 270 | int place = $comm_place(comm->p2p);
|
|---|
| 271 | // Isend allocates an MPI_Request object (MPI-3.0 sec 3.7.2):
|
|---|
| 272 | MPI_Request req = (MPI_Request) malloc(sizeof(struct MPI_Request));
|
|---|
| 273 |
|
|---|
| 274 | req->comm = comm;
|
|---|
| 275 | req->status.MPI_SOURCE = place;
|
|---|
| 276 | req->status.MPI_TAG = tag;
|
|---|
| 277 | req->status.MPI_ERROR = 0;
|
|---|
| 278 | req->status.size = count*sizeofDatatype(datatype);
|
|---|
| 279 | req->isSend = 1;
|
|---|
| 280 | req->record = (_mpi_nb_record_t *)malloc(sizeof(_mpi_nb_record_t));
|
|---|
| 281 | req->record->buf = (void*)buf;
|
|---|
| 282 | req->record->count = count;
|
|---|
| 283 | req->record->datatype = datatype;
|
|---|
| 284 | req->record->src_or_dest = dest;
|
|---|
| 285 | req->record->tag = tag;
|
|---|
| 286 | req->record->completed = 0;
|
|---|
| 287 | *request = req;
|
|---|
| 288 | $seq_append(&comm->send_records, &req->record, 1);
|
|---|
| 289 | return $mpi_send(buf, count, datatype, dest, tag, comm);
|
|---|
| 290 | } else
|
|---|
| 291 | *request = MPI_REQUEST_NULL;
|
|---|
| 292 | return 1;
|
|---|
| 293 | }
|
|---|
| 294 |
|
|---|
| 295 | int $mpi_recv(void *buf, int count, MPI_Datatype datatype, int source,
|
|---|
| 296 | int tag, MPI_Comm comm, MPI_Status *status) {
|
|---|
| 297 | if ((source >= 0 || source == MPI_ANY_SOURCE)) {
|
|---|
| 298 | $message in;
|
|---|
| 299 | int place = $comm_place(comm->p2p);
|
|---|
| 300 | int deterministicTag;
|
|---|
| 301 |
|
|---|
| 302 | $assert(tag == -2 || tag >= 0, "Illegal MPI message receive tag %d.\n", tag);
|
|---|
| 303 | deterministicTag = tag < 0 ? -2 : tag;
|
|---|
| 304 | $elaborate(source);
|
|---|
| 305 | in = $comm_dequeue(comm->p2p, source, deterministicTag);
|
|---|
| 306 |
|
|---|
| 307 | int size = count*sizeofDatatype(datatype);
|
|---|
| 308 |
|
|---|
| 309 | $message_unpack(in, buf, size);
|
|---|
| 310 | if (status != MPI_STATUS_IGNORE) {
|
|---|
| 311 | status->size = $message_size(in);
|
|---|
| 312 | status->MPI_SOURCE = $message_source(in);
|
|---|
| 313 | status->MPI_TAG = $message_tag(in);
|
|---|
| 314 | status->MPI_ERROR = 0;
|
|---|
| 315 | }
|
|---|
| 316 | }
|
|---|
| 317 | return 0;
|
|---|
| 318 | }
|
|---|
| 319 |
|
|---|
| 320 | /* This function
|
|---|
| 321 |
|
|---|
| 322 | 1) allocates an MPI Request object and assigns the address of the
|
|---|
| 323 | object to the handle "*request"; and
|
|---|
| 324 |
|
|---|
| 325 | 2) creates a record carrying all the information of this receive
|
|---|
| 326 | operation and enqueues the record into the receive record queue
|
|---|
| 327 | associated to "comm".
|
|---|
| 328 |
|
|---|
| 329 | A pointer to the created record is saved in the allocated MPI
|
|---|
| 330 | Request object.
|
|---|
| 331 | */
|
|---|
| 332 | int $mpi_irecv(void *buf, int count, MPI_Datatype datatype, int source,
|
|---|
| 333 | int tag, MPI_Comm comm, MPI_Request * request) {
|
|---|
| 334 | if ((source >= 0 || source == MPI_ANY_SOURCE)) {
|
|---|
| 335 | // Irecv allocates an MPI_Request object (MPI-3.0 sec 3.7.2):
|
|---|
| 336 | MPI_Request req = (MPI_Request) malloc(sizeof(struct MPI_Request));
|
|---|
| 337 |
|
|---|
| 338 | req->comm = comm;
|
|---|
| 339 | req->isSend = $false;
|
|---|
| 340 | req->record = (_mpi_nb_record_t*)malloc(sizeof(_mpi_nb_record_t));
|
|---|
| 341 | req->record->buf = buf;
|
|---|
| 342 | req->record->count = count;
|
|---|
| 343 | req->record->datatype = datatype;
|
|---|
| 344 | req->record->src_or_dest = source;
|
|---|
| 345 | req->record->tag = tag;
|
|---|
| 346 | req->record->completed = $false;
|
|---|
| 347 | $seq_append(&comm->recv_records, &req->record, 1);
|
|---|
| 348 | *request = req;
|
|---|
| 349 | } else
|
|---|
| 350 | *request = MPI_REQUEST_NULL;
|
|---|
| 351 | return 1;
|
|---|
| 352 | }
|
|---|
| 353 |
|
|---|
| 354 |
|
|---|
| 355 | int $mpi_sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
|
|---|
| 356 | int dest, int sendtag, void *recvbuf, int recvcount,
|
|---|
| 357 | MPI_Datatype recvtype, int source, int recvtag,
|
|---|
| 358 | MPI_Comm comm, MPI_Status *status) {
|
|---|
| 359 | int deterministicRecvTag;
|
|---|
| 360 |
|
|---|
| 361 | $assert(sendtag >= 0, "MPI sendtag should be greater than or equal to zero");
|
|---|
| 362 | $assert(recvtag == -2 || recvtag >= 0, "Illegal MPI message receive tag %d.\n", recvtag);
|
|---|
| 363 |
|
|---|
| 364 | deterministicRecvTag = recvtag < 0 ? -2 : recvtag;
|
|---|
| 365 | if((dest >= 0) && ((source >= 0 || source == MPI_ANY_SOURCE))) {
|
|---|
| 366 | $message out, in;
|
|---|
| 367 | int size = sendcount*sizeofDatatype(sendtype);
|
|---|
| 368 | int place = $comm_place(comm->p2p);
|
|---|
| 369 |
|
|---|
| 370 | out = $message_pack(place, dest, sendtag, sendbuf, size);
|
|---|
| 371 | $elaborate(source);
|
|---|
| 372 | $elaborate(dest);
|
|---|
| 373 | $choose {
|
|---|
| 374 | $when($true){
|
|---|
| 375 | $comm_enqueue(comm->p2p, out);
|
|---|
| 376 | in = $comm_dequeue(comm->p2p, source, deterministicRecvTag);
|
|---|
| 377 | }
|
|---|
| 378 | $when($false){
|
|---|
| 379 | /* This $choose branch plays a trick which correctly
|
|---|
| 380 | implements the sendrecv() semantically. Such a branch
|
|---|
| 381 | ensures that there is no chance of potential deadlocks when
|
|---|
| 382 | all processes do send then recv collectively. However,
|
|---|
| 383 | effectively, this branch is no need and never will be
|
|---|
| 384 | executed.*/
|
|---|
| 385 | in = $comm_dequeue(comm->p2p, source, deterministicRecvTag);
|
|---|
| 386 | $comm_enqueue(comm->p2p, out);
|
|---|
| 387 | }
|
|---|
| 388 | }
|
|---|
| 389 | size = recvcount*sizeofDatatype(recvtype);
|
|---|
| 390 | $message_unpack(in, recvbuf, size);
|
|---|
| 391 | if (status != MPI_STATUS_IGNORE) {
|
|---|
| 392 | status->size = $message_size(in);
|
|---|
| 393 | status->MPI_SOURCE = $message_source(in);
|
|---|
| 394 | status->MPI_TAG = $message_tag(in);
|
|---|
| 395 | status->MPI_ERROR = 0;
|
|---|
| 396 | }
|
|---|
| 397 | }
|
|---|
| 398 | else if (dest >= 0)
|
|---|
| 399 | $mpi_send(sendbuf, sendcount, sendtype, dest, sendtag, comm);
|
|---|
| 400 | else if (source >= 0 || source == MPI_ANY_SOURCE)
|
|---|
| 401 | $mpi_recv(recvbuf, recvcount, recvtype, source, deterministicRecvTag, comm, status);
|
|---|
| 402 | return 0;
|
|---|
| 403 | }
|
|---|
| 404 |
|
|---|
| 405 |
|
|---|
| 406 | /*
|
|---|
| 407 | This function completes a non-blocking send operation and removes
|
|---|
| 408 | the record associated to the send operation in the record queue.
|
|---|
| 409 | This function can potentially be blocked at a state where there is
|
|---|
| 410 | no matching receive for this send operation.
|
|---|
| 411 |
|
|---|
| 412 | requires "request" to be valid and "status" to be either valid or
|
|---|
| 413 | MPI_STATUS_IGNORE
|
|---|
| 414 | */
|
|---|
| 415 | int $mpi_wait_send(MPI_Request * request, MPI_Status * status) {
|
|---|
| 416 | // TODO: To check potential deadlock, need to look through the message
|
|---|
| 417 | // channel and the send record queue at the same time to determine
|
|---|
| 418 | // if there is (or was) a matching receive enabled for this send
|
|---|
| 419 | // operation. Currently, the model only detects absolute deadlock.
|
|---|
| 420 | MPI_Request req = *request;
|
|---|
| 421 |
|
|---|
| 422 | $assert(req->isSend);
|
|---|
| 423 | if (status != MPI_STATUS_IGNORE)
|
|---|
| 424 | $copy(status, &(req->status));
|
|---|
| 425 |
|
|---|
| 426 | int queue_size = $seq_length(&(req->comm->send_records));
|
|---|
| 427 |
|
|---|
| 428 | for (int i = 0; i < queue_size; i++) {
|
|---|
| 429 | if (req->comm->send_records[i] == req->record) {
|
|---|
| 430 | $seq_remove(&(req->comm->send_records), i, NULL, 1);
|
|---|
| 431 | break;
|
|---|
| 432 | }
|
|---|
| 433 | }
|
|---|
| 434 | $free(req->record);
|
|---|
| 435 | return 1;
|
|---|
| 436 | }
|
|---|
| 437 |
|
|---|
| 438 | /*
|
|---|
| 439 | This function completes the receive operation associated to the
|
|---|
| 440 | "request" handle and assigns "MPI_Status" data to "*status".
|
|---|
| 441 |
|
|---|
| 442 | requires "request" to be valid and "status" to be either valid or
|
|---|
| 443 | MPI_STATUS_IGNORE
|
|---|
| 444 | */
|
|---|
| 445 | int $mpi_wait_recv(MPI_Request * request, MPI_Status * status) {
|
|---|
| 446 | MPI_Request req = *request;
|
|---|
| 447 | _mpi_nb_record_t * record = req->record;
|
|---|
| 448 | MPI_Comm comm = req->comm;
|
|---|
| 449 | int queue_size = $seq_length(&comm->recv_records);
|
|---|
| 450 |
|
|---|
| 451 | $assert(!req->isSend);
|
|---|
| 452 | // if the operation of "*request" has already completed:
|
|---|
| 453 | if (record->completed) {
|
|---|
| 454 | if (status != MPI_STATUS_IGNORE) {
|
|---|
| 455 | status->MPI_SOURCE = record->src_or_dest;
|
|---|
| 456 | status->MPI_TAG = record->tag;
|
|---|
| 457 | status->MPI_ERROR = 0;
|
|---|
| 458 | status->size = record->count * sizeofDatatype(record->datatype);
|
|---|
| 459 | }
|
|---|
| 460 | for (int i = 0; i < queue_size; i++)
|
|---|
| 461 | if (comm->recv_records[i] == record) {
|
|---|
| 462 | $seq_remove(&comm->recv_records, i, NULL, 1);
|
|---|
| 463 | break;
|
|---|
| 464 | }
|
|---|
| 465 | $free(record);
|
|---|
| 466 | return 1;
|
|---|
| 467 | }
|
|---|
| 468 | $elaborate(record->src_or_dest);
|
|---|
| 469 | /*
|
|---|
| 470 | For any wildcard record "wr", on which "record" depends, "wr" must
|
|---|
| 471 | be completed first. But we can only over-approximate the
|
|---|
| 472 | dependency relation.
|
|---|
| 473 | */
|
|---|
| 474 | for (int i = 0; i < queue_size; i++) {
|
|---|
| 475 | _mpi_nb_record_t * wr = comm->recv_records[i];
|
|---|
| 476 |
|
|---|
| 477 | if (wr->completed)
|
|---|
| 478 | continue;
|
|---|
| 479 | if (wr == record)
|
|---|
| 480 | break;
|
|---|
| 481 | $elaborate(wr->src_or_dest);
|
|---|
| 482 | if (wr->src_or_dest == MPI_ANY_SOURCE && (wr->tag == record->tag ||
|
|---|
| 483 | wr->tag == MPI_ANY_TAG ||
|
|---|
| 484 | record->tag == MPI_ANY_TAG)) {
|
|---|
| 485 | // Over-approximately "record" depends on "wr". If receive of
|
|---|
| 486 | // "wr" can be enabled, we always let "wr" go first. In this
|
|---|
| 487 | // case, "record" may or may not depend on "wr". It is still
|
|---|
| 488 | // correct to let "wr" go first as it is already matched anyway.
|
|---|
| 489 | // If only the operation of "record" can be enabled, it means
|
|---|
| 490 | // that "record" in fact does not depend on "wr". We can ignore
|
|---|
| 491 | // "wr". Otherwise, blocked.
|
|---|
| 492 | _Bool wr_enabled = 0;
|
|---|
| 493 | MPI_Status tmp_status;
|
|---|
| 494 |
|
|---|
| 495 | $choose {
|
|---|
| 496 | $when($comm_probe(comm->p2p, MPI_ANY_SOURCE, wr->tag))
|
|---|
| 497 | wr_enabled = 1;
|
|---|
| 498 | $when(!$comm_probe(comm->p2p, MPI_ANY_SOURCE, wr->tag) && $comm_probe(comm->p2p, record->src_or_dest, record->tag))
|
|---|
| 499 | ;
|
|---|
| 500 | }
|
|---|
| 501 | if (wr_enabled) {
|
|---|
| 502 | $mpi_recv(wr->buf, wr->count, wr->datatype, wr->src_or_dest, wr->tag, comm, &tmp_status);
|
|---|
| 503 | wr->src_or_dest = tmp_status.MPI_SOURCE;
|
|---|
| 504 | wr->tag = tmp_status.MPI_TAG;
|
|---|
| 505 | wr->completed = 1;
|
|---|
| 506 | continue;
|
|---|
| 507 | }
|
|---|
| 508 | }
|
|---|
| 509 | }
|
|---|
| 510 | // At this point, there is no wildcard receive record on which
|
|---|
| 511 | // "record" depends on precedes "record" in queue.
|
|---|
| 512 | $message in;
|
|---|
| 513 | _Bool recv_again;
|
|---|
| 514 | int idx = -1;
|
|---|
| 515 |
|
|---|
| 516 | do {
|
|---|
| 517 | in = $comm_dequeue(comm->p2p, record->src_or_dest, record->tag);
|
|---|
| 518 | // check if any preceding record matches this message:
|
|---|
| 519 | recv_again = 0;
|
|---|
| 520 | for (int i = 0; i < queue_size; i++) {
|
|---|
| 521 | _mpi_nb_record_t * r = comm->recv_records[i];
|
|---|
| 522 |
|
|---|
| 523 | if (r->completed)
|
|---|
| 524 | continue;
|
|---|
| 525 | if (r == record)
|
|---|
| 526 | idx = i;
|
|---|
| 527 | if (r == record || (r->src_or_dest == $message_source(in) &&
|
|---|
| 528 | (r->tag == $message_tag(in) || r->tag == MPI_ANY_TAG))) {
|
|---|
| 529 | // this message belongs to r:
|
|---|
| 530 | r->src_or_dest = $message_source(in);
|
|---|
| 531 | r->tag = $message_tag(in);
|
|---|
| 532 | $message_unpack(in, r->buf, r->count * sizeofDatatype(r->datatype));
|
|---|
| 533 | r->completed = 1;
|
|---|
| 534 | recv_again = r != record;
|
|---|
| 535 | break;
|
|---|
| 536 | }
|
|---|
| 537 | }
|
|---|
| 538 | } while (recv_again);
|
|---|
| 539 | if (status != MPI_STATUS_IGNORE) {
|
|---|
| 540 | status->MPI_SOURCE = $message_source(in);
|
|---|
| 541 | status->MPI_TAG = $message_tag(in);
|
|---|
| 542 | status->MPI_ERROR = 0;
|
|---|
| 543 | status->size = $message_size(in);
|
|---|
| 544 | }
|
|---|
| 545 | $seq_remove(&comm->recv_records, idx, NULL, 1);
|
|---|
| 546 | free(record);
|
|---|
| 547 | return 1;
|
|---|
| 548 | }
|
|---|
| 549 |
|
|---|
| 550 | int $mpi_wait(MPI_Request * request, MPI_Status * status) {
|
|---|
| 551 | if (*request == MPI_REQUEST_NULL)
|
|---|
| 552 | return 1;
|
|---|
| 553 | if ((*request)->isSend)
|
|---|
| 554 | $mpi_wait_send(request, status);
|
|---|
| 555 | else
|
|---|
| 556 | $mpi_wait_recv(request, status);
|
|---|
| 557 | $free(*request);
|
|---|
| 558 | *request = MPI_REQUEST_NULL;
|
|---|
| 559 | return 1;
|
|---|
| 560 | }
|
|---|
| 561 |
|
|---|
| 562 | /********************* Collective helper functions ********************/
|
|---|
| 563 | /* Note: collective helpers functions are functions have same
|
|---|
| 564 | behaviors as MPI collective functions, it can be re-used as a part
|
|---|
| 565 | of implementation by different MPI routines. For example,
|
|---|
| 566 | MPI_Allreduce will call CMPI_Reduce and CMPI_Bcast, both of them
|
|---|
| 567 | should throw errors (if encounters any) as if errors are thrown
|
|---|
| 568 | from MPI_Allreduce.
|
|---|
| 569 | */
|
|---|
| 570 | int $mpi_collective_send(const void *buf, int count, MPI_Datatype datatype, int dest,
|
|---|
| 571 | int tag, MPI_Comm comm) {
|
|---|
| 572 | if (dest >= 0) {
|
|---|
| 573 | int size = count*sizeofDatatype(datatype);
|
|---|
| 574 | int place = $comm_place(comm->col);
|
|---|
| 575 | $message out = $message_pack(place, dest, tag, buf, size);
|
|---|
| 576 |
|
|---|
| 577 | $elaborate(dest);
|
|---|
| 578 | $comm_enqueue(comm->col, out);
|
|---|
| 579 | }
|
|---|
| 580 | return 0;
|
|---|
| 581 | }
|
|---|
| 582 |
|
|---|
| 583 | int $mpi_collective_recv(void *buf, int count, MPI_Datatype datatype,
|
|---|
| 584 | int source, int tag, MPI_Comm comm,
|
|---|
| 585 | MPI_Status * status, char * routName) {
|
|---|
| 586 | if(source >= 0 || source == MPI_ANY_SOURCE) {
|
|---|
| 587 | $elaborate(source);
|
|---|
| 588 | $message in = $comm_dequeue(comm->col, source, MPI_ANY_TAG);
|
|---|
| 589 | int size = count*sizeofDatatype(datatype);
|
|---|
| 590 | int recvTag;
|
|---|
| 591 |
|
|---|
| 592 | /* This routine should only be used by collective routines, there
|
|---|
| 593 | is no non-deterministic tags for collective routines.*/
|
|---|
| 594 | recvTag = $message_tag(in);
|
|---|
| 595 | $assert (recvTag == tag, "Collective routine %s receives a "
|
|---|
| 596 | "message with a mismatched tag\n", routName);
|
|---|
| 597 | $message_unpack(in, buf, size);
|
|---|
| 598 | if (status != MPI_STATUS_IGNORE) {
|
|---|
| 599 | status->size = $message_size(in);
|
|---|
| 600 | status->MPI_SOURCE = $message_source(in);
|
|---|
| 601 | status->MPI_TAG = recvTag;
|
|---|
| 602 | status->MPI_ERROR = 0;
|
|---|
| 603 | }
|
|---|
| 604 | }
|
|---|
| 605 | return 0;
|
|---|
| 606 | }
|
|---|
| 607 |
|
|---|
| 608 | /* Broadcast helper function that uses any specified message tag */
|
|---|
| 609 | int $mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, int tag,
|
|---|
| 610 | MPI_Comm comm, char * routName) {
|
|---|
| 611 | if ($comm_place(comm->col) == root) {
|
|---|
| 612 | int nprocs = $comm_size(comm->col);
|
|---|
| 613 |
|
|---|
| 614 | for (int i=0; i<nprocs; i++)
|
|---|
| 615 | if (i != root)
|
|---|
| 616 | $mpi_collective_send(buf, count, datatype, i, tag, comm);
|
|---|
| 617 | } else
|
|---|
| 618 | $mpi_collective_recv(buf, count, datatype, root, tag, comm,
|
|---|
| 619 | MPI_STATUS_IGNORE, routName);
|
|---|
| 620 | return 0;
|
|---|
| 621 | }
|
|---|
| 622 |
|
|---|
| 623 | /* Reduction helper function that uses any specified message tag */
|
|---|
| 624 | int $mpi_reduce(const void* sendbuf, void* recvbuf, int count,
|
|---|
| 625 | MPI_Datatype datatype, MPI_Op op, int root, int tag,
|
|---|
| 626 | MPI_Comm comm, char * routName) {
|
|---|
| 627 | int rank;
|
|---|
| 628 |
|
|---|
| 629 | rank = $comm_place(comm->col);
|
|---|
| 630 | if (rank != root)
|
|---|
| 631 | $mpi_collective_send(sendbuf, count, datatype, root, tag, comm);
|
|---|
| 632 | else {
|
|---|
| 633 | int nprocs = $comm_size(comm->col);
|
|---|
| 634 | int size;
|
|---|
| 635 |
|
|---|
| 636 | size = count * sizeofDatatype(datatype);
|
|---|
| 637 | memcpy(recvbuf, sendbuf, size);
|
|---|
| 638 | for (int i = 0; i<nprocs; i++) {
|
|---|
| 639 | if(i != root){
|
|---|
| 640 | int colTag;
|
|---|
| 641 | void * applybuf;
|
|---|
| 642 | $message in = $comm_dequeue(comm->col, i, MPI_ANY_TAG);
|
|---|
| 643 |
|
|---|
| 644 | /* Collective routines have no non-deterministic tags.*/
|
|---|
| 645 | colTag = $message_tag(in);
|
|---|
| 646 | $assert (colTag == tag , "Collective routine %s receives a "
|
|---|
| 647 | "message with a mismatched tag\n", routName);
|
|---|
| 648 | /* the third argument "count" indicates the number of cells needs doing the
|
|---|
| 649 | operation. */
|
|---|
| 650 | applybuf = $mpi_malloc(count, datatype);
|
|---|
| 651 | $bundle_unpack_apply(in.data, recvbuf, op, count, applybuf);
|
|---|
| 652 | memcpy(recvbuf, applybuf, size);
|
|---|
| 653 | free(applybuf);
|
|---|
| 654 | $assert (in.size <= size ,
|
|---|
| 655 | "Message of size %d exceeds the specified size %d.", in.size, size);
|
|---|
| 656 | }
|
|---|
| 657 | }
|
|---|
| 658 | }
|
|---|
| 659 | return 0;
|
|---|
| 660 | }
|
|---|
| 661 |
|
|---|
| 662 | /* Gathering helper function that uses any specified message tag */
|
|---|
| 663 | int $mpi_gather(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
|
|---|
| 664 | void* recvbuf, int recvcount, MPI_Datatype recvtype,
|
|---|
| 665 | int root, int tag, MPI_Comm comm, char * routName){
|
|---|
| 666 | int rank, nprocs;
|
|---|
| 667 | MPI_Status status;
|
|---|
| 668 |
|
|---|
| 669 | rank = $comm_place(comm->col);
|
|---|
| 670 | nprocs = $comm_size(comm->col);
|
|---|
| 671 | /* MPI standard requirement:
|
|---|
| 672 | * For root process, sendtype must be equal to
|
|---|
| 673 | * recvtype. */
|
|---|
| 674 | if(rank == root)
|
|---|
| 675 | $assert (sendtype == recvtype,
|
|---|
| 676 | "%s asks for equality "
|
|---|
| 677 | "between 'sendtype' and 'recvtype'.", routName);
|
|---|
| 678 | /* MPI_standard requirement:
|
|---|
| 679 | * Only root process can use MPI_IN_PLACE*/
|
|---|
| 680 | if(sendbuf == MPI_IN_PLACE){
|
|---|
| 681 | $assert (root == rank,
|
|---|
| 682 | "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'.");
|
|---|
| 683 | } else if(root == rank) {
|
|---|
| 684 | void * ptr;
|
|---|
| 685 |
|
|---|
| 686 | $assert(sendcount == recvcount, "Root process of routine %d without using"
|
|---|
| 687 | " MPI_IN_PLACE should give the same value for recvcount and sendcount",
|
|---|
| 688 | routName);
|
|---|
| 689 | ptr = $mpi_pointer_add(recvbuf, root * recvcount, recvtype);
|
|---|
| 690 | memcpy(ptr, sendbuf, recvcount * sizeofDatatype(recvtype));
|
|---|
| 691 | } else
|
|---|
| 692 | $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm);
|
|---|
| 693 | /* Root process receives messages and put them in right places */
|
|---|
| 694 | if(rank == root){
|
|---|
| 695 | int real_recvcount;
|
|---|
| 696 | int offset;
|
|---|
| 697 |
|
|---|
| 698 | for(int i=0; i<nprocs; i++){
|
|---|
| 699 | if(i != root) {
|
|---|
| 700 | void * ptr;
|
|---|
| 701 |
|
|---|
| 702 | offset = i * recvcount;
|
|---|
| 703 | ptr = $mpi_pointer_add(recvbuf, offset, recvtype);
|
|---|
| 704 | $mpi_collective_recv(ptr, recvcount, recvtype,
|
|---|
| 705 | i, tag, comm, &status, routName);
|
|---|
| 706 | real_recvcount = status.size/sizeofDatatype(recvtype);
|
|---|
| 707 | $assert(real_recvcount == recvcount,
|
|---|
| 708 | "%s asks for equality between"
|
|---|
| 709 | " the amount of data sent and the "
|
|---|
| 710 | "amount of data received.", routName);
|
|---|
| 711 | }
|
|---|
| 712 | }
|
|---|
| 713 | }
|
|---|
| 714 | return 0;
|
|---|
| 715 | }
|
|---|
| 716 |
|
|---|
| 717 | int $mpi_gatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
|
|---|
| 718 | void* recvbuf, const int recvcounts[], const int displs[],
|
|---|
| 719 | MPI_Datatype recvtype, int root, int tag,
|
|---|
| 720 | MPI_Comm comm, char * routName){
|
|---|
| 721 | int rank, nprocs;
|
|---|
| 722 |
|
|---|
| 723 | rank = $comm_place(comm->col);
|
|---|
| 724 | nprocs = $comm_size(comm->col);
|
|---|
| 725 | /* MPI standard requirement:
|
|---|
| 726 | * For root process, sendtype must be equal to
|
|---|
| 727 | * recvtype. */
|
|---|
| 728 | if(rank == root)
|
|---|
| 729 | $assert(sendtype == recvtype, "%s asks for equality "
|
|---|
| 730 | "between 'sendtype' and 'recvtype'.", routName);
|
|---|
| 731 | /* MPI_standard requirement:
|
|---|
| 732 | * Only root process can use MPI_IN_PLACE*/
|
|---|
| 733 | if(sendbuf == MPI_IN_PLACE){
|
|---|
| 734 | $assert(root == rank, "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'.");
|
|---|
| 735 | }else if(root == rank) {
|
|---|
| 736 | void * ptr;
|
|---|
| 737 |
|
|---|
| 738 | $assert(sendcount == recvcounts[root], "For routine %s, recvcounts[%d] "
|
|---|
| 739 | "should be same as the sendcount of the process with rank %d.\n",
|
|---|
| 740 | routName, root, root);
|
|---|
| 741 | ptr = $mpi_pointer_add(recvbuf, displs[rank], recvtype);
|
|---|
| 742 | memcpy(ptr, sendbuf, sendcount * sizeofDatatype(recvtype));
|
|---|
| 743 | }else{
|
|---|
| 744 | $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm);
|
|---|
| 745 | }
|
|---|
| 746 | /* Root process receives messages and put them in right places */
|
|---|
| 747 | if(rank == root){
|
|---|
| 748 | int real_recvcount;
|
|---|
| 749 | MPI_Status status;
|
|---|
| 750 |
|
|---|
| 751 | for(int i=0; i<nprocs; i++){
|
|---|
| 752 | if(i != root){
|
|---|
| 753 | void * ptr = $mpi_pointer_add(recvbuf, displs[i], recvtype);
|
|---|
| 754 |
|
|---|
| 755 | $mpi_collective_recv(ptr, recvcounts[i],
|
|---|
| 756 | recvtype, i, tag, comm, &status, routName);
|
|---|
| 757 | real_recvcount = status.size/sizeofDatatype(recvtype);
|
|---|
| 758 | $assert(real_recvcount == recvcounts[i], "%s asks for equality between"
|
|---|
| 759 | " the amount of data sent and the "
|
|---|
| 760 | "amount of data received.", routName);
|
|---|
| 761 | }
|
|---|
| 762 | }
|
|---|
| 763 | }
|
|---|
| 764 | return 0;
|
|---|
| 765 | }
|
|---|
| 766 |
|
|---|
| 767 | /* Scatter helper function that uses any specified message tag */
|
|---|
| 768 | int $mpi_scatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
|
|---|
| 769 | void* recvbuf, int recvcount, MPI_Datatype recvtype, int root,
|
|---|
| 770 | int tag, MPI_Comm comm, char * routName){
|
|---|
| 771 | int rank, nprocs;
|
|---|
| 772 |
|
|---|
| 773 | rank = $comm_place(comm->col);
|
|---|
| 774 | nprocs = $comm_size(comm->col);
|
|---|
| 775 | /* MPI standard requirement:
|
|---|
| 776 | * For root process, sendtype must be equal to
|
|---|
| 777 | * recvtype. */
|
|---|
| 778 | if(rank == root)
|
|---|
| 779 | $assert(sendtype == recvtype, "MPI_Scatter() asks for equality "
|
|---|
| 780 | "between 'sendtype' and 'recvtype'.");
|
|---|
| 781 | /* MPI_standard requirement:
|
|---|
| 782 | * Only root process can use MPI_IN_PLACE */
|
|---|
| 783 | if(recvbuf == MPI_IN_PLACE){
|
|---|
| 784 | $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'.");
|
|---|
| 785 | }else if(rank == root) {
|
|---|
| 786 | void * ptr;
|
|---|
| 787 |
|
|---|
| 788 | $assert(sendcount == recvcount, "Root process of routine %d without using"
|
|---|
| 789 | " MPI_IN_PLACE should give the same value for recvcount and sendcount",
|
|---|
| 790 | routName);
|
|---|
| 791 | ptr = $mpi_pointer_add(sendbuf, root*recvcount, sendtype);
|
|---|
| 792 | memcpy(recvbuf, ptr, sizeofDatatype(recvtype)*recvcount);
|
|---|
| 793 | }
|
|---|
| 794 | /* Root process scatters data to other processes */
|
|---|
| 795 | if(rank == root){
|
|---|
| 796 | int offset;
|
|---|
| 797 |
|
|---|
| 798 | for(int i=0; i<nprocs; i++){
|
|---|
| 799 | if(i != root) {
|
|---|
| 800 | void * ptr;
|
|---|
| 801 |
|
|---|
| 802 | offset = i * sendcount;
|
|---|
| 803 | ptr = $mpi_pointer_add(sendbuf, offset, sendtype);
|
|---|
| 804 | $mpi_collective_send(ptr, sendcount, sendtype, i, tag, comm);
|
|---|
| 805 | }
|
|---|
| 806 | }
|
|---|
| 807 | }
|
|---|
| 808 | /* Non-root processes receive data */
|
|---|
| 809 | if(!(root == rank)){
|
|---|
| 810 | int real_recvcount;
|
|---|
| 811 | MPI_Status status;
|
|---|
| 812 |
|
|---|
| 813 | $mpi_collective_recv(recvbuf, recvcount, recvtype,
|
|---|
| 814 | root, tag, comm, &status, routName);
|
|---|
| 815 | real_recvcount = status.size/sizeofDatatype(recvtype);
|
|---|
| 816 | $assert(real_recvcount == recvcount,
|
|---|
| 817 | "%s asks for equality between"
|
|---|
| 818 | " the amount of data sent and the "
|
|---|
| 819 | "amount of data received.", routName);
|
|---|
| 820 | }
|
|---|
| 821 | return 0;
|
|---|
| 822 | }
|
|---|
| 823 |
|
|---|
| 824 | /* Scatterv helper function that uses any specified message tag */
|
|---|
| 825 | int $mpi_scatterv(const void* sendbuf, const int sendcounts[], const
|
|---|
| 826 | int displs[], MPI_Datatype sendtype, void* recvbuf,
|
|---|
| 827 | int recvcount, MPI_Datatype recvtype, int root, int tag,
|
|---|
| 828 | MPI_Comm comm, char * routName){
|
|---|
| 829 | int rank, nprocs;
|
|---|
| 830 |
|
|---|
| 831 | rank = $comm_place(comm->col);
|
|---|
| 832 | nprocs = $comm_size(comm->col);
|
|---|
| 833 | /* MPI standard requirement:
|
|---|
| 834 | * For root process, sendtype must be equal to
|
|---|
| 835 | * recvtype. */
|
|---|
| 836 | if(rank == root)
|
|---|
| 837 | $assert(sendtype == recvtype, "%s asks for equality "
|
|---|
| 838 | "between 'sendtype' and 'recvtype'.", routName);
|
|---|
| 839 | /* MPI_standard requirement:
|
|---|
| 840 | * Only root process can use MPI_IN_PLACE */
|
|---|
| 841 | if(recvbuf == MPI_IN_PLACE){
|
|---|
| 842 | $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'.");
|
|---|
| 843 | } else if(rank == root) {
|
|---|
| 844 | void * ptr;
|
|---|
| 845 |
|
|---|
| 846 | $assert(sendcounts[root] == recvcount, "For routine %s, sendcounts[%d] "
|
|---|
| 847 | "should be same as the recvcount of the process with rank %d.\n",
|
|---|
| 848 | routName, root, root);
|
|---|
| 849 | ptr = $mpi_pointer_add(sendbuf, displs[root], sendtype);
|
|---|
| 850 | memcpy(recvbuf, ptr, recvcount*sizeofDatatype(recvtype));
|
|---|
| 851 | }
|
|---|
| 852 | /* Root process scatters data to other processes */
|
|---|
| 853 | if(rank == root){
|
|---|
| 854 | for(int i=0; i<nprocs; i++){
|
|---|
| 855 | if(i != root) {
|
|---|
| 856 | void * ptr = $mpi_pointer_add(sendbuf, displs[i], sendtype);
|
|---|
| 857 |
|
|---|
| 858 | $mpi_collective_send(ptr, sendcounts[i], sendtype, i,
|
|---|
| 859 | tag, comm);
|
|---|
| 860 | }
|
|---|
| 861 | }
|
|---|
| 862 | }
|
|---|
| 863 | if(!(root == rank)){
|
|---|
| 864 | MPI_Status status;
|
|---|
| 865 | int real_recvcount;
|
|---|
| 866 |
|
|---|
| 867 | $mpi_collective_recv(recvbuf, recvcount, recvtype,
|
|---|
| 868 | root, tag, comm, &status, routName);
|
|---|
| 869 | real_recvcount = status.size/sizeofDatatype(recvtype);
|
|---|
| 870 | $assert(real_recvcount == recvcount, "Process rank:%d\n%s asks for equality between"
|
|---|
| 871 | " the amount of data sent (%d) and the "
|
|---|
| 872 | "amount of data received (%d).", rank, routName, real_recvcount, recvcount);
|
|---|
| 873 | }
|
|---|
| 874 | return 0;
|
|---|
| 875 | }
|
|---|
| 876 |
|
|---|
| 877 | /* The helper function for (inclusive) MPI_Scan */
|
|---|
| 878 | int $mpi_scan(const void* sendbuf, void* recvbuf, int count,
|
|---|
| 879 | MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
|
|---|
| 880 | int place, nprocs;
|
|---|
| 881 | int size = count * sizeofDatatype(datatype);
|
|---|
| 882 | void * tmp_space = $mpi_malloc(count, datatype);
|
|---|
| 883 | void * tmp_ptr, * tmp_space_const = tmp_space;
|
|---|
| 884 |
|
|---|
| 885 | place = $comm_place(comm->col);
|
|---|
| 886 | nprocs = $comm_size(comm->col);
|
|---|
| 887 | /* Each process do reduction from 0 .. rank (inclusive):
|
|---|
| 888 | * Send to process rank + 1 ... nprocs-1 (inclusive)
|
|---|
| 889 | * Recv from process 0 .. rank (exclusive)
|
|---|
| 890 | */
|
|---|
| 891 | for (int i = place + 1; i < nprocs; i++)
|
|---|
| 892 | $mpi_collective_send(sendbuf, count, datatype, i, SCAN_TAG, comm);
|
|---|
| 893 | if (sendbuf != MPI_IN_PLACE)
|
|---|
| 894 | memcpy(recvbuf, sendbuf, size);
|
|---|
| 895 | for (int i = 0; i < place; i++) {
|
|---|
| 896 | $message in = $comm_dequeue(comm->col, i, SCAN_TAG);
|
|---|
| 897 |
|
|---|
| 898 | // so far, unpack_apply requires that 'recvbuf' is not aliasing
|
|---|
| 899 | // 'tmp_space' (can be improved in the future):
|
|---|
| 900 | $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space);
|
|---|
| 901 | // swap tmp_space with recvbuf
|
|---|
| 902 | tmp_ptr = tmp_space;
|
|---|
| 903 | tmp_space = recvbuf;
|
|---|
| 904 | recvbuf = tmp_ptr;
|
|---|
| 905 | $assert (in.size <= size ,
|
|---|
| 906 | "Message of size %d exceeds the specified size %d.", in.size, size);
|
|---|
| 907 | }
|
|---|
| 908 | if (recvbuf == tmp_space_const)
|
|---|
| 909 | memcpy(tmp_space, recvbuf, size);
|
|---|
| 910 | free(tmp_space_const);
|
|---|
| 911 | return 0;
|
|---|
| 912 | }
|
|---|
| 913 |
|
|---|
| 914 | /* The helper function for (the exclusive) MPI_Exscan */
|
|---|
| 915 | int $mpi_exscan(const void* sendbuf, void* recvbuf, int count,
|
|---|
| 916 | MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
|
|---|
| 917 | int place, nprocs;
|
|---|
| 918 | int size = count * sizeofDatatype(datatype);
|
|---|
| 919 | void * tmp_space = $mpi_malloc(count, datatype);
|
|---|
| 920 | void * tmp_ptr, * tmp_space_const = tmp_space;
|
|---|
| 921 |
|
|---|
| 922 | place = $comm_place(comm->col);
|
|---|
| 923 | nprocs = $comm_size(comm->col);
|
|---|
| 924 | /* The “in place” option for intracommunicators is specified by
|
|---|
| 925 | * passing MPI_IN_PLACE in the sendbuf argument. In this case, the
|
|---|
| 926 | * input data is taken from the receive buffer, and replaced by the
|
|---|
| 927 | * output data. The receive buffer on rank 0 is not changed by this
|
|---|
| 928 | * operation. */
|
|---|
| 929 | if (sendbuf == MPI_IN_PLACE)
|
|---|
| 930 | sendbuf = recvbuf;
|
|---|
| 931 | /* Each process do reduction from 0 .. rank (inclusive):
|
|---|
| 932 | * Send to process rank + 1 ... nprocs-1 (inclusive)
|
|---|
| 933 | * Recv from process 0 .. rank (exclusive)
|
|---|
| 934 | */
|
|---|
| 935 | for (int i = place + 1; i < nprocs; i++)
|
|---|
| 936 | $mpi_collective_send(sendbuf, count, datatype, i, EXSCAN_TAG, comm);
|
|---|
| 937 | // no-op for rank 0
|
|---|
| 938 | if (place != 0) {
|
|---|
| 939 | $message in = $comm_dequeue(comm->col, 0, EXSCAN_TAG);
|
|---|
| 940 | $bundle_unpack(in.data, recvbuf);
|
|---|
| 941 | for (int i = 1; i < place; i++) {
|
|---|
| 942 | in = $comm_dequeue(comm->col, i, EXSCAN_TAG);
|
|---|
| 943 | // so far, unpack_apply requires that 'recvbuf' is not aliasing
|
|---|
| 944 | // 'tmp_space' (can be improved in the future):
|
|---|
| 945 | $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space);
|
|---|
| 946 | // swap tmp_space with recvbuf
|
|---|
| 947 | tmp_ptr = tmp_space;
|
|---|
| 948 | tmp_space = recvbuf;
|
|---|
| 949 | recvbuf = tmp_ptr;
|
|---|
| 950 | $assert (in.size <= size ,
|
|---|
| 951 | "Message of size %d exceeds the specified size %d.", in.size, size);
|
|---|
| 952 | }
|
|---|
| 953 | }
|
|---|
| 954 | if (recvbuf == tmp_space_const)
|
|---|
| 955 | memcpy(tmp_space, recvbuf, size);
|
|---|
| 956 | free(tmp_space_const);
|
|---|
| 957 | return 0;
|
|---|
| 958 | }
|
|---|
| 959 |
|
|---|
| 960 |
|
|---|
| 961 | /* ******************** End of collective routines ********************* */
|
|---|
| 962 |
|
|---|
| 963 | int $mpi_comm_dup($scope scope, MPI_Comm comm, MPI_Comm * newcomm, char * routName) {
|
|---|
| 964 | int place = $comm_place(comm->col);
|
|---|
| 965 | $mpi_gcomm newgcomm;
|
|---|
| 966 | int idx;
|
|---|
| 967 | $scope CMPI_ROOT_SCOPE = $mpi_root_scope(comm);
|
|---|
| 968 |
|
|---|
| 969 | if(place == 0) {
|
|---|
| 970 | int size = $comm_size(comm->col);
|
|---|
| 971 |
|
|---|
| 972 | newgcomm = $mpi_gcomm_create(CMPI_ROOT_SCOPE, size);
|
|---|
| 973 | idx = $mpi_new_gcomm(CMPI_ROOT_SCOPE, newgcomm);
|
|---|
| 974 | }
|
|---|
| 975 | $mpi_bcast(&idx, 1, MPI_INT, 0, COMMDUP_TAG,
|
|---|
| 976 | comm, routName);
|
|---|
| 977 | newgcomm = $mpi_get_gcomm(CMPI_ROOT_SCOPE, idx);
|
|---|
| 978 | (*newcomm) = $mpi_comm_create(scope, newgcomm, place);
|
|---|
| 979 | (*newcomm)->gcommIndex = idx;
|
|---|
| 980 | $barrier_call(comm->barrier);
|
|---|
| 981 | $gcomm_dup(comm->p2p, (*newcomm)->p2p);
|
|---|
| 982 | $gcomm_dup(comm->col, (*newcomm)->col);
|
|---|
| 983 | $barrier_call(comm->barrier);
|
|---|
| 984 | return 0;
|
|---|
| 985 | }
|
|---|
| 986 |
|
|---|
| 987 | int $mpi_comm_free(MPI_Comm *comm, $mpi_state mpi_state) {
|
|---|
| 988 | int place = $comm_place((*comm)->col);
|
|---|
| 989 | int size = $comm_size((*comm)->col);
|
|---|
| 990 | int buf[size];
|
|---|
| 991 | int gcommIndex = (*comm)->gcommIndex;
|
|---|
| 992 | $scope CMPI_ROOT_SCOPE = $mpi_root_scope(*comm);
|
|---|
| 993 |
|
|---|
| 994 | //TODO: $mpi_gather here is just a ugly synchronization
|
|---|
| 995 | $mpi_gather(&place, 1, MPI_INT, buf, 1, MPI_INT, 0,
|
|---|
| 996 | COMMFREE_TAG, (*comm), "MPI_Comm_free synchronization.");
|
|---|
| 997 | $mpi_comm_destroy(*comm, mpi_state);
|
|---|
| 998 | if(place == 0) {
|
|---|
| 999 | $mpi_gcomm temp = $mpi_get_gcomm(CMPI_ROOT_SCOPE, gcommIndex);
|
|---|
| 1000 |
|
|---|
| 1001 | $mpi_gcomm_destroy(temp);
|
|---|
| 1002 | }
|
|---|
| 1003 | return 0;
|
|---|
| 1004 | }
|
|---|
| 1005 |
|
|---|
| 1006 | $bundle $mpi_create_coroutine_entry(int routineTag, int root,
|
|---|
| 1007 | int op, int numDatatypes, int * datatypes) {
|
|---|
| 1008 | int zero = 0;
|
|---|
| 1009 | $bundle bundledEntry;
|
|---|
| 1010 | struct Entry {
|
|---|
| 1011 | int routine_tag;
|
|---|
| 1012 | int root;
|
|---|
| 1013 | int op;
|
|---|
| 1014 | int numTypes;
|
|---|
| 1015 | int datatypes[];
|
|---|
| 1016 | }entry;
|
|---|
| 1017 |
|
|---|
| 1018 | entry.routine_tag = routineTag;
|
|---|
| 1019 | entry.root = root;
|
|---|
| 1020 | entry.op = op;
|
|---|
| 1021 | entry.numTypes = numDatatypes;
|
|---|
| 1022 | $seq_init(&entry.datatypes, numDatatypes, &zero);
|
|---|
| 1023 | for(int i = 0; i < numDatatypes; i++)
|
|---|
| 1024 | entry.datatypes[i] = datatypes[i];
|
|---|
| 1025 | bundledEntry = $bundle_pack(&entry, sizeof(struct Entry));
|
|---|
| 1026 | return bundledEntry;
|
|---|
| 1027 | }
|
|---|
| 1028 |
|
|---|
| 1029 | void $mpi_diff_coroutine_entries($bundle specEntry, $bundle mineEntry, int rank) {
|
|---|
| 1030 | struct Entry {
|
|---|
| 1031 | int routine_tag;
|
|---|
| 1032 | int root;
|
|---|
| 1033 | int op;
|
|---|
| 1034 | int numTypes;
|
|---|
| 1035 | int datatypes[];
|
|---|
| 1036 | }spec, mine;
|
|---|
| 1037 | char * routine;
|
|---|
| 1038 | int numTypes;
|
|---|
| 1039 |
|
|---|
| 1040 | $bundle_unpack(specEntry, &spec);
|
|---|
| 1041 | $bundle_unpack(mineEntry, &mine);
|
|---|
| 1042 | routine = $mpi_coroutine_name(spec.routine_tag);
|
|---|
| 1043 | if(spec.routine_tag != mine.routine_tag) {
|
|---|
| 1044 | char * mineRoutine = $mpi_coroutine_name(mine.routine_tag);
|
|---|
| 1045 |
|
|---|
| 1046 | $assert($false, "Process with rank %d reaches an MPI collective routine "
|
|---|
| 1047 | "%s while at least one of others are collectively reaching %s.",
|
|---|
| 1048 | rank, mineRoutine, routine);
|
|---|
| 1049 | }
|
|---|
| 1050 | else if(spec.root != mine.root) {
|
|---|
| 1051 | $assert($false, "Process with rank %d reaches an MPI collective routine "
|
|---|
| 1052 | "%s which has a different root with at least one of others.", rank, routine);
|
|---|
| 1053 | } else if(spec.op != mine.op) {
|
|---|
| 1054 | $assert($false, "Process with rank %d reaches an MPI collective routine "
|
|---|
| 1055 | "%s which has a different MPI_Op with at least one of others", rank, routine);
|
|---|
| 1056 | } else if(spec.numTypes != mine.numTypes) {
|
|---|
| 1057 | $assert($false, "Process with rank %d reaches an MPI collective routine "
|
|---|
| 1058 | "%s which has an inconsistent datatype specification with at least"
|
|---|
| 1059 | " one of others",
|
|---|
| 1060 | rank, routine);
|
|---|
| 1061 | }
|
|---|
| 1062 | numTypes = spec.numTypes;
|
|---|
| 1063 | for(int i = 0; i < numTypes; i++)
|
|---|
| 1064 | if(spec.datatypes[i] != mine.datatypes[i]) {
|
|---|
| 1065 | $assert($false, "Process with rank %d reaches an MPI collective routine "
|
|---|
| 1066 | "%s which has an inconsistent datatype specification with at "
|
|---|
| 1067 | "least one of others",
|
|---|
| 1068 | rank, routine);
|
|---|
| 1069 | break;
|
|---|
| 1070 | }
|
|---|
| 1071 | }
|
|---|
| 1072 |
|
|---|
| 1073 | int $mpi_barrier(MPI_Comm comm) {
|
|---|
| 1074 | $barrier_call(comm->barrier);
|
|---|
| 1075 | return 0;
|
|---|
| 1076 | }
|
|---|
| 1077 |
|
|---|
| 1078 | #ifdef _MPI_CONTRACT
|
|---|
| 1079 |
|
|---|
| 1080 | $collate_state $mpi_snapshot(MPI_Comm comm, $scope scope) {
|
|---|
| 1081 | return $collate_arrives(comm->collator, scope);
|
|---|
| 1082 | }
|
|---|
| 1083 |
|
|---|
| 1084 | void $mpi_unsnapshot(MPI_Comm comm, $collate_state cs) {
|
|---|
| 1085 | $collate_departs(comm->collator, cs);
|
|---|
| 1086 | }
|
|---|
| 1087 |
|
|---|
| 1088 | void $mpi_assigns(void * buf, int count, MPI_Datatype datatype) {
|
|---|
| 1089 | if ($is_concrete_int(datatype)) {
|
|---|
| 1090 | size_t size = sizeofDatatype(datatype);
|
|---|
| 1091 | int _int[count];
|
|---|
| 1092 | int _2int[count * 2];
|
|---|
| 1093 | char _char[count];
|
|---|
| 1094 | $real _real[count];
|
|---|
| 1095 |
|
|---|
| 1096 | switch (datatype) {
|
|---|
| 1097 | case MPI_INT:
|
|---|
| 1098 | case MPI_SHORT:
|
|---|
| 1099 | case MPI_LONG:
|
|---|
| 1100 | case MPI_LONG_LONG_INT:
|
|---|
| 1101 | case MPI_LONG_LONG:
|
|---|
| 1102 | case MPI_UNSIGNED_LONG_LONG:
|
|---|
| 1103 | memcpy(buf, _int, count * size);
|
|---|
| 1104 | break;
|
|---|
| 1105 | case MPI_2INT:
|
|---|
| 1106 | memcpy(buf, _2int, 2 * count * size);
|
|---|
| 1107 | break;
|
|---|
| 1108 | case MPI_FLOAT:
|
|---|
| 1109 | case MPI_DOUBLE:
|
|---|
| 1110 | case MPI_LONG_DOUBLE:
|
|---|
| 1111 | memcpy(buf, _real, count * size);
|
|---|
| 1112 | break;
|
|---|
| 1113 | case MPI_CHAR:
|
|---|
| 1114 | case MPI_BYTE:
|
|---|
| 1115 | memcpy(buf, _char, count * size);
|
|---|
| 1116 | break;
|
|---|
| 1117 | default:
|
|---|
| 1118 | $assert(0, "Unreachable");
|
|---|
| 1119 | }
|
|---|
| 1120 | } else {
|
|---|
| 1121 | size_t realCount = count * $mpi_extentof(datatype);
|
|---|
| 1122 | char newValues[realCount];
|
|---|
| 1123 |
|
|---|
| 1124 | memcpy(buf, newValues, count * sizeofDatatype(datatype));
|
|---|
| 1125 | }
|
|---|
| 1126 | }
|
|---|
| 1127 |
|
|---|
| 1128 | $atomic_f void $mpi_comm_empty(MPI_Comm comm, MPI_COMM_MODE mode) {
|
|---|
| 1129 | _Bool empty;
|
|---|
| 1130 |
|
|---|
| 1131 | if (mode == P2P) {
|
|---|
| 1132 | empty = $comm_empty_in(comm->p2p);
|
|---|
| 1133 | empty &= $comm_empty_out(comm->p2p);
|
|---|
| 1134 | } else {
|
|---|
| 1135 | empty = $comm_empty_in(comm->col);
|
|---|
| 1136 | empty &= $comm_empty_out(comm->col);
|
|---|
| 1137 | }
|
|---|
| 1138 | $assert(empty, "Messages are remaining in MPI communicator\n");
|
|---|
| 1139 | }
|
|---|
| 1140 |
|
|---|
| 1141 | #endif
|
|---|
| 1142 |
|
|---|
| 1143 | /********************* Private helper functions *********************/
|
|---|
| 1144 | /* Returns the string literal of MPI collective routine names by
|
|---|
| 1145 | * giving the unique message tag. */
|
|---|
| 1146 | char * $mpi_coroutine_name(int tag) {
|
|---|
| 1147 | switch(tag) {
|
|---|
| 1148 | case 9999: return "MPI_Bcast";
|
|---|
| 1149 | case 9998: return "MPI_Reduce";
|
|---|
| 1150 | case 9997: return "MPI_Allreduce";
|
|---|
| 1151 | case 9996: return "MPI_Gather";
|
|---|
| 1152 | case 9995: return "MPI_Scatter";
|
|---|
| 1153 | case 9994: return "MPI_Gatherv";
|
|---|
| 1154 | case 9993: return "MPI_Scatterv";
|
|---|
| 1155 | case 9992: return "MPI_Allgather";
|
|---|
| 1156 | case 9991: return "MPI_Reduce_scatter";
|
|---|
| 1157 | case 9990: return "MPI_Alltoall";
|
|---|
| 1158 | case 9989: return "MPI_Alltoallv";
|
|---|
| 1159 | case 9988: return "MPI_Alltoallw";
|
|---|
| 1160 | case 9987: return "MPI_Barrier";
|
|---|
| 1161 | case 9986: return "MPI_Commdup";
|
|---|
| 1162 | case 9985: return "MPI_Commfree";
|
|---|
| 1163 | case 9984: return "MPI_Scan";
|
|---|
| 1164 | case 9983: return "MPI_Exscan";
|
|---|
| 1165 | default: $assert($false, "Internal Error: Unexpected MPI routine tag:%d.\n", tag);
|
|---|
| 1166 | }
|
|---|
| 1167 | }
|
|---|
| 1168 |
|
|---|
| 1169 | /**************** Bridging MPI and the comm library *****************/
|
|---|
| 1170 |
|
|---|
| 1171 | $atomic_f $state_f $comm $mpi_get_p2p_channel(MPI_Comm comm) {
|
|---|
| 1172 | return comm->p2p;
|
|---|
| 1173 | }
|
|---|
| 1174 |
|
|---|
| 1175 | $atomic_f $state_f $comm $mpi_get_col_channel(MPI_Comm comm) {
|
|---|
| 1176 | return comm->col;
|
|---|
| 1177 | }
|
|---|
| 1178 |
|
|---|
| 1179 | $atomic_f $state_f int $mpi_comm_size(MPI_Comm comm) {
|
|---|
| 1180 | return $comm_size(comm->p2p);
|
|---|
| 1181 | }
|
|---|
| 1182 |
|
|---|
| 1183 | $atomic_f $state_f int $mpi_comm_place(MPI_Comm comm) {
|
|---|
| 1184 | return $comm_place(comm->p2p);
|
|---|
| 1185 | }
|
|---|
| 1186 |
|
|---|
| 1187 | $atomic_f $state_f $scope $mpi_root_scope(MPI_Comm comm) {
|
|---|
| 1188 | return $mpi_root_scope_system(comm->p2p);
|
|---|
| 1189 | }
|
|---|
| 1190 |
|
|---|
| 1191 | $atomic_f $state_f $scope $mpi_proc_scope(MPI_Comm comm) {
|
|---|
| 1192 | return $mpi_proc_scope_system(comm->p2p);
|
|---|
| 1193 | }
|
|---|
| 1194 |
|
|---|
| 1195 | /**************** Bridging MPI and the collate library ****************/
|
|---|
| 1196 | $atomic_f $state_f $bundle $mpi_check_collective(MPI_Comm comm,
|
|---|
| 1197 | $bundle checking) {
|
|---|
| 1198 | return $collate_check(comm->collator, checking);
|
|---|
| 1199 | }
|
|---|
| 1200 |
|
|---|
| 1201 | #endif
|
|---|
| 1202 |
|
|---|