= Message Passing = == First attempt == Low-level implementation using malloc, linked lists and so on: {{{ #define MPI_ANY_SOURCE -1 #define MPI_ANY_TAG -2 #define MPI_INT 1 #define MPI_FLOAT 2 #define MPI_DOUBLE 3 // etc. $input int NPROCS; $assume NPROCS >= 1; $scope top; $heap mp_heap; $proc procs[NPROCS]; /* There will be one message queue for each pair of prods (i,j). * Each queue is a doubly-linked list of Message objects. */ typedef struct MPI_Message { struct MPI_Message * next; struct MPI_Message * prev; int tag; int size; void * data; } MPI_Message; typedef struct MPI_Comm_struct { MPI_Message * buf_front[NPROCS][NPROCS]; MPI_Message * buf_back[NPROCS][NPROCS]; } MPI_Comm_struct; typedef MPI_Comm_struct *MPI_Comm; /* As in MPI, when a receive returns, this structure * tells you the source and tag of received message, * which you need if you used wildcards, Also size. */ typedef struct MPI_Status { int source; int tag; int size; } MPI_Status; typedef int MPI_Datatype; /* This is the actual MPI Comm world structure */ MPI_Comm_struct MPI_Comm_world_struct; /* The user will use MPI_COMM_WORLD */ MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_struct; void init() { for (int i=0; ibuf_front[i][j] = NULL; MPI_COMM_WORLD->buf_back[i][j] = NULL; } } int sizeofDatatype(MPI_Datatype type) { switch (type) { case MPI_INT: return sizeof(int); case MPI_FLOAT: return sizeof(float); case MPI_DOUBLE: return sizeof(double); default: exit(-1); // not yet implemented } } void MPI_process(int pid) { void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) { // create a message Message message; message.tag = tag; message.size = count*sizeofDatatype(type); message.data = $malloc(&mp_heap, size); memcpy(message.data, buf, size); // enqueue on comm->buf_front[i][j] … // update message.next, message.prev, buf_back[i][j] } boolean match(MPI_Message *p, int source, int tag, Comm comm) { return (source == MPI_ANY_SOURCE || source == p->source) && (tag == MPI_ANY_TAG || tag == p->tag) && (comm == p->comm); } boolean probe(int source, int tag, Comm comm) { for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next) if (match(p, source, dest, tag)) return true; return false; } boolean recv_guard(int source, int tag, Comm comm) { if (source == MPI_ANY_SOURCE) { for (int i=0; ibuf_front[source][dest]; ... // search the queue looking for the message // also set message_source, message_dest; // and remove message from the queue } } else { … } if (message->size > size) error…; memcpy(buf, message->data, message.size); status->size = message.size; status->source = message_source; status->dest = message_dest; $free(&mp_heap, message->data); } // main body of process… $scope proc_scope; $heap proc_heap; … } void main() { for (int i=0; i= 1; $scope top; $proc procs[NPROCS]; typedef $comm *MPI_Comm; $comm MPI_Comm_world_comm; MPI_Comm *MPI_COMM_WORLD = &_MPI_COMM_WORLD; boolean initialized = $false; /* This structure tells you the source and tag of a received message, * which you need if you used wildcards, Also size. */ typedef struct MPI_Status { int source; int tag; int size; } MPI_Status; typedef int MPI_Datatype; int sizeofDatatype(MPI_Datatype type) { switch (type) { case MPI_INT: return sizeof(int); case MPI_FLOAT: return sizeof(float); case MPI_DOUBLE: return sizeof(double); default: exit(-1); // not yet implemented } } void MPI_Comm_init(MPI_Comm comm) { $when (initialized) ; } void MPI_Send(int pid, void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) { $message m = $message_pack(pid, dest, tag, buf, sizeofDatatype(type)*count); $comm_enqueue(comm, &m); } void MPI_Recv(int pid, void *buf, int count, MPI_Datataype type, int source, int tag, MPI_Comm comm, MPI_Status *status) { int size = sizeofDatatype(type)*count; // the following is a system function with built-in guard: $message m = $comm_dequeue(comm, source, pid, tag); status->size = $message_size(&m); status->source = $message_source(&m); status->dest = $message_dest(&m); $message_unpack(&m, buf, size); // will throw exception if message too big } void MPI_process(int pid) { ... MPI_Comm_init(MPI_COMM_WORLD); ... } void main() { for (int i=0; i