= Message Passing =

First attempt:

{{{
#define MPI_ANY_SOURCE -1
#define MPI_ANY_TAG -2
#define MPI_INT 1
#define MPI_FLOAT 2
#define MPI_DOUBLE 3
// etc.

$input int NPROCS;
$assume NPROCS >= 1;
$scope top;
$heap mp_heap;
$proc procs[NPROCS];

/* There will be one message queue for each pair of procs (i,j).
 * Each queue is a doubly-linked list of Message objects. */
typedef struct MPI_Message {
  struct MPI_Message * next;
  struct MPI_Message * prev;
  int tag;
  int size;
  void * data;
} MPI_Message;

typedef struct MPI_Comm_struct {
  MPI_Message * buf_front[NPROCS][NPROCS];
  MPI_Message * buf_back[NPROCS][NPROCS];
} MPI_Comm_struct;

typedef MPI_Comm_struct *MPI_Comm;

/* As in MPI, when a receive returns, this structure
 * tells you the source and tag of the received message,
 * which you need if you used wildcards. Also the size. */
typedef struct MPI_Status {
  int source;
  int tag;
  int size;
} MPI_Status;

typedef int MPI_Datatype;

/* This is the actual MPI Comm world structure */
MPI_Comm_struct MPI_Comm_world_struct;

/* The user will use MPI_COMM_WORLD */
MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_struct;

void init() {
  for (int i=0; i<NPROCS; i++)
    for (int j=0; j<NPROCS; j++) {
      MPI_COMM_WORLD->buf_front[i][j] = NULL;
      MPI_COMM_WORLD->buf_back[i][j] = NULL;
    }
}

int sizeofDatatype(MPI_Datatype type) {
  switch (type) {
  case MPI_INT:
    return sizeof(int);
  case MPI_FLOAT:
    return sizeof(float);
  case MPI_DOUBLE:
    return sizeof(double);
  default:
    exit(-1); // not yet implemented
  }
}

void MPI_process(int pid) {

  void MPI_Send(void *buf, int size, int tag, int dest, MPI_Comm comm) {
    // create a message
    Message message;
    message.tag = tag;
    message.size = size;
    message.data = $malloc(&mp_heap, size);
    memcpy(message.data, buf, size);
    // enqueue on comm->buf_front[i][j]
    …
    // update message.next, message.prev, buf_back[i][j]
  }

  // can I use a function as a guard? not now
  // can always use busy-wait loop?
  boolean recv_guard(int source, int dest, int tag, Comm comm) {
  }

  void recv(int dest, void *buf, int size, int tag, int source, Comm comm, Status *status) {
    Message *message;
    int message_source;
    int message_dest;
    Message *queue = comm->buf_front[source][dest];

    // first, the guard:
    when ($exists {int i | 0<= i &&
    // search the queue looking for the message
    // also set message_source, message_dest;
    // and remove message from the queue
    if (message.size > size) error…;
    memcpy(buf, message.data, message.size);
    status->size = message.size;
    status->source = message_source;
    status->dest = message_dest;
    $free(message->data);
  }
}}}