Message Passing
Primitives
Note: what $pure means:
- the function is a system function
- it is side-effect-free
- its guard is "true" (i.e., it can never block)
Calls to pure functions are exactly like expressions. They will not be removed by side-effect-removal. They can be used as guards in $when statements, or any other place an expression is called for. Hence they basically give you a way to add new expressions to the language (without fiddling with the grammar).
CIVL-C will provide some abstract datatypes for message queues and so on:
$message - an abstract, immutable datatype representing a message
<s> $pure $message $message_pack(int source, int dest, int tag, void *<s> data, int size) $reads s - creates a new message, copying data from the specified buffer
$pure int $message_source($message message) - returns the message source
$pure int $message_tag($message message) - returns the message tag
$pure int $message_dest($message message) - returns the message destination
$pure int $message_size($message message) - returns the message size
<s> void $message_unpack($message message, void *<s> buf, int size) $writes s - transfers message data to buf, throwing an exception if the message size exceeds the specified size
$comm - an abstract, immutable datatype representing a communicator value, which encapsulates a set of message channels between every pair of processes in a list of processes
$pure $comm $comm_create(int nprocs, $proc procs[]) - creates a new comm from the given sequence of processes, by allocating memory and copying the process sequence; the new comm has no messages
$pure int $comm_nprocs($comm comm) - returns the number of processes associated to the comm
$pure $proc $comm_getProc($comm comm, int rank) - returns the rank-th proc in the comm
$pure $comm $comm_enqueue($comm comm, $message message) - adds the message to the comm (or, more precisely, returns a new comm value identical to the old except with the message added in the appropriate queue)
$pure boolean $comm_probe($comm comm, int source, int dest, int tag) - returns true iff a matching message exists in comm
$pure int $comm_seek($comm comm, int source, int dest, int tag) - finds the first matching message and returns its index without modifying comm
$pure $message $comm_get($comm comm, int source, int dest, int index) - returns the message at the index without modifying the comm
$pure $comm $comm_dequeue($comm comm, int source, int dest, int index) - returns a new comm value identical to the old except with the message at the index removed
$pure int $comm_chan_size($comm comm, int source, int dest) - returns the number of messages from source to dest stored in comm
$pure int $comm_total_size($comm comm) - returns the total number of messages in the comm
- constants defined:
int $COMM_ANY_SOURCE
int $COMM_ANY_TAG
- define a normal struct:
struct $comm_recv_pair { $comm comm; $message msg; }
struct $comm_recv_pair $comm_recv($comm comm, int source, int dest, int tag) - blocks until a matching message is available, then returns in one struct the message and the new communicator value obtained by removing that message
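The value semantics of the $comm operations listed above can be illustrated with a small model in plain C (not CIVL-C). This is only a sketch under stated assumptions: the names comm_t, msg_t, ANY_TAG, MAX_PROCS and MAX_QUEUE are hypothetical, message payloads are omitted, only the tag wildcard is modeled, and returning -1 from comm_seek when nothing matches is a convention chosen here, not something the description specifies.

```c
#include <assert.h>
#include <stdbool.h>

#define ANY_TAG   (-1)  /* stand-in for $COMM_ANY_TAG; the value is an assumption */
#define MAX_PROCS 4     /* arbitrary bounds chosen for this sketch */
#define MAX_QUEUE 8

typedef struct { int source, dest, tag; } msg_t;  /* payload omitted */

/* One FIFO channel per (source, dest) pair. The struct is passed and
   returned by value, so no function below alters its caller's comm. */
typedef struct {
    int nprocs;
    int len[MAX_PROCS][MAX_PROCS];
    msg_t chan[MAX_PROCS][MAX_PROCS][MAX_QUEUE];
} comm_t;

comm_t comm_create(int nprocs) {
    comm_t c = {0};  /* all channels start empty */
    c.nprocs = nprocs;
    return c;
}

/* Returns a copy of c with m appended to the channel m.source -> m.dest. */
comm_t comm_enqueue(comm_t c, msg_t m) {
    int *n = &c.len[m.source][m.dest];
    assert(*n < MAX_QUEUE);
    c.chan[m.source][m.dest][(*n)++] = m;
    return c;
}

/* Index of the first message with a matching tag, or -1 if there is none
   (the -1 convention is an assumption of this sketch). */
int comm_seek(comm_t c, int source, int dest, int tag) {
    for (int i = 0; i < c.len[source][dest]; i++)
        if (tag == ANY_TAG || c.chan[source][dest][i].tag == tag)
            return i;
    return -1;
}

bool comm_probe(comm_t c, int source, int dest, int tag) {
    return comm_seek(c, source, dest, tag) >= 0;
}

msg_t comm_get(comm_t c, int source, int dest, int index) {
    assert(0 <= index && index < c.len[source][dest]);
    return c.chan[source][dest][index];
}

/* Returns a copy of c with the message at index removed. */
comm_t comm_dequeue(comm_t c, int source, int dest, int index) {
    int *n = &c.len[source][dest];
    assert(0 <= index && index < *n);
    for (int i = index; i + 1 < *n; i++)
        c.chan[source][dest][i] = c.chan[source][dest][i + 1];
    (*n)--;
    return c;
}

int comm_chan_size(comm_t c, int source, int dest) {
    return c.len[source][dest];
}
```

Because every operation returns a new value, a caller that wants to remove a message has to store the result, for example c = comm_dequeue(c, 0, 1, comm_seek(c, 0, 1, 7)); the old value is unchanged until it is overwritten.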
Examples
Here is how these primitives could be used to implement an MPI program:
// most of this stuff could go in a header file which is included here:
#define MPI_ANY_SOURCE $COMM_ANY_SOURCE
#define MPI_ANY_TAG $COMM_ANY_TAG
#define MPI_INT 1
#define MPI_FLOAT 2
#define MPI_DOUBLE 3
// etc.
$input int NPROCS;
$assume NPROCS >= 1;
$scope top;
$proc procs[NPROCS];
typedef $comm *MPI_Comm;
$comm MPI_Comm_world_comm;
MPI_Comm MPI_COMM_WORLD = &MPI_Comm_world_comm;
boolean initialized = $false;
/* This structure tells you the source and tag of a received message,
* which you need if you used wildcards. Also the size.
*/
typedef struct MPI_Status {
int source;
int tag;
int size;
} MPI_Status;
typedef int MPI_Datatype;
int sizeofDatatype(MPI_Datatype type) {
switch (type) {
case MPI_INT: return sizeof(int);
case MPI_FLOAT: return sizeof(float);
case MPI_DOUBLE: return sizeof(double);
default: exit(-1); // not yet implemented
}
}
void MPI_Comm_init(MPI_Comm comm) {
$when (initialized) ;
}
// very similar to MPI_Send but takes one extra argument at beginning saying who the sender is:
void CIVL_Send(int pid, void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
$message m = $message_pack(pid, dest, tag, buf, sizeofDatatype(type)*count);
*comm = $comm_enqueue(*comm, m);
}
// note we could define this function this way but it is more efficient to implement
// as system function:
struct $comm_recv_pair $comm_recv($comm comm, int source, int dest, int tag) {
struct $comm_recv_pair result;
// comm is passed by value here, so it is used directly rather than dereferenced
$when ($comm_probe(comm, source, dest, tag)) {
int index = $comm_seek(comm, source, dest, tag);
result.msg = $comm_get(comm, source, dest, index);
result.comm = $comm_dequeue(comm, source, dest, index);
}
return result;
}
// similar to MPI_Recv with extra arg at beginning saying who the receiver is:
void CIVL_Recv(int pid, void *buf, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status *status) {
int size = sizeofDatatype(type)*count;
struct $comm_recv_pair pair = $comm_recv(*comm, source, pid, tag);
$message m = pair.msg;
*comm = pair.comm; // store the communicator value with the message removed
status->size = $message_size(m);
status->source = $message_source(m);
status->tag = $message_tag(m);
$message_unpack(m, buf, size); // will throw exception if message too big
}
void MPI_process(int pid) {
// these MPI functions are declared here in the process scope so that they can use the pid
// could put these in a separate header file which is included here?
void MPI_Send(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) {
CIVL_Send(pid, buf, count, type, dest, tag, comm);
}
void MPI_Recv(void *buf, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status *status) {
CIVL_Recv(pid, buf, count, type, source, tag, comm, status);
}
...
MPI_Comm_init(MPI_COMM_WORLD);
...
}
void main() {
for (int i=0; i<NPROCS; i++) procs[i] = $spawn MPI_process(i);
MPI_Comm_world_comm = $comm_create(NPROCS, &procs[0]);
initialized = $true;
for (int i=0; i<NPROCS; i++) $wait procs[i];
}
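The payload handling that CIVL_Send and CIVL_Recv rely on (copy on pack, size check on unpack) can be mimicked in plain C. This is a rough sketch, not the CIVL implementation: msg_t, message_pack, message_unpack and message_free are hypothetical names, the "exception" on an oversized message is modeled as a -1 return code, and explicit freeing is needed only because plain C has no abstract immutable message type.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    int source, dest, tag;
    int size;    /* payload size in bytes */
    void *data;  /* private copy of the sender's buffer */
} msg_t;

/* Copies size bytes out of data, so later writes to the sender's
   buffer do not affect the message. */
msg_t message_pack(int source, int dest, int tag, const void *data, int size) {
    msg_t m = { source, dest, tag, size, NULL };
    if (size > 0) {
        m.data = malloc((size_t)size);
        assert(m.data != NULL);
        memcpy(m.data, data, (size_t)size);
    }
    return m;
}

/* Copies the payload into buf. Returns 0 on success, or -1 (leaving buf
   untouched) if the message is larger than the size bytes available. */
int message_unpack(msg_t m, void *buf, int size) {
    if (m.size > size)
        return -1;
    if (m.size > 0)
        memcpy(buf, m.data, (size_t)m.size);
    return 0;
}

/* Releases the payload copy; needed only in this C model. */
void message_free(msg_t m) {
    free(m.data);
}
```

In this model a sender computes the byte count the same way CIVL_Send does, as the element size times the count, and a receiver whose buffer is smaller than the message gets the error return instead of a truncated copy.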
Collective functions (implemented with CIVL-C code)
