#ifndef __CIVL_CIVLMPI__ #define __CIVL_CIVLMPI__ #include #include #include #include #include #include #include #include #include #include #include #include typedef struct _mpi_nb_record_t { void *buf; int count; MPI_Datatype datatype; int src_or_dest; int tag; _Bool completed; } _mpi_nb_record_t; struct CIVL_MPI_Comm { $comm p2p; // point-to-point communication $comm col; // collective communication $collator collator; $barrier barrier; int gcommIndex; //the index of the corresponding global communicator. _mpi_nb_record_t * send_records[]; _mpi_nb_record_t * recv_records[]; }; /* Library private helper function declaration */ char * $mpi_coroutine_name(int tag); /**************************** Duplicated Part *************************************/ /* Duplicated definition with the same struct in mpi.h. The reason of this duplication is to make civlmpi.cvl independent with mpi.cvl. */ /* Definition of CMPI_Gcomm (CMPI_Gcomm has a type of __CMPI_Gcomm) and MPI_Comm */ struct $mpi_gcomm { $gcomm p2p; // point-to-point communication $gcomm col; // collective communication $gcollator gcollator; $gbarrier gbarrier; }; // the definition of "struct MPI_Request": struct MPI_Request{ MPI_Comm comm; MPI_Status status; _Bool isSend; // true if this request is associated to a send; otherwise, it is associated to a receive. _mpi_nb_record_t * record; // points to the record }; // Note that the type "MPI_Request" is defined as "pointer-to-struct MPI_Request". /****************************** Helper Functions **********************************/ $state_f size_t $mpi_extentof(MPI_Datatype datatype) { $abstract size_t $mpi_sizeof(MPI_Datatype datatype); return $mpi_sizeof(datatype); } int sizeofDatatype(MPI_Datatype datatype) { size_t result; #ifdef _MPI_CONTRACT /* In MPI contract mode, it's possible that datatype is non-concrete. Then use an abstract function to represent the extent of the datatype. */ if (!$is_concrete_int(datatype)) return $mpi_extentof(datatype); #endif switch (datatype) { case MPI_INT: result = sizeof(int); break; case MPI_2INT: result = (sizeof(int)*2); break; case MPI_FLOAT: result = sizeof(float); break; case MPI_DOUBLE: result = sizeof(double); break; case MPI_CHAR: result = sizeof(char); break; case MPI_BYTE: result = sizeof(char); break; // char is always one byte ? case MPI_SHORT: result = sizeof(short); break; case MPI_LONG: result = sizeof(long); break; case MPI_LONG_DOUBLE: result = sizeof(long double); break; case MPI_LONG_LONG_INT: result = sizeof(long long int); break; case MPI_LONG_LONG: result = sizeof(long long); break; case MPI_UNSIGNED_LONG_LONG: result = sizeof(unsigned long long); break; default: $assert(0, "Unreachable"); } #ifdef _MPI_CONTRACT /* In the case that a datatype X which is previously non-concrete and whose extent $mpi_sizeof(X) was used. Later X is simplified to a concrete number representing 'int', we need somehow associate SIZEOF_INT to $mpi_sizeof(X) by assume the following predicate. */ $assume($mpi_extentof(datatype) == result); #endif return result; } void * $mpi_malloc(int count, MPI_Datatype datatype) { switch (datatype) { case MPI_INT: case MPI_SHORT: case MPI_LONG: case MPI_LONG_LONG_INT: case MPI_LONG_LONG: case MPI_UNSIGNED_LONG_LONG: return (int *)malloc(sizeof(int) * count); case MPI_2INT: return (int *)malloc(2 * count * sizeof(int)); case MPI_FLOAT: case MPI_DOUBLE: case MPI_LONG_DOUBLE: return ($real *)malloc(count * sizeof($real)); case MPI_CHAR: case MPI_BYTE: return (char *)malloc(count * sizeof(char)); default: $assert(0, "Unreachable"); } return (void *)0; } /************************** MPI LIB Implementations *******************************/ $mpi_gcomm $mpi_gcomm_create($scope scope, int size) { $mpi_gcomm result; result.p2p = $gcomm_create(scope, size); result.col = $gcomm_create(scope, size); result.gcollator = $gcollator_create(scope, size); result.gbarrier = $gbarrier_create(scope, size); return result; } void $mpi_gcomm_destroy($mpi_gcomm gc) { /* This function will report errors for any messages remaining the $mpi_gcomm. Those messages are junk messages. */ int numJunkRecord; int numJunkMsg; $message junkMsgs[]; // A CIVL-C sequence for junk messages. $seq_init(&junkMsgs, 0, NULL); numJunkMsg = $gcomm_destroy(gc.p2p, &junkMsgs); /* Informations of reporting junk messages in p2p communicator and collective communicator are different: */ for(int i = 0; i < numJunkMsg; i++) { int src, dest, tag; src = $message_source(junkMsgs[i]); dest = $message_dest(junkMsgs[i]); tag = $message_tag(junkMsgs[i]); $assert($false, "MPI message leak: There is a message from rank %d to rank %d with tag %d " "has been sent but is never received in point-to-point communication.", src, dest, tag); } numJunkMsg = $gcomm_destroy(gc.col, &junkMsgs); for(int i = 0; i < numJunkMsg; i++) { int src, tag; char * routine; src = $message_source(junkMsgs[i]); tag = $message_tag(junkMsgs[i]); routine = $mpi_coroutine_name(tag); $assert($false, "MPI message leak: There is a message sent by rank %d for collective routine %s" " that is never received.", src, routine); } $gcollator_destroy(gc.gcollator); $gbarrier_destroy(gc.gbarrier); } MPI_Comm $mpi_comm_create($scope scope, $mpi_gcomm gc, int rank) { MPI_Comm result = (MPI_Comm)$malloc(scope, sizeof(*result));; result->p2p = $comm_create(scope, gc.p2p, rank); result->col = $comm_create(scope, gc.col, rank); result->collator = $collator_create(gc.gcollator, scope, rank); result->barrier = $barrier_create(scope, gc.gbarrier, rank); result->gcommIndex = 0; $seq_init(&result->send_records, 0, NULL); $seq_init(&result->recv_records, 0, NULL); return result; } void $mpi_comm_destroy(MPI_Comm comm, $mpi_state mpi_state) { #ifndef _MPI_CONTRACT if(comm->gcommIndex == 0) $assert(mpi_state == _MPI_FINALIZED, "Process terminates without " "calling MPI_Finalize() first."); #endif $comm_destroy(comm->p2p); $comm_destroy(comm->col); $free(comm->collator); $barrier_destroy(comm->barrier); $free(comm); } void * $mpi_pointer_add(const void * ptr, int offset, MPI_Datatype datatype) { #ifdef _MPI_CONTRACT if (!$is_concrete_int(datatype)) { int datatypeExtent = $mpi_extentof(datatype); return $pointer_add(ptr, offset * datatypeExtent, sizeof(char)); } #endif int type_size = sizeofDatatype(datatype); return $pointer_add(ptr, offset, type_size); } /********************* Lower level MPI routines *********************/ /* CMPI_Send and CMPI_Recv are a pair of send receives functions that help implementing MPI routines. They should never be block which means no potential deadlocks related to these functions */ int $mpi_send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) { if (dest >= 0) { int size = count*sizeofDatatype(datatype); int place = $comm_place(comm->p2p); $message out; $elaborate(dest); out = $message_pack(place, dest, tag, buf, size); $comm_enqueue(comm->p2p, out); } return 0; } /* This function 1) allocates an MPI Request object and assigns the address of the object to the handle "*request"; and 2) creates a record carrying all the information of this send operation and enqueues the record into the send record queue associated to "comm"; and 3) enqueues the message into the message channel as if it is a standard $mpi_send. A pointer to the created record is saved in the allocated MPI Request object. Step 2) may be skipped if only checking absolute deadlock. */ int $mpi_isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request * request) { if (dest >= 0) { int place = $comm_place(comm->p2p); // Isend allocates an MPI_Request object (MPI-3.0 sec 3.7.2): MPI_Request req = (MPI_Request) malloc(sizeof(struct MPI_Request)); req->comm = comm; req->status.MPI_SOURCE = place; req->status.MPI_TAG = tag; req->status.MPI_ERROR = 0; req->status.size = count*sizeofDatatype(datatype); req->isSend = 1; req->record = (_mpi_nb_record_t *)malloc(sizeof(_mpi_nb_record_t)); req->record->buf = (void*)buf; req->record->count = count; req->record->datatype = datatype; req->record->src_or_dest = dest; req->record->tag = tag; req->record->completed = 0; *request = req; $seq_append(&comm->send_records, &req->record, 1); return $mpi_send(buf, count, datatype, dest, tag, comm); } else *request = MPI_REQUEST_NULL; return 1; } int $mpi_recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status) { if ((source >= 0 || source == MPI_ANY_SOURCE)) { $message in; int place = $comm_place(comm->p2p); int deterministicTag; $assert(tag == -2 || tag >= 0, "Illegal MPI message receive tag %d.\n", tag); deterministicTag = tag < 0 ? -2 : tag; $elaborate(source); in = $comm_dequeue(comm->p2p, source, deterministicTag); int size = count*sizeofDatatype(datatype); $message_unpack(in, buf, size); if (status != MPI_STATUS_IGNORE) { status->size = $message_size(in); status->MPI_SOURCE = $message_source(in); status->MPI_TAG = $message_tag(in); status->MPI_ERROR = 0; } } return 0; } /* This function 1) allocates an MPI Request object and assigns the address of the object to the handle "*request"; and 2) creates a record carrying all the information of this receive operation and enqueues the record into the receive record queue associated to "comm". A pointer to the created record is saved in the allocated MPI Request object. */ int $mpi_irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request * request) { if ((source >= 0 || source == MPI_ANY_SOURCE)) { // Irecv allocates an MPI_Request object (MPI-3.0 sec 3.7.2): MPI_Request req = (MPI_Request) malloc(sizeof(struct MPI_Request)); req->comm = comm; req->isSend = $false; req->record = (_mpi_nb_record_t*)malloc(sizeof(_mpi_nb_record_t)); req->record->buf = buf; req->record->count = count; req->record->datatype = datatype; req->record->src_or_dest = source; req->record->tag = tag; req->record->completed = $false; $seq_append(&comm->recv_records, &req->record, 1); *request = req; } else *request = MPI_REQUEST_NULL; return 1; } int $mpi_sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status) { int deterministicRecvTag; $assert(sendtag >= 0, "MPI sendtag should be greater than or equal to zero"); $assert(recvtag == -2 || recvtag >= 0, "Illegal MPI message receive tag %d.\n", recvtag); deterministicRecvTag = recvtag < 0 ? -2 : recvtag; if((dest >= 0) && ((source >= 0 || source == MPI_ANY_SOURCE))) { $message out, in; int size = sendcount*sizeofDatatype(sendtype); int place = $comm_place(comm->p2p); out = $message_pack(place, dest, sendtag, sendbuf, size); $elaborate(source); $elaborate(dest); $choose { $when($true){ $comm_enqueue(comm->p2p, out); in = $comm_dequeue(comm->p2p, source, deterministicRecvTag); } $when($false){ /* This $choose branch plays a trick which correctly implements the sendrecv() semantically. Such a branch ensures that there is no chance of potential deadlocks when all processes do send then recv collectively. However, effectively, this branch is no need and never will be executed.*/ in = $comm_dequeue(comm->p2p, source, deterministicRecvTag); $comm_enqueue(comm->p2p, out); } } size = recvcount*sizeofDatatype(recvtype); $message_unpack(in, recvbuf, size); if (status != MPI_STATUS_IGNORE) { status->size = $message_size(in); status->MPI_SOURCE = $message_source(in); status->MPI_TAG = $message_tag(in); status->MPI_ERROR = 0; } } else if (dest >= 0) $mpi_send(sendbuf, sendcount, sendtype, dest, sendtag, comm); else if (source >= 0 || source == MPI_ANY_SOURCE) $mpi_recv(recvbuf, recvcount, recvtype, source, deterministicRecvTag, comm, status); return 0; } /* This function completes a non-blocking send operation and removes the record associated to the send operation in the record queue. This function can potentially be blocked at a state where there is no matching receive for this send operation. requires "request" to be valid and "status" to be either valid or MPI_STATUS_IGNORE */ int $mpi_wait_send(MPI_Request * request, MPI_Status * status) { // TODO: To check potential deadlock, need to look through the message // channel and the send record queue at the same time to determine // if there is (or was) a matching receive enabled for this send // operation. Currently, the model only detects absolute deadlock. MPI_Request req = *request; $assert(req->isSend); if (status != MPI_STATUS_IGNORE) $copy(status, &(req->status)); int queue_size = $seq_length(&(req->comm->send_records)); for (int i = 0; i < queue_size; i++) { if (req->comm->send_records[i] == req->record) { $seq_remove(&(req->comm->send_records), i, NULL, 1); break; } } $free(req->record); return 1; } /* This function completes the receive operation associated to the "request" handle and assigns "MPI_Status" data to "*status". requires "request" to be valid and "status" to be either valid or MPI_STATUS_IGNORE */ int $mpi_wait_recv(MPI_Request * request, MPI_Status * status) { MPI_Request req = *request; _mpi_nb_record_t * record = req->record; MPI_Comm comm = req->comm; int queue_size = $seq_length(&comm->recv_records); $assert(!req->isSend); // if the operation of "*request" has already completed: if (record->completed) { if (status != MPI_STATUS_IGNORE) { status->MPI_SOURCE = record->src_or_dest; status->MPI_TAG = record->tag; status->MPI_ERROR = 0; status->size = record->count * sizeofDatatype(record->datatype); } for (int i = 0; i < queue_size; i++) if (comm->recv_records[i] == record) { $seq_remove(&comm->recv_records, i, NULL, 1); break; } $free(record); return 1; } $elaborate(record->src_or_dest); /* For any wildcard record "wr", on which "record" depends, "wr" must be completed first. But we can only over-approximate the dependency relation. */ for (int i = 0; i < queue_size; i++) { _mpi_nb_record_t * wr = comm->recv_records[i]; if (wr->completed) continue; if (wr == record) break; $elaborate(wr->src_or_dest); if (wr->src_or_dest == MPI_ANY_SOURCE && (wr->tag == record->tag || wr->tag == MPI_ANY_TAG || record->tag == MPI_ANY_TAG)) { // Over-approximately "record" depends on "wr". If receive of // "wr" can be enabled, we always let "wr" go first. In this // case, "record" may or may not depend on "wr". It is still // correct to let "wr" go first as it is already matched anyway. // If only the operation of "record" can be enabled, it means // that "record" in fact does not depend on "wr". We can ignore // "wr". Otherwise, blocked. _Bool wr_enabled = 0; MPI_Status tmp_status; $choose { $when($comm_probe(comm->p2p, MPI_ANY_SOURCE, wr->tag)) wr_enabled = 1; $when(!$comm_probe(comm->p2p, MPI_ANY_SOURCE, wr->tag) && $comm_probe(comm->p2p, record->src_or_dest, record->tag)) ; } if (wr_enabled) { $mpi_recv(wr->buf, wr->count, wr->datatype, wr->src_or_dest, wr->tag, comm, &tmp_status); wr->src_or_dest = tmp_status.MPI_SOURCE; wr->tag = tmp_status.MPI_TAG; wr->completed = 1; continue; } } } // At this point, there is no wildcard receive record on which // "record" depends on precedes "record" in queue. $message in; _Bool recv_again; int idx = -1; do { in = $comm_dequeue(comm->p2p, record->src_or_dest, record->tag); // check if any preceding record matches this message: recv_again = 0; for (int i = 0; i < queue_size; i++) { _mpi_nb_record_t * r = comm->recv_records[i]; if (r->completed) continue; if (r == record) idx = i; if (r == record || (r->src_or_dest == $message_source(in) && (r->tag == $message_tag(in) || r->tag == MPI_ANY_TAG))) { // this message belongs to r: r->src_or_dest = $message_source(in); r->tag = $message_tag(in); $message_unpack(in, r->buf, r->count * sizeofDatatype(r->datatype)); r->completed = 1; recv_again = r != record; break; } } } while (recv_again); if (status != MPI_STATUS_IGNORE) { status->MPI_SOURCE = $message_source(in); status->MPI_TAG = $message_tag(in); status->MPI_ERROR = 0; status->size = $message_size(in); } $seq_remove(&comm->recv_records, idx, NULL, 1); free(record); return 1; } int $mpi_wait(MPI_Request * request, MPI_Status * status) { if (*request == MPI_REQUEST_NULL) return 1; if ((*request)->isSend) $mpi_wait_send(request, status); else $mpi_wait_recv(request, status); $free(*request); *request = MPI_REQUEST_NULL; return 1; } /********************* Collective helper functions ********************/ /* Note: collective helpers functions are functions have same behaviors as MPI collective functions, it can be re-used as a part of implementation by different MPI routines. For example, MPI_Allreduce will call CMPI_Reduce and CMPI_Bcast, both of them should throw errors (if encounters any) as if errors are thrown from MPI_Allreduce. */ int $mpi_collective_send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) { if (dest >= 0) { int size = count*sizeofDatatype(datatype); int place = $comm_place(comm->col); $message out = $message_pack(place, dest, tag, buf, size); $elaborate(dest); $comm_enqueue(comm->col, out); } return 0; } int $mpi_collective_recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status * status, char * routName) { if(source >= 0 || source == MPI_ANY_SOURCE) { $elaborate(source); $message in = $comm_dequeue(comm->col, source, MPI_ANY_TAG); int size = count*sizeofDatatype(datatype); int recvTag; /* This routine should only be used by collective routines, there is no non-deterministic tags for collective routines.*/ recvTag = $message_tag(in); $assert (recvTag == tag, "Collective routine %s receives a " "message with a mismatched tag\n", routName); $message_unpack(in, buf, size); if (status != MPI_STATUS_IGNORE) { status->size = $message_size(in); status->MPI_SOURCE = $message_source(in); status->MPI_TAG = recvTag; status->MPI_ERROR = 0; } } return 0; } /* Broadcast helper function that uses any specified message tag */ int $mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, int tag, MPI_Comm comm, char * routName) { if ($comm_place(comm->col) == root) { int nprocs = $comm_size(comm->col); for (int i=0; icol); if (rank != root) $mpi_collective_send(sendbuf, count, datatype, root, tag, comm); else { int nprocs = $comm_size(comm->col); int size; size = count * sizeofDatatype(datatype); memcpy(recvbuf, sendbuf, size); for (int i = 0; icol, i, MPI_ANY_TAG); /* Collective routines have no non-deterministic tags.*/ colTag = $message_tag(in); $assert (colTag == tag , "Collective routine %s receives a " "message with a mismatched tag\n", routName); /* the third argument "count" indicates the number of cells needs doing the operation. */ applybuf = $mpi_malloc(count, datatype); $bundle_unpack_apply(in.data, recvbuf, op, count, applybuf); memcpy(recvbuf, applybuf, size); free(applybuf); $assert (in.size <= size , "Message of size %d exceeds the specified size %d.", in.size, size); } } } return 0; } /* Gathering helper function that uses any specified message tag */ int $mpi_gather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, int tag, MPI_Comm comm, char * routName){ int rank, nprocs; MPI_Status status; rank = $comm_place(comm->col); nprocs = $comm_size(comm->col); /* MPI standard requirement: * For root process, sendtype must be equal to * recvtype. */ if(rank == root) $assert (sendtype == recvtype, "%s asks for equality " "between 'sendtype' and 'recvtype'.", routName); /* MPI_standard requirement: * Only root process can use MPI_IN_PLACE*/ if(sendbuf == MPI_IN_PLACE){ $assert (root == rank, "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'."); } else if(root == rank) { void * ptr; $assert(sendcount == recvcount, "Root process of routine %d without using" " MPI_IN_PLACE should give the same value for recvcount and sendcount", routName); ptr = $mpi_pointer_add(recvbuf, root * recvcount, recvtype); memcpy(ptr, sendbuf, recvcount * sizeofDatatype(recvtype)); } else $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm); /* Root process receives messages and put them in right places */ if(rank == root){ int real_recvcount; int offset; for(int i=0; icol); nprocs = $comm_size(comm->col); /* MPI standard requirement: * For root process, sendtype must be equal to * recvtype. */ if(rank == root) $assert(sendtype == recvtype, "%s asks for equality " "between 'sendtype' and 'recvtype'.", routName); /* MPI_standard requirement: * Only root process can use MPI_IN_PLACE*/ if(sendbuf == MPI_IN_PLACE){ $assert(root == rank, "Only root can replace 'sendbuf' with 'MPI_IN_PLACE'."); }else if(root == rank) { void * ptr; $assert(sendcount == recvcounts[root], "For routine %s, recvcounts[%d] " "should be same as the sendcount of the process with rank %d.\n", routName, root, root); ptr = $mpi_pointer_add(recvbuf, displs[rank], recvtype); memcpy(ptr, sendbuf, sendcount * sizeofDatatype(recvtype)); }else{ $mpi_collective_send(sendbuf, sendcount, sendtype, root, tag, comm); } /* Root process receives messages and put them in right places */ if(rank == root){ int real_recvcount; MPI_Status status; for(int i=0; icol); nprocs = $comm_size(comm->col); /* MPI standard requirement: * For root process, sendtype must be equal to * recvtype. */ if(rank == root) $assert(sendtype == recvtype, "MPI_Scatter() asks for equality " "between 'sendtype' and 'recvtype'."); /* MPI_standard requirement: * Only root process can use MPI_IN_PLACE */ if(recvbuf == MPI_IN_PLACE){ $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'."); }else if(rank == root) { void * ptr; $assert(sendcount == recvcount, "Root process of routine %d without using" " MPI_IN_PLACE should give the same value for recvcount and sendcount", routName); ptr = $mpi_pointer_add(sendbuf, root*recvcount, sendtype); memcpy(recvbuf, ptr, sizeofDatatype(recvtype)*recvcount); } /* Root process scatters data to other processes */ if(rank == root){ int offset; for(int i=0; icol); nprocs = $comm_size(comm->col); /* MPI standard requirement: * For root process, sendtype must be equal to * recvtype. */ if(rank == root) $assert(sendtype == recvtype, "%s asks for equality " "between 'sendtype' and 'recvtype'.", routName); /* MPI_standard requirement: * Only root process can use MPI_IN_PLACE */ if(recvbuf == MPI_IN_PLACE){ $assert(root == rank, "Only root can replace 'recvbuf' with 'MPI_IN_PLACE'."); } else if(rank == root) { void * ptr; $assert(sendcounts[root] == recvcount, "For routine %s, sendcounts[%d] " "should be same as the recvcount of the process with rank %d.\n", routName, root, root); ptr = $mpi_pointer_add(sendbuf, displs[root], sendtype); memcpy(recvbuf, ptr, recvcount*sizeofDatatype(recvtype)); } /* Root process scatters data to other processes */ if(rank == root){ for(int i=0; icol); nprocs = $comm_size(comm->col); /* Each process do reduction from 0 .. rank (inclusive): * Send to process rank + 1 ... nprocs-1 (inclusive) * Recv from process 0 .. rank (exclusive) */ for (int i = place + 1; i < nprocs; i++) $mpi_collective_send(sendbuf, count, datatype, i, SCAN_TAG, comm); if (sendbuf != MPI_IN_PLACE) memcpy(recvbuf, sendbuf, size); for (int i = 0; i < place; i++) { $message in = $comm_dequeue(comm->col, i, SCAN_TAG); // so far, unpack_apply requires that 'recvbuf' is not aliasing // 'tmp_space' (can be improved in the future): $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space); // swap tmp_space with recvbuf tmp_ptr = tmp_space; tmp_space = recvbuf; recvbuf = tmp_ptr; $assert (in.size <= size , "Message of size %d exceeds the specified size %d.", in.size, size); } if (recvbuf == tmp_space_const) memcpy(tmp_space, recvbuf, size); free(tmp_space_const); return 0; } /* The helper function for (the exclusive) MPI_Exscan */ int $mpi_exscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int place, nprocs; int size = count * sizeofDatatype(datatype); void * tmp_space = $mpi_malloc(count, datatype); void * tmp_ptr, * tmp_space_const = tmp_space; place = $comm_place(comm->col); nprocs = $comm_size(comm->col); /* The “in place” option for intracommunicators is specified by * passing MPI_IN_PLACE in the sendbuf argument. In this case, the * input data is taken from the receive buffer, and replaced by the * output data. The receive buffer on rank 0 is not changed by this * operation. */ if (sendbuf == MPI_IN_PLACE) sendbuf = recvbuf; /* Each process do reduction from 0 .. rank (inclusive): * Send to process rank + 1 ... nprocs-1 (inclusive) * Recv from process 0 .. rank (exclusive) */ for (int i = place + 1; i < nprocs; i++) $mpi_collective_send(sendbuf, count, datatype, i, EXSCAN_TAG, comm); // no-op for rank 0 if (place != 0) { $message in = $comm_dequeue(comm->col, 0, EXSCAN_TAG); $bundle_unpack(in.data, recvbuf); for (int i = 1; i < place; i++) { in = $comm_dequeue(comm->col, i, EXSCAN_TAG); // so far, unpack_apply requires that 'recvbuf' is not aliasing // 'tmp_space' (can be improved in the future): $bundle_unpack_apply(in.data, recvbuf, op, count, tmp_space); // swap tmp_space with recvbuf tmp_ptr = tmp_space; tmp_space = recvbuf; recvbuf = tmp_ptr; $assert (in.size <= size , "Message of size %d exceeds the specified size %d.", in.size, size); } } if (recvbuf == tmp_space_const) memcpy(tmp_space, recvbuf, size); free(tmp_space_const); return 0; } /* ******************** End of collective routines ********************* */ int $mpi_comm_dup($scope scope, MPI_Comm comm, MPI_Comm * newcomm, char * routName) { int place = $comm_place(comm->col); $mpi_gcomm newgcomm; int idx; $scope CMPI_ROOT_SCOPE = $mpi_root_scope(comm); if(place == 0) { int size = $comm_size(comm->col); newgcomm = $mpi_gcomm_create(CMPI_ROOT_SCOPE, size); idx = $mpi_new_gcomm(CMPI_ROOT_SCOPE, newgcomm); } $mpi_bcast(&idx, 1, MPI_INT, 0, COMMDUP_TAG, comm, routName); newgcomm = $mpi_get_gcomm(CMPI_ROOT_SCOPE, idx); (*newcomm) = $mpi_comm_create(scope, newgcomm, place); (*newcomm)->gcommIndex = idx; $barrier_call(comm->barrier); $gcomm_dup(comm->p2p, (*newcomm)->p2p); $gcomm_dup(comm->col, (*newcomm)->col); $barrier_call(comm->barrier); return 0; } int $mpi_comm_free(MPI_Comm *comm, $mpi_state mpi_state) { int place = $comm_place((*comm)->col); int size = $comm_size((*comm)->col); int buf[size]; int gcommIndex = (*comm)->gcommIndex; $scope CMPI_ROOT_SCOPE = $mpi_root_scope(*comm); //TODO: $mpi_gather here is just a ugly synchronization $mpi_gather(&place, 1, MPI_INT, buf, 1, MPI_INT, 0, COMMFREE_TAG, (*comm), "MPI_Comm_free synchronization."); $mpi_comm_destroy(*comm, mpi_state); if(place == 0) { $mpi_gcomm temp = $mpi_get_gcomm(CMPI_ROOT_SCOPE, gcommIndex); $mpi_gcomm_destroy(temp); } return 0; } $bundle $mpi_create_coroutine_entry(int routineTag, int root, int op, int numDatatypes, int * datatypes) { int zero = 0; $bundle bundledEntry; struct Entry { int routine_tag; int root; int op; int numTypes; int datatypes[]; }entry; entry.routine_tag = routineTag; entry.root = root; entry.op = op; entry.numTypes = numDatatypes; $seq_init(&entry.datatypes, numDatatypes, &zero); for(int i = 0; i < numDatatypes; i++) entry.datatypes[i] = datatypes[i]; bundledEntry = $bundle_pack(&entry, sizeof(struct Entry)); return bundledEntry; } void $mpi_diff_coroutine_entries($bundle specEntry, $bundle mineEntry, int rank) { struct Entry { int routine_tag; int root; int op; int numTypes; int datatypes[]; }spec, mine; char * routine; int numTypes; $bundle_unpack(specEntry, &spec); $bundle_unpack(mineEntry, &mine); routine = $mpi_coroutine_name(spec.routine_tag); if(spec.routine_tag != mine.routine_tag) { char * mineRoutine = $mpi_coroutine_name(mine.routine_tag); $assert($false, "Process with rank %d reaches an MPI collective routine " "%s while at least one of others are collectively reaching %s.", rank, mineRoutine, routine); } else if(spec.root != mine.root) { $assert($false, "Process with rank %d reaches an MPI collective routine " "%s which has a different root with at least one of others.", rank, routine); } else if(spec.op != mine.op) { $assert($false, "Process with rank %d reaches an MPI collective routine " "%s which has a different MPI_Op with at least one of others", rank, routine); } else if(spec.numTypes != mine.numTypes) { $assert($false, "Process with rank %d reaches an MPI collective routine " "%s which has an inconsistent datatype specification with at least" " one of others", rank, routine); } numTypes = spec.numTypes; for(int i = 0; i < numTypes; i++) if(spec.datatypes[i] != mine.datatypes[i]) { $assert($false, "Process with rank %d reaches an MPI collective routine " "%s which has an inconsistent datatype specification with at " "least one of others", rank, routine); break; } } int $mpi_barrier(MPI_Comm comm) { $barrier_call(comm->barrier); return 0; } #ifdef _MPI_CONTRACT $collate_state $mpi_snapshot(MPI_Comm comm, $scope scope) { return $collate_arrives(comm->collator, scope); } void $mpi_unsnapshot(MPI_Comm comm, $collate_state cs) { $collate_departs(comm->collator, cs); } void $mpi_assigns(void * buf, int count, MPI_Datatype datatype) { if ($is_concrete_int(datatype)) { size_t size = sizeofDatatype(datatype); int _int[count]; int _2int[count * 2]; char _char[count]; $real _real[count]; switch (datatype) { case MPI_INT: case MPI_SHORT: case MPI_LONG: case MPI_LONG_LONG_INT: case MPI_LONG_LONG: case MPI_UNSIGNED_LONG_LONG: memcpy(buf, _int, count * size); break; case MPI_2INT: memcpy(buf, _2int, 2 * count * size); break; case MPI_FLOAT: case MPI_DOUBLE: case MPI_LONG_DOUBLE: memcpy(buf, _real, count * size); break; case MPI_CHAR: case MPI_BYTE: memcpy(buf, _char, count * size); break; default: $assert(0, "Unreachable"); } } else { size_t realCount = count * $mpi_extentof(datatype); char newValues[realCount]; memcpy(buf, newValues, count * sizeofDatatype(datatype)); } } $atomic_f void $mpi_comm_empty(MPI_Comm comm, MPI_COMM_MODE mode) { _Bool empty; if (mode == P2P) { empty = $comm_empty_in(comm->p2p); empty &= $comm_empty_out(comm->p2p); } else { empty = $comm_empty_in(comm->col); empty &= $comm_empty_out(comm->col); } $assert(empty, "Messages are remaining in MPI communicator\n"); } #endif /********************* Private helper functions *********************/ /* Returns the string literal of MPI collective routine names by * giving the unique message tag. */ char * $mpi_coroutine_name(int tag) { switch(tag) { case 9999: return "MPI_Bcast"; case 9998: return "MPI_Reduce"; case 9997: return "MPI_Allreduce"; case 9996: return "MPI_Gather"; case 9995: return "MPI_Scatter"; case 9994: return "MPI_Gatherv"; case 9993: return "MPI_Scatterv"; case 9992: return "MPI_Allgather"; case 9991: return "MPI_Reduce_scatter"; case 9990: return "MPI_Alltoall"; case 9989: return "MPI_Alltoallv"; case 9988: return "MPI_Alltoallw"; case 9987: return "MPI_Barrier"; case 9986: return "MPI_Commdup"; case 9985: return "MPI_Commfree"; case 9984: return "MPI_Scan"; case 9983: return "MPI_Exscan"; default: $assert($false, "Internal Error: Unexpected MPI routine tag:%d.\n", tag); } } /**************** Bridging MPI and the comm library *****************/ $atomic_f $state_f $comm $mpi_get_p2p_channel(MPI_Comm comm) { return comm->p2p; } $atomic_f $state_f $comm $mpi_get_col_channel(MPI_Comm comm) { return comm->col; } $atomic_f $state_f int $mpi_comm_size(MPI_Comm comm) { return $comm_size(comm->p2p); } $atomic_f $state_f int $mpi_comm_place(MPI_Comm comm) { return $comm_place(comm->p2p); } $atomic_f $state_f $scope $mpi_root_scope(MPI_Comm comm) { return $mpi_root_scope_system(comm->p2p); } $atomic_f $state_f $scope $mpi_proc_scope(MPI_Comm comm) { return $mpi_proc_scope_system(comm->p2p); } /**************** Bridging MPI and the collate library ****************/ $atomic_f $state_f $bundle $mpi_check_collective(MPI_Comm comm, $bundle checking) { return $collate_check(comm->collator, checking); } #endif