Changes between Version 11 and Version 12 of MessagePassing


Ignore:
Timestamp:
07/18/13 07:40:13 (13 years ago)
Author:
siegel
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • MessagePassing

    v11 v12  
    11= Message Passing =
    22
    3 == First attempt ==
     3== Primitives ==
    44
    5 Low-level implementation using malloc, linked lists and so on:
    6 
    7 {{{
    8 
    9 #define MPI_ANY_SOURCE -1
    10 #define MPI_ANY_TAG -2
    11 #define MPI_INT 1
    12 #define MPI_FLOAT 2
    13 #define MPI_DOUBLE 3
    14 // etc.
    15 
    16 $input int NPROCS;
    17 $assume NPROCS >= 1;
    18 $scope top;
    19 $heap mp_heap;
    20 $proc procs[NPROCS];
    21 
    22 /* There will be one message queue for each pair of prods (i,j).
    23  * Each queue is a doubly-linked list of Message objects.
    24  */
    25 typedef struct MPI_Message {
    26   struct MPI_Message *<top> next;
    27   struct MPI_Message *<top> prev;
    28   int tag;
    29   int size;
    30   void *<top> data;
    31 } MPI_Message;
    32 
    33 typedef struct MPI_Comm_struct {
    34   MPI_Message *<top> buf_front[NPROCS][NPROCS];
    35   MPI_Message *<top> buf_back[NPROCS][NPROCS];
    36 } MPI_Comm_struct;
    37 
    38 typedef MPI_Comm_struct *MPI_Comm;
    39 
    40 /* As in MPI, when a receive returns, this structure
    41  * tells you the source and tag of received message,
    42  * which you need if you used wildcards,  Also size.
    43  */
    44 typedef struct MPI_Status {
    45   int source;
    46   int tag;
    47   int size;
    48 } MPI_Status;
    49 
    50 typedef int MPI_Datatype;
    51 
    52 /* This is the actual MPI Comm world structure */
    53 MPI_Comm_struct MPI_Comm_world_struct;
    54 
    55 /* The user will use MPI_COMM_WORLD */
    56 MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_struct;
    57 
    58 void init() {
    59   for (int i=0; i<NPROCS; i++)
    60     procs[i] = $spawn proc(i);
    61   for (int i=0; i<NPROCS; i++)
    62     for (int j=0; j<NPROCS; j++) {
    63       MPI_COMM_WORLD->buf_front[i][j] = NULL;
    64       MPI_COMM_WORLD->buf_back[i][j] = NULL;
    65     }
    66 }
    67 
    68 int sizeofDatatype(MPI_Datatype type) {
    69   switch (type) {
    70     case MPI_INT: return sizeof(int);
    71     case MPI_FLOAT: return sizeof(float);
    72     case MPI_DOUBLE: return sizeof(double);
    73     default: exit(-1); // not yet implemented
    74   }
    75 }
    76 
    77 void MPI_process(int pid) {
    78 
    79   void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
    80     // create a message
    81     Message message;
    82 
    83     message.tag = tag;
    84     message.size = count*sizeofDatatype(type);
    85     message.data = $malloc(&mp_heap, size);
    86     memcpy(message.data, buf, size);
    87     // enqueue on comm->buf_front[i][j] …
    88     // update  message.next, message.prev, buf_back[i][j]
    89   }
    90 
    91   boolean match(MPI_Message *p, int source, int tag, Comm comm) {
    92     return (source == MPI_ANY_SOURCE || source == p->source)
    93          && (tag == MPI_ANY_TAG || tag == p->tag)
    94          && (comm == p->comm);
    95   }
    96  
    97   boolean probe(int source, int tag, Comm comm) {
    98     for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next)
    99       if (match(p, source, dest, tag)) return true;
    100     return false;
    101   }
    102 
    103   boolean recv_guard(int source, int tag, Comm comm) {
    104     if (source == MPI_ANY_SOURCE) {
    105       for (int i=0; i<NPROCS; i++) {
    106         if (probe(i, tag, comm)) return true;
    107       }
    108       return false;
    109     } else {
    110       return probe(source, tag, comm);
    111     }
    112   }
    113 
    114   void recv(int dest, void *buf, int size, int source, int tag, Comm comm, Status *status) {
    115     int message_source;
    116     int message_dest;
    117     MPI_Message *message;
    118 
    119     when (recv_guard(source, tag, comm)) ;
    120     if (source == MPI_ANY_SOURCE) {   
    121       for (int i=0; i<NPROCS; i++) {
    122         message = comm->buf_front[source][dest];
    123         ...
    124        // search the queue looking for the message
    125        // also set message_source, message_dest;
    126        // and remove message from the queue
    127       }
    128     } else {
    129       …
    130     }
    131  
    132     if (message->size > size) error…;
    133     memcpy(buf, message->data, message.size);
    134     status->size = message.size;
    135     status->source = message_source;
    136     status->dest = message_dest;
    137     $free(&mp_heap, message->data);
    138   }
    139 
    140   // main body of process…
    141 
    142   $scope proc_scope;
    143   $heap proc_heap;
    144    
    145   …
    146 }
    147 
    148 void main() {
    149   for (int i=0; i<NPROCS; i++)
    150     procs[i] = $spawn(i);
    151   for (int i=0; i<NPROCS; i++)
    152     $wait procs[i];
    153 
    154 }
    155 
    156 }}}
    157 
    158 == Second attempt ==
    159 
    160 It would be much better if CIVL-C provided some abstract datatypes for message queues and so on.  What should these datatypes be?  Proposal:
     5CIVL-C will provide some abstract datatypes for message queues and so on:
    1616
    1627* `$message`
     
    19944 * `int $COMM_ANY_SOURCE`
    20045 * `int $COMM_ANY_TAG`
     46
     47== Examples ==
    20148
    20249Here is how these primitives could be used to implement an MPI program: