wiki:MessagePassing

Version 9 (modified by siegel, 13 years ago) ( diff )

--

Message Passing

First attempt

Low-level implementation using malloc, linked lists and so on:

/* Wildcard values accepted by the receive-side functions in place of a
 * concrete source rank or tag.  Negative expansions are parenthesized so
 * the macros behave as a single operand wherever they are expanded.
 */
#define MPI_ANY_SOURCE (-1)
#define MPI_ANY_TAG (-2)
/* Datatype codes; sizeofDatatype maps each one to a size in bytes. */
#define MPI_INT 1
#define MPI_FLOAT 2
#define MPI_DOUBLE 3
// etc.

/* Number of processes in the model, supplied as an input. */
$input int NPROCS;
/* Only inputs with at least one process are considered. */
$assume NPROCS >= 1;
/* Names the file-level scope; it appears below in pointer types of the
 * form `T *<top>`. */
$scope top;
/* Heap passed to $malloc and $free for message storage. */
$heap mp_heap;
/* One process reference per process; each entry receives the result of
 * a $spawn. */
$proc procs[NPROCS];

/* There will be one message queue for each pair of procs (i,j).
 * Each queue is a doubly-linked list of MPI_Message objects.
 */
typedef struct MPI_Message {
  /* Neighbouring messages in the same queue (NULL-terminated list). */
  struct MPI_Message *<top> next;
  struct MPI_Message *<top> prev;
  /* Tag given by the sender; compared against the requested tag when
   * matching a receive. */
  int tag;
  /* Payload size; the sender sets it to count*sizeofDatatype(type). */
  int size;
  /* Copy of the sender's buffer, allocated with $malloc from mp_heap. */
  void *<top> data;
} MPI_Message;

/* A communicator: one message queue per ordered pair of processes.
 * The first index is the sending process and the second the receiving
 * process (probe reads buf_front[source][pid]).
 */
typedef struct MPI_Comm_struct {
  /* Head of each queue; NULL when the queue is empty. */
  MPI_Message *<top> buf_front[NPROCS][NPROCS];
  /* Presumably the tail of each queue, where new messages are appended
   * -- TODO confirm once the enqueue code is written. */
  MPI_Message *<top> buf_back[NPROCS][NPROCS];
} MPI_Comm_struct;

/* Communicator handle: a pointer to the structure above. */
typedef MPI_Comm_struct *MPI_Comm;

/* As in MPI, when a receive returns, this structure
 * tells you the source and tag of the received message,
 * which you need if you used wildcards.  It also reports
 * the size of the received message.
 */
typedef struct MPI_Status {
  int source;  /* sender of the received message */
  int tag;     /* tag of the received message */
  int size;    /* size of the received message (same unit as MPI_Message.size) */
} MPI_Status;

/* Datatype handle: one of the integer codes MPI_INT, MPI_FLOAT,
 * MPI_DOUBLE, ... understood by sizeofDatatype. */
typedef int MPI_Datatype;

/* This is the actual MPI Comm world structure; it holds the message
 * queues for all NPROCS x NPROCS process pairs. */
MPI_Comm_struct MPI_Comm_world_struct;

/* The user will use MPI_COMM_WORLD, a handle (pointer) to the
 * structure above. */
MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_struct;

void init() {
  for (int i=0; i<NPROCS; i++)
    procs[i] = $spawn proc(i);
  for (int i=0; i<NPROCS; i++)
    for (int j=0; j<NPROCS; j++) {
      MPI_COMM_WORLD->buf_front[i][j] = NULL;
      MPI_COMM_WORLD->buf_back[i][j] = NULL;
    }
}

/* Returns the size in bytes of one element of the given datatype.
 *
 * Recognized codes are MPI_INT, MPI_FLOAT and MPI_DOUBLE.  Any other
 * code terminates the program with exit(-1).
 */
int sizeofDatatype(MPI_Datatype type) {
  if (type == MPI_INT)
    return sizeof(int);
  if (type == MPI_FLOAT)
    return sizeof(float);
  if (type == MPI_DOUBLE)
    return sizeof(double);
  exit(-1); // not yet implemented
}

void MPI_process(int pid) {

  void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
    // create a message
    Message message;

    message.tag = tag;
    message.size = count*sizeofDatatype(type);
    message.data = $malloc(&mp_heap, size);
    memcpy(message.data, buf, size);
    // enqueue on comm->buf_front[i][j] …
    // update  message.next, message.prev, buf_back[i][j]
  }

  boolean match(MPI_Message *p, int source, int tag, Comm comm) {
    return (source == MPI_ANY_SOURCE || source == p->source)
         && (tag == MPI_ANY_TAG || tag == p->tag)
         && (comm == p->comm);
  }
  
  boolean probe(int source, int tag, Comm comm) {
    for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next)
      if (match(p, source, dest, tag)) return true;
    return false;
  }

  boolean recv_guard(int source, int tag, Comm comm) {
    if (source == MPI_ANY_SOURCE) {
      for (int i=0; i<NPROCS; i++) {
        if (probe(i, tag, comm)) return true;
      }
      return false;
    } else {
      return probe(source, tag, comm);
    }
  }

  void recv(int dest, void *buf, int size, int source, int tag, Comm comm, Status *status) {
    int message_source;
    int message_dest;
    MPI_Message *message;

    when (recv_guard(source, tag, comm)) ;
    if (source == MPI_ANY_SOURCE) {   
      for (int i=0; i<NPROCS; i++) {
        message = comm->buf_front[source][dest];
        ...
       // search the queue looking for the message
       // also set message_source, message_dest;
       // and remove message from the queue
      }
    } else {
      …
    }
  
    if (message->size > size) error…;
    memcpy(buf, message->data, message.size);
    status->size = message.size;
    status->source = message_source;
    status->dest = message_dest;
    $free(&mp_heap, message->data);
  }

  // main body of process…

  $scope proc_scope;
  $heap proc_heap;
    
  …
}

/* Entry point of the model.
 *
 * init() initializes the message queues of MPI_COMM_WORLD and spawns
 * the NPROCS processes, storing their references in procs.  main then
 * waits for every one of them to terminate.
 */
void main() {
  init();
  for (int i=0; i<NPROCS; i++)
    $wait procs[i];
}

Second attempt

It would be much better if CIVL-C provided some abstract datatypes for message queues and so on. What should these datatypes be? Proposal:

  • $message
    • an abstract datatype
  • $message $message_create(int source, int dest, int tag, void *data, int size)
    • creates a new message, allocating memory and copying data from the specified buffer
  • int $message_source($message * message)
    • returns the message source
  • int $message_tag($message * message)
    • returns the message tag
  • int $message_dest($message * message)
    • returns the message destination
  • int $message_size($message * message)
    • returns the message size
  • void $message_destroy($message * message)
    • destroys the message, deallocating whatever was allocated in its creation
  • $comm
    • an abstract datatype representing a communicator, or set of message channels between every pair of processes in a set of processes
  • $comm $comm_create(int nprocs, $proc * procs)
    • creates a new comm from the given sequence of processes, by allocating memory and copying the process sequence; the new comm has no messages
  • void $comm_destroy($comm * comm)
    • destroys the comm, deallocating whatever was allocated in its creation
  • int $comm_nprocs($comm * comm)
    • returns the number of processes associated to the comm
  • $proc * $comm_procs($comm * comm)
    • returns a pointer to the procs array in comm
  • void $comm_enqueue($comm * comm, $message * message)
    • adds the message to the comm
  • boolean $comm_probe($comm * comm, int source, int dest, int tag)
    • returns true iff a matching message exists in comm
  • $message * $comm_seek($comm * comm, int source, int dest, int tag)
    • finds the first matching message and returns pointer to it without modifying comm
  • $message * $comm_dequeue($comm * comm, int source, int dest, int tag)
    • finds the first matching message, removes it from comm, and returns pointer to message
  • int $comm_chan_size($comm * comm, int source, int dest)
    • returns the number of messages from source to dest stored in comm
  • int $comm_total_size($comm * comm)
    • returns the total number of messages in the comm
  • constants defined:
    • int $COMM_ANY_SOURCE
    • int $COMM_ANY_TAG
Note: See TracWiki for help on using the wiki.