Changes between Version 4 and Version 5 of MessagePassing


Ignore:
Timestamp:
07/11/13 21:14:57 (13 years ago)
Author:
siegel
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • MessagePassing

    v4 v5  
    7575void MPI_process(int pid) {
    7676
    77   void MPI_Send(void *buf, int size, int tag, int dest, MPI_Comm comm) {
    78   // create a message
    79   Message message;
     77  void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
     78    // create a message
     79    Message message;
    8080
    81   message.tag = tag;
    82   message.size = size;
    83   message.data = $malloc<top>(&mp_heap, size);
    84   memcpy(message.data, buf, size);
    85   // enqueue on comm->buf_front[i][j] …
    86   // update  message.next, message.prev, buf_back[i][j]
    87 }
     81    message.tag = tag;
     82    message.size = count*sizeofDatatype(type);
     83    message.data = $malloc(&mp_heap, size);
     84    memcpy(message.data, buf, size);
     85    // enqueue on comm->buf_front[i][j] …
     86    // update  message.next, message.prev, buf_back[i][j]
     87  }
    8888
    89 // can I use a function as a guard?  not now
    90 // can always use busy-wait loop?
    91 boolean recv_guard(int source, int dest, int tag, Comm comm) {
     89  boolean match(MPI_Message *p, int source, int tag, Comm comm) {
     90    return (source == MPI_ANY_SOURCE || source == p->source)
     91         && (tag == MPI_ANY_TAG || tag == p->tag)
     92         && (comm == p->comm);
     93  }
     94 
     95  boolean probe(int source, int tag, Comm comm) {
     96    for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next)
     97      if (match(p, source, dest, tag)) return true;
     98    return false;
     99  }
     100
     101  boolean recv_guard(int source, int tag, Comm comm) {
     102    if (source == MPI_ANY_SOURCE) {
     103      for (int i=0; i<NPROCS; i++) {
     104        if (probe(i, tag, comm)) return true;
     105      }
     106      return false;
     107    } else {
     108      return probe(source, tag, comm);
     109    }
     110  }
     111
     112  void recv(int dest, void *buf, int size, int source, int tag, Comm comm, Status *status) {
     113    int message_source;
     114    int message_dest;
     115    MPI_Message *message;
     116
     117    when (recv_guard(source, tag, comm)) ;
     118    if (source == MPI_ANY_SOURCE) {   
     119      for (int i=0; i<NPROCS; i++) {
     120        message = comm->buf_front[source][dest];
     121        ...
     122       // search the queue looking for the message
     123       // also set message_source, message_dest;
     124       // and remove message from the queue
     125      }
     126    } else {
     127      …
     128    }
     129 
     130    if (message->size > size) error…;
     131    memcpy(buf, message->data, message.size);
     132    status->size = message.size;
     133    status->source = message_source;
     134    status->dest = message_dest;
     135    $free(&mp_heap, message->data);
     136  }
     137
     138  // main body of process...
    92139
    93140}
    94141
    95 void recv(int dest, void *buf, int size, int tag, int source, Comm comm, Status *status) {
    96   Message *message; 
    97   int message_source;
    98   int message_dest;
    99   Message *queue = comm->buf_front[source][dest];
    100 
    101   // first,  the guard:
    102   when ($exists {int i | 0<= i &&
    103   // search the queue looking for the message
    104   // also set message_source, message_dest;
    105   // and remove message from the queue
    106  
    107   if (message.size > size) error…;
    108   memcpy(buf, message.data, message.size);
    109   status->size = message.size;
    110   status->source = message_source;
    111   status->dest = message_dest;
    112   $free(message->data);
    113 }
    114 
    115142}}}