#ifndef __CIVL_MPI__ #define __CIVL_MPI__ #include #include #include #include #include #include #include #include #include #include extern const int MPI_IN_PLACE_SPOT = 0; /* Completed definition for mpi-common.h */ $mpi_state _mpi_state=_MPI_UNINIT; /************************** MPI LIB Implementations *******************************/ int $mpi_init(void) { $assert(_mpi_state == _MPI_UNINIT, "Process can only call MPI_Init() at most once."); _mpi_state = _MPI_INIT; return 0; } int MPI_Finalize(void) { $assert(_mpi_state == _MPI_INIT, "Process can only call MPI_Finalize() after the " "MPI enviroment is created and before cleaned."); _mpi_state = _MPI_FINALIZED; return 0; } double MPI_Wtime() { double result; int CMPI_time_count = $next_time_count(); $assert(_mpi_state == _MPI_INIT, "MPI_Wtime() cannot be invoked " "without MPI_Init() being called before.\n"); result = $mpi_time(CMPI_time_count); if (CMPI_time_count > 0) { $assume(result > $mpi_time(CMPI_time_count-1)); } else { $assume(result > 0); } return result; } int MPI_Comm_size(MPI_Comm comm, int *size) { #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Comm_size() cannot be " "invoked without MPI_Init() being called before.\n"); #endif *size = $mpi_comm_size(comm); return 0; } int MPI_Comm_rank(MPI_Comm comm, int *rank) { #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Comm_rank() cannot be " "invoked without MPI_Init() being called before.\n"); #endif *rank = $mpi_comm_place(comm); return 0; } int MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) { $assert(_mpi_state == _MPI_INIT, "MPI_Send() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING MPI_Request request; $mpi_isend(buf, count, datatype, dest, tag, comm, &request); $mpi_wait(&request, MPI_STATUS_IGNORE); #elif defined(_MPI_BLOCKING) $mpi_send(buf, count, datatype, dest, tag, comm); #endif return 1; } int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request * request) { $assert(_mpi_state == _MPI_INIT, "MPI_Isend() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING return $mpi_isend(buf, count, datatype, dest, tag, comm, request); #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Isend is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status) { $assert(_mpi_state == _MPI_INIT, "MPI_Recv() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING MPI_Request request; $mpi_irecv(buf, count, datatype, source, tag, comm, &request); $mpi_wait(&request, status); #elif defined(_MPI_BLOCKING) $mpi_recv(buf, count, datatype, source, tag, comm, status); #endif return 1; } int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request * request) { $assert(_mpi_state == _MPI_INIT, "MPI_Irecv() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING return $mpi_irecv(buf, count, datatype, source, tag, comm, request); #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Irecv is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Wait(MPI_Request * request, MPI_Status * status) { $assert(_mpi_state == _MPI_INIT, "MPI_Wait() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING $mpi_wait(request, status); return 1; #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Wait is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Waitall(int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[]) { $assert(_mpi_state == _MPI_INIT, "MPI_Waitall() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING $for (int i : 0 .. count-1) { MPI_Status * status = array_of_statuses == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : array_of_statuses + i; MPI_Request * req = array_of_requests + i; $mpi_wait(req, status); } return 1; #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Waitall is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status) { $assert(_mpi_state == _MPI_INIT, "MPI_Test() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_NON_BLOCKING if (*request == MPI_REQUEST_NULL) { *flag = 1; return 1; } $choose { $when ($true) { $mpi_wait(request, status); *flag = 1; } $when ($true) *flag = 0; } return 1; #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Test is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Request_free(MPI_Request * request) { // The standard does not say this function accepts MPI_REQUEST_NULL #ifdef _MPI_NON_BLOCKING $free(*request); *request = MPI_REQUEST_NULL; return 1; #elif defined(_MPI_BLOCKING) $assert(0, "MPI_Request_free is not supported in the MPI blocking implementation."); return 0; #endif } int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) { #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Get_count() cannot be invoked " "without MPI_Init() being called before.\n"); #endif *count = status->size/sizeofDatatype(datatype); return 0; } int MPI_Get_processor_name(char * name, int * resultlen) { $abstract int MPI_GET_PROCESSOR_NAME(char *, int *); return MPI_GET_PROCESSOR_NAME(name, resultlen); } int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status) { $assert(_mpi_state == _MPI_INIT, "MPI_Sendrecv() cannot be invoked " "without MPI_Init() being called before.\n"); #ifdef _MPI_CONTRACT $elaborate(dest); $elaborate(source); #else $mpi_check_buffer(sendbuf, sendcount, sendtype); #endif // not correct for checking potential deadlock...rewrite: $mpi_sendrecv(sendbuf, sendcount, sendtype, dest, sendtag, recvbuf, recvcount, recvtype, source, recvtag, comm, status); return 0; } /******************************** Collective ***********************************/ /* Broadcasts a message from root to everyone else. * Need to use a differnt comm. */ int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[1] = {(int)datatype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert (_mpi_state == _MPI_INIT, "MPI_Bcast() cannot be invoked without MPI_Init() " "being called before.\n"); #endif if(place == root) $mpi_check_buffer(buf, count, datatype); checkerEntry = $mpi_create_coroutine_entry(BCAST_TAG, root, -1, 1, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_bcast(buf, count, datatype, root, BCAST_TAG, comm, "MPI_Bcast()"); return 0; } /* Reduces values on all processes to a single value */ int MPI_Reduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[1] = {(int)datatype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert (_mpi_state == _MPI_INIT, "MPI_Reduce() cannot be invoked without " "MPI_Init() being called before.\n"); #endif checkerEntry = $mpi_create_coroutine_entry(REDUCE_TAG, root, (int)op, 1, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_check_buffer(sendbuf, count, datatype); $assert(0 <= op && op <= 13, "unknown MPI reduce operation"); // refer to op.h & mpi.h in ABC/src/include for how MPI_Op is defined $mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, REDUCE_TAG, comm, "MPI_Reduce()"); return 0; } /* Combines values from all processes and distributes the result back to all processes */ /* default root is 0 */ int MPI_Allreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int root = 0; int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[1] = {(int)datatype}; MPI_Status status; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Allreduce() cannot be invoked without " "MPI_Init() being called before.\n"); #endif $mpi_check_buffer(sendbuf, count, datatype); checkerEntry = $mpi_create_coroutine_entry(ALLREDUCE_TAG, root, (int)op, 1, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $assert(0 <= op && op <= 13, "unknown MPI reduce operation"); // refer to op.h & mpi.h in ABC/src/include for how MPI_Op is defined $mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, ALLREDUCE_TAG, comm, "MPI_Allreduce()"); $mpi_bcast(recvbuf, count, datatype, root, ALLREDUCE_TAG, comm, "MPI_Allreduce()"); return 0; } int MPI_Barrier(MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Barrier() cannot be invoked " "without MPI_Init() being called before.\n"); #endif checkerEntry = $mpi_create_coroutine_entry(BARRIER_TAG, 0, -1, 0, NULL); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_barrier(comm); return 0; } /* 1. If comm is an intracommunicator, each process (includes root process) sends the content of its send buffer to the root process. Root process receives the messages and stores them in rank order 2. TODO: If comm is an intercommunicator, it's not supported yet */ int MPI_Gather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[2] = {(int)sendtype, (int)recvtype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Gather() cannot be invoked without " "MPI_Init() being called before.\n"); #endif if(sendbuf != MPI_IN_PLACE) $mpi_check_buffer(sendbuf, sendcount, sendtype); checkerEntry = $mpi_create_coroutine_entry(GATHER_TAG, root, -1, 2, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, GATHER_TAG, comm, "MPI_Gather()"); return 0; } /* The inverse operation of MPI_Gather() */ int MPI_Scatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[2] = {(int)sendtype, (int)recvtype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Scatter() cannot be invoked without " "MPI_Init() being called before.\n"); #endif if (place == root) $mpi_check_buffer(sendbuf, sendcount, sendtype); checkerEntry = $mpi_create_coroutine_entry(SCATTER_TAG, root, -1, 2, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, SCATTER_TAG, comm, "MPI_Scatter()"); return 0; } /* MPI_Gatherv extends the functionality of MPI_Gather by allowing a varying count of data to be sent to root process, since recvcounts is now an array.*/ int MPI_Gatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int recvcounts[], const int displs[], MPI_Datatype recvtype, int root, MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[2] = {(int)sendtype, (int)recvtype}; int recvcount = 0; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Gatherv() cannot be invoked without " "MPI_Init() being called before.\n"); #endif if(sendbuf != MPI_IN_PLACE) $mpi_check_buffer(sendbuf, sendcount, sendtype); checkerEntry = $mpi_create_coroutine_entry(GATHERV_TAG, root, -1, 2, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, GATHERV_TAG, comm, "MPI_Gatherv()"); return 0; } /* MPI_Scatterv extends the functionality of MPI_Scatter by allowing a varying count of data to be sent to each process, since sendcounts is now an array.*/ int MPI_Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[2] = {(int)sendtype, (int)recvtype}; int sendcount = 0; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Scatterv() cannot be invoked without " "MPI_Init() being called before.\n"); #endif if (place == root) { for (int i = 0; i < nprocs; i++) sendcount += sendcounts[i]; $mpi_check_buffer(sendbuf, sendcount, sendtype); } checkerEntry = $mpi_create_coroutine_entry(SCATTERV_TAG, root, -1, 2, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); $mpi_scatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, SCATTERV_TAG, comm, "MPI_Scatterv()"); return 0; } int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm){ int place = $mpi_comm_place(comm); int nprocs = $mpi_comm_size(comm); int datatypes[2] = {(int)sendtype, (int)recvtype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Allgather() cannot be invoked without " "MPI_Init() being called before.\n"); #endif if(sendbuf != MPI_IN_PLACE) $mpi_check_buffer(sendbuf, sendcount, sendtype); checkerEntry = $mpi_create_coroutine_entry(ALLGATHER_TAG, 0, -1, 2, datatypes); specEntry = $mpi_check_collective(comm, checkerEntry); $mpi_diff_coroutine_entries(specEntry, checkerEntry, place); if (sendbuf != MPI_IN_PLACE) $mpi_gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, 0, ALLGATHER_TAG, comm, "MPI_Allgather()"); else { void * in_buf = $mpi_malloc(recvcount, recvtype); memcpy(in_buf, recvbuf + recvcount*place, sizeofDatatype(recvtype) * recvcount); $mpi_gather(in_buf, recvcount, recvtype, recvbuf, recvcount, recvtype, 0, ALLGATHER_TAG, comm, "MPI_Allgather()"); $free(in_buf); } $mpi_bcast(recvbuf, recvcount*nprocs, recvtype, 0, ALLGATHER_TAG, comm, "MPI_Allgather()"); return 0; } int MPI_Reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcount[], MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int total_count, i; int nprocs = $mpi_comm_size(comm); int rank = $mpi_comm_place(comm); int root = 0; int displs[nprocs]; int datatypes[1] = {(int)datatype}; // MPI library defined collective operation checking entries: $bundle checkerEntry; //the checking entry of this call $bundle specEntry; //a recorded entry as specification #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Reduce_scatter() cannot be invoked without " "MPI_Init() being called before.\n"); #endif $mpi_check_buffer(sendbuf, recvcount[rank], datatype); for(total_count = 0, i = 0; i 0) $mpi_check_buffer(recvbuf, count, datatype); $mpi_exscan(sendbuf, recvbuf, count, datatype, op, comm); return 0; } /* ****************** End of collecitve routines ********************* */ int MPI_Comm_dup(MPI_Comm comm, MPI_Comm * newcomm) { $scope CMPI_PROC_SCOPE = $mpi_proc_scope(comm); #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Comm_dup() cannot be invoked without " "MPI_Init() being called before.\n"); #endif $mpi_comm_dup(CMPI_PROC_SCOPE, comm, newcomm, "MPI_Comm_dup"); return 0; } int MPI_Comm_free(MPI_Comm * comm) { #ifndef _MPI_CONTRACT $assert(_mpi_state == _MPI_INIT, "MPI_Comm_free() cannot be invoked without " "MPI_Init() being called before.\n"); #endif $assert($is_derefable_pointer(comm), "The argument of MPI_Comm_free is NULL."); $mpi_comm_free(comm, _mpi_state); return 0; } int MPI_Init_thread( int *argc, char ***argv, int required, int *provided ){ _mpi_state = _MPI_INIT; //TODO: why set initialized flag here ?? *provided = MPI_THREAD_MULTIPLE; return MPI_SUCCESS; } #endif