= Message Passing = First attempt: {{{ #define MPI_ANY_SOURCE -1 #define MPI_ANY_TAG -2 #define MPI_INT 1 #define MPI_FLOAT 2 #define MPI_DOUBLE 3 // etc. $input int NPROCS; $assume NPROCS >= 1; $scope top; $heap mp_heap; $proc procs[NPROCS]; /* There will be one message queue for each pair of prods (i,j). * Each queue is a doubly-linked list of Message objects. */ typedef struct MPI_Message { struct MPI_Message * next; struct MPI_Message * prev; int tag; int size; void * data; } MPI_Message; typedef struct MPI_Comm_struct { MPI_Message * buf_front[NPROCS][NPROCS]; MPI_Message * buf_back[NPROCS][NPROCS]; } MPI_Comm_struct; typedef MPI_Comm_struct *MPI_Comm; /* As in MPI, when a receive returns, this structure * tells you the source and tag of received message, * which you need if you used wildcards, Also size. */ typedef struct MPI_Status { int source; int tag; int size; } MPI_Status; typedef int MPI_Datatype; /* This is the actual MPI Comm world structure */ MPI_Comm_struct MPI_Comm_world_struct; /* The user will use MPI_COMM_WORLD */ MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_struct; void init() { for (int i=0; ibuf_front[i][j] = NULL; MPI_COMM_WORLD->buf_back[i][j] = NULL; } } int sizeofDatatype(MPI_Datatype type) { switch (type) { case MPI_INT: return sizeof(int); case MPI_FLOAT: return sizeof(float); case MPI_DOUBLE: return sizeof(double); default: exit(-1); // not yet implemented } } void MPI_process(int pid) { void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) { // create a message Message message; message.tag = tag; message.size = count*sizeofDatatype(type); message.data = $malloc(&mp_heap, size); memcpy(message.data, buf, size); // enqueue on comm->buf_front[i][j] … // update message.next, message.prev, buf_back[i][j] } boolean match(MPI_Message *p, int source, int tag, Comm comm) { return (source == MPI_ANY_SOURCE || source == p->source) && (tag == MPI_ANY_TAG || tag == p->tag) && (comm == p->comm); } boolean probe(int source, int tag, Comm comm) { for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next) if (match(p, source, dest, tag)) return true; return false; } boolean recv_guard(int source, int tag, Comm comm) { if (source == MPI_ANY_SOURCE) { for (int i=0; ibuf_front[source][dest]; ... // search the queue looking for the message // also set message_source, message_dest; // and remove message from the queue } } else { … } if (message->size > size) error…; memcpy(buf, message->data, message.size); status->size = message.size; status->source = message_source; status->dest = message_dest; $free(&mp_heap, message->data); } // main body of process… $scope proc_scope; $heap proc_heap; … } void main() { for (int i=0; i