| Version 21 (modified by , 13 years ago) ( diff ) |
|---|
Message Passing
Primitives
CIVL-C will provide some abstract datatypes for message queues and so on:
$message- an abstract, immutable datatype representing a message
$message $message_pack(int source, int dest, int tag, void *data, int size)- creates a new message, copying data from the specified buffer
int $message_source($message message)- returns the message source
int $message_tag($message message)- returns the message tag
int $message_dest($message message)- returns the message destination
int $message_size($message message)- returns the message size
void $message_unpack($message message, void *buf, int size)- transfers message data to
buf, throwing exception if message size exceeds specified size
- transfers message data to
$comm- an abstract, immutable datatype representing a communicator value, which encapsulates a set of message channels between every pair of processes in a list of processes
$comm $comm_create(int nprocs, $proc procs[])- creates a new comm from the given sequence of processes, by allocating memory and copying the process sequence; the new comm has no messages
int $comm_nprocs($comm comm)- returns the number of processes associated to the comm
$proc $comm_getProc($comm comm, int rank)- returns the rank-th proc in the comm
$comm $comm_enqueue($comm comm, $message message)- adds the message to the comm (or, more precisely, returns a new comm value identical to the old except with the message added in the appropriate queue)
boolean $comm_probe($comm comm, int source, int dest, int tag)- returns true iff a matching message exists in comm
int $comm_seek($comm comm, int source, int dest, int tag)- finds the first matching message and returns its index without modifying comm
$message $comm_get($comm comm, int source, int dest, int index)- returns the message at the index without modifying the comm
$comm $comm_dequeue($comm comm, int source, int dest, int index)- removes the message at the index from the comm and returns the modified comm
int $comm_chan_size($comm comm, int source, int dest)- returns the number of messages from source to dest stored in comm
int $comm_total_size($comm comm)- returns the total number of messages in the comm
- constants defined:
int $COMM_ANY_SOURCEint $COMM_ANY_TAG
- define a normal struct:
struct $comm_recv_pair { $comm comm; $message msg; } struct $comm_recv_pair $comm_recv($comm comm, int source, int dest, int tag)
blocks until a matching message is available and then returns the message and the new communicator value obtained by removing that message in one struct
Examples
Here is how these primitives could be used to implement an MPI program:
#define MPI_ANY_SOURCE $COMM_ANY_SOURCE
#define MPI_ANY_TAG $COMM_ANY_TAG
#define MPI_INT 1
#define MPI_FLOAT 2
#define MPI_DOUBLE 3
// etc.
$input int NPROCS;
$assume NPROCS >= 1;
$scope top;
$proc procs[NPROCS];
typedef $comm *MPI_Comm;
$comm MPI_Comm_world_comm;
MPI_Comm *MPI_COMM_WORLD = &MPI_Comm_world_comm;
boolean initialized = $false;
/* This structure tells you the source and tag of a received message,
* which you need if you used wildcards, Also size.
*/
typedef struct MPI_Status {
int source;
int tag;
int size;
} MPI_Status;
typedef int MPI_Datatype;
int sizeofDatatype(MPI_Datatype type) {
switch (type) {
case MPI_INT: return sizeof(int);
case MPI_FLOAT: return sizeof(float);
case MPI_DOUBLE: return sizeof(double);
default: exit(-1); // not yet implemented
}
}
void MPI_Comm_init(MPI_Comm comm) {
$when (initialized) ;
}
void MPI_Send(int pid, void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
$message m = $message_pack(pid, dest, tag, buf, sizeofDatatype(type)*count);
*comm = $comm_enqueue(*comm, m);
}
void MPI_Recv(int pid, void *buf, int count, MPI_Datataype type, int source, int tag, MPI_Comm comm, MPI_Status *status) {
int size = sizeofDatatype(type)*count;
when ($comm_probe(*comm, source, pid, tag)) {
int index = $comm_seek(*comm, source, pid, tag);
$message m = $comm_get(*comm, source, pid, index);
*comm = $comm_dequeue(*comm, source, pid, index);
status->size = $message_size(m);
status->source = $message_source(m);
status->dest = $message_dest(m);
$message_unpack(m, buf, size); // will throw exception if message too big
}
}
void MPI_process(int pid) {
...
MPI_Comm_init(MPI_COMM_WORLD);
...
}
void main() {
for (int i=0; i<NPROCS; i++) procs[i] = $spawn MPI_process(i);
MPI_Comm_world_comm = $comm_create(NPROCS, &procs[0]);
initialized = $true;
for (int i=0; i<NPROCS; i++) $wait procs[i];
}
Note:
See TracWiki
for help on using the wiki.
