Version 21 (modified by siegel, 13 years ago)

Message Passing

Primitives

CIVL-C will provide abstract datatypes and functions for messages and for communicators (sets of message queues):

  • $message
    • an abstract, immutable datatype representing a message
  • $message $message_pack(int source, int dest, int tag, void *data, int size)
    • creates a new message, copying data from the specified buffer
  • int $message_source($message message)
    • returns the message source
  • int $message_tag($message message)
    • returns the message tag
  • int $message_dest($message message)
    • returns the message destination
  • int $message_size($message message)
    • returns the message size
  • void $message_unpack($message message, void *buf, int size)
    • transfers the message data to buf, throwing an exception if the message size exceeds the specified size
  • $comm
    • an abstract, immutable datatype representing a communicator value, which encapsulates a set of message channels between every pair of processes in a list of processes
  • $comm $comm_create(int nprocs, $proc procs[])
    • creates a new comm from the given sequence of processes, by allocating memory and copying the process sequence; the new comm has no messages
  • int $comm_nprocs($comm comm)
    • returns the number of processes associated with the comm
  • $proc $comm_getProc($comm comm, int rank)
    • returns the rank-th proc in the comm
  • $comm $comm_enqueue($comm comm, $message message)
    • adds the message to the comm (or, more precisely, returns a new comm value identical to the old except with the message added in the appropriate queue)
  • boolean $comm_probe($comm comm, int source, int dest, int tag)
    • returns true iff a matching message exists in comm
  • int $comm_seek($comm comm, int source, int dest, int tag)
    • finds the first matching message and returns its index without modifying comm
  • $message $comm_get($comm comm, int source, int dest, int index)
    • returns the message at the index without modifying the comm
  • $comm $comm_dequeue($comm comm, int source, int dest, int index)
    • removes the message at the index from the comm and returns the modified comm
  • int $comm_chan_size($comm comm, int source, int dest)
    • returns the number of messages from source to dest stored in comm
  • int $comm_total_size($comm comm)
    • returns the total number of messages in the comm
  • constants defined:
    • int $COMM_ANY_SOURCE
    • int $COMM_ANY_TAG
  • define a normal struct:
    struct $comm_recv_pair {
      $comm comm;
      $message msg;
    };

  • struct $comm_recv_pair $comm_recv($comm comm, int source, int dest, int tag)
    • blocks until a matching message is available, then returns, in one struct, the message and the new communicator value obtained by removing that message
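
The value semantics of the comm operations (each call returns a new value and leaves the old one untouched) and the role of the index returned by $comm_seek can be modeled in plain C. The following is only a sketch under stated assumptions, not CIVL-C and not the planned implementation: it models a single source-to-dest channel that stores tags only, and the names chan, chan_enqueue, chan_seek, chan_probe, chan_dequeue, ANY_TAG and CHAN_CAPACITY are hypothetical. The wildcard value -1 and the return value -1 for "no match" are also assumptions, since this page does not specify them.

```c
#include <assert.h>

/* Hypothetical stand-ins for this sketch; the page does not fix these values. */
#define ANY_TAG (-1)
#define CHAN_CAPACITY 16

/* One source-to-dest channel: a FIFO of message tags (payloads omitted). */
typedef struct {
  int tags[CHAN_CAPACITY];
  int size;
} chan;

/* Models $comm_enqueue: the channel is passed by value, so the caller's
 * copy is unchanged; the returned value has the tag appended. */
chan chan_enqueue(chan c, int tag) {
  assert(c.size < CHAN_CAPACITY);
  c.tags[c.size++] = tag;
  return c;
}

/* Models $comm_seek: index of the oldest message matching tag.
 * Returning -1 when nothing matches is an assumption of this sketch. */
int chan_seek(chan c, int tag) {
  for (int i = 0; i < c.size; i++) {
    if (tag == ANY_TAG || c.tags[i] == tag) return i;
  }
  return -1;
}

/* Models $comm_probe: nonzero iff a matching message exists. */
int chan_probe(chan c, int tag) {
  return chan_seek(c, tag) >= 0;
}

/* Models $comm_dequeue: returns a new value with the message at index
 * removed; the relative order of the remaining messages is preserved. */
chan chan_dequeue(chan c, int index) {
  assert(index >= 0 && index < c.size);
  for (int i = index; i + 1 < c.size; i++) c.tags[i] = c.tags[i + 1];
  c.size--;
  return c;
}
```

In this model, a receiver that wants tag 9 from a channel holding tags 7, 9, 7 gets index 1 from chan_seek, and dequeuing at that index leaves 7, 7 in their original order. This is why the primitives take an index rather than always removing the head of the queue.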

Examples

Here is how these primitives could be used to implement an MPI program:

#define MPI_ANY_SOURCE $COMM_ANY_SOURCE
#define MPI_ANY_TAG $COMM_ANY_TAG
#define MPI_INT 1
#define MPI_FLOAT 2
#define MPI_DOUBLE 3
// etc.

$input int NPROCS;
$assume NPROCS >= 1;
$scope top;
$proc procs[NPROCS];

typedef $comm *MPI_Comm;
$comm MPI_Comm_world_comm;
MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_comm;
boolean initialized = $false;

/* This structure records the source and tag of a received message,
 * which you need if you used wildcards. It also records the size.
 */
typedef struct MPI_Status {
  int source;
  int tag;
  int size;
} MPI_Status;

typedef int MPI_Datatype;

int sizeofDatatype(MPI_Datatype type) {
  switch (type) {
    case MPI_INT: return sizeof(int);
    case MPI_FLOAT: return sizeof(float);
    case MPI_DOUBLE: return sizeof(double);
    default: exit(-1); // not yet implemented
  }
}

void MPI_Comm_init(MPI_Comm comm) {
  // block until main has created the communicator and set initialized
  $when (initialized) ;
}

void MPI_Send(int pid, void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
  $message m = $message_pack(pid, dest, tag, buf, sizeofDatatype(type)*count);

  *comm = $comm_enqueue(*comm, m);
}

void MPI_Recv(int pid, void *buf, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status *status) {
  int size = sizeofDatatype(type)*count;

  $when ($comm_probe(*comm, source, pid, tag)) {
    int index = $comm_seek(*comm, source, pid, tag);
    $message m = $comm_get(*comm, source, pid, index);

    *comm = $comm_dequeue(*comm, source, pid, index);
    status->size = $message_size(m);
    status->source = $message_source(m);
    status->tag = $message_tag(m);
    $message_unpack(m, buf, size); // will throw exception if message too big
  }
}

void MPI_process(int pid) {
  ...
  MPI_Comm_init(MPI_COMM_WORLD);
  ...
}

void main() {
  for (int i=0; i<NPROCS; i++) procs[i] = $spawn MPI_process(i);
  MPI_Comm_world_comm = $comm_create(NPROCS, &procs[0]);
  initialized = $true;
  for (int i=0; i<NPROCS; i++) $wait procs[i];
}
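
MPI_Send and MPI_Recv above rely on two properties of the message primitives: $message_pack copies the data out of the sender's buffer, and $message_unpack refuses a message larger than the receive buffer. A plain-C model of those two properties might look as follows. This is a sketch under assumptions, not CIVL-C: the names message, message_pack, message_unpack and MSG_CAPACITY are hypothetical, the payload limit exists only to keep the model simple, and a return code of -1 stands in for the exception that the CIVL-C primitive would throw.

```c
#include <assert.h>
#include <string.h>

#define MSG_CAPACITY 64 /* hypothetical payload limit for this sketch */

/* Models $message: envelope fields plus a private copy of the payload. */
typedef struct {
  int source, dest, tag, size;
  unsigned char data[MSG_CAPACITY];
} message;

/* Models $message_pack: copies size bytes out of the caller's buffer,
 * so later writes to that buffer do not affect the message. */
message message_pack(int source, int dest, int tag, const void *data, int size) {
  assert(size >= 0 && size <= MSG_CAPACITY);
  message m = { source, dest, tag, size, {0} };
  memcpy(m.data, data, (size_t)size);
  return m;
}

/* Models $message_unpack: copies the payload into buf.
 * Returns 0 on success, or -1 (leaving buf untouched) where the
 * CIVL-C primitive would throw because the message exceeds size. */
int message_unpack(message m, void *buf, int size) {
  if (m.size > size) return -1;
  memcpy(buf, m.data, (size_t)m.size);
  return 0;
}
```

For example, packing a two-element int array and then unpacking it into a buffer of sizeof(int) bytes fails in this model, while unpacking into a two-element buffer succeeds and yields the values that were in the array at pack time.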