/* This file completes the definitions of some types and functions
 * for communication, which are declared in comm-common.cvh. */

#ifndef __CIVLC_COMM__
#define __CIVLC_COMM__

/* NOTE(review): the header names on the four #include lines were lost
 * (the directives appeared with no operand).  The names below are
 * reconstructed from the primitives this file uses ($malloc/$assert,
 * $bundle_pack, the $message/$gcomm/$comm declarations, $seq_*) --
 * confirm against the build before merging. */
#include <civlc.cvh>
#include <bundle.cvh>
#include <comm.cvh>
#include <seq.cvh>
#pragma CIVL ACSL

/* *********************** Types *********************** */

/* A datatype representing a queue of messages.  All message
 * data is encapsulated inside this value; no external allocation
 * is used.  Completes the declaration of this structure type in
 * civlc-common.h */
struct _queue {
  int length;          // number of messages recorded for this queue
  $message messages[]; // the queued messages
};

/* A global communicator datatype which must be operated by local
 * communicators.  This communicator type has the same meaning as the
 * communicator type in MPI.  Completes the declaration of this type in
 * civlc-common.h */
struct _gcomm {
  int nprocs;     // number of processes
  $proc procs[];  // process references
  _Bool isInit[]; // whether the local comm at each place has been initialized
  $queue buf[][]; // message buffers
};

/* A datatype representing a local communicator which is used for
 * operating global communicators.  The local communicator type has
 * a handle of a global communicator.  This type represents a set of
 * processes which have ranks in common.
 * Completes the declaration of this type in civlc-common.h.
 */
struct _comm {
  int place;    // place of this local communicator in the global one
  $gcomm gcomm; // handle of the global communicator
};

/* *********************** Functions *********************** */

/* Creates a new message, copying data from the specified buffer.
 *
 * source, dest, tag: stored verbatim in the returned message.
 * data, size:        buffer to pack and its size.  The data field of
 *                    the result is assigned only when data is non-NULL
 *                    and size is positive.
 *
 * Asserts that size is 0 when data is NULL.
 */
$atomic_f $message $message_pack(int source, int dest, int tag,
                                 const void *data, int size) {
  $message result;

  result.source = source;
  result.dest = dest;
  result.tag = tag;
  if (data != NULL && size > 0) {
    result.data = $bundle_pack(data, size);
  } else if (data == NULL) {
    $assert(size == 0, "Attempt to pack a non-zero size message with a NULL pointer");
  }
  result.size = size;
  return result;
}

/* returns the message source */
int $message_source($message message) {
  return message.source;
}

/* returns the message tag */
int $message_tag($message message) {
  return message.tag;
}

/* returns the message destination */
int $message_dest($message message) {
  return message.dest;
}

/* returns the message size */
int $message_size($message message) {
  return message.size;
}

/* Transfers message data to buf, reporting a violation if the message
 * size exceeds the specified size.
 *
 * message: the message whose data is unpacked.
 * buf:     destination buffer; may be NULL only for a zero-size message.
 * size:    capacity of buf, compared against message.size.
 */
$atomic_f void $message_unpack($message message, void *buf, int size) {
  if (buf != NULL && message.size > 0) {
    /* Check the capacity BEFORE unpacking so that an oversized message
     * is reported here instead of first being written into buf. */
    $assert(message.size <= size,
            "Message of size %d exceeds the specified size %d.",
            message.size, size);
    $bundle_unpack(message.data, buf);
  } else if (buf == NULL) {
    $assert(message.size == 0,
            "Attempt to unpack a non-zero size message with a NULL pointer.");
  }
}

/* Allocates a global communicator in the given scope for `size`
 * processes.  Every process slot starts as $proc_null, every isInit
 * flag as $false, and every one of the size x size message buffers as
 * an empty queue.  Returns the new global communicator.
 */
$atomic_f $gcomm $gcomm_create($scope scope, int size){
  $gcomm gcomm = ($gcomm)$malloc(scope, sizeof(struct _gcomm));
  $queue empty;

  empty.length = 0;
  $seq_init(&empty.messages, 0, NULL);
  gcomm->nprocs = size;
  gcomm->procs = (($proc[size])$lambda(int i)$proc_null);
  gcomm->isInit = ((_Bool[size])$lambda(int i)$false);
  gcomm->buf = (($queue[size][size])$lambda(int i,j) empty);
  return gcomm;
}

/* Frees a global communicator.
 *
 * junkMsgs: if non-NULL, the messages of every non-empty buffer are
 *           appended to it before the communicator is freed.
 *
 * Returns $seq_length(junkMsgs) after the appends when junkMsgs is
 * non-NULL, and 0 otherwise.
 */
$atomic_f int $gcomm_destroy($gcomm gcomm, void * junkMsgs){
  int nprocs = gcomm->nprocs;
  int numJunks = 0;

  if (junkMsgs != NULL) {
    for (int i = 0; i < nprocs; i++)
      for (int j = 0; j < nprocs; j++) {
        $queue queue = gcomm->buf[i][j];

        if (queue.length > 0)
          $seq_append(junkMsgs, queue.messages, queue.length);
      }
    numJunks = $seq_length(junkMsgs);
  }
  $free(gcomm);
  return numJunks;
}

/* Duplicates the global communicator of comm into that of newcomm by
 * calling $copy(newcomm->gcomm, comm->gcomm). */
/*@ executes_when \true;
  @ depends_on \access(comm->gcomm, newcomm->gcomm);
  */
$atomic_f void $gcomm_dup($comm comm, $comm newcomm){
  $copy(newcomm->gcomm, comm->gcomm);
}

/* Creates a local communicator in the given scope, attached to gcomm
 * at the given place.  Records the calling process ($self) in
 * gcomm->procs[place] and marks the place as initialized.
 *
 * Asserts that place is within [0, gcomm->nprocs) and that the place
 * has not already been taken.  Returns the new local communicator.
 */
$atomic_f $comm $comm_create($scope scope, $gcomm gcomm, int place){
  /* Reject out-of-range places before they are used as array indices. */
  $assert(0 <= place && place < gcomm->nprocs,
          "the place %d is out of range for a global communicator of %d processes!",
          place, gcomm->nprocs);
  $assert(!gcomm->isInit[place],
          "the place %d is already occupied in the global communicator!",
          place);

  $comm comm = ($comm)$malloc(scope, sizeof(struct _comm));

  gcomm->procs[place] = $self;
  gcomm->isInit[place] = $true;
  comm->gcomm = gcomm;
  comm->place = place;
  return comm;
}

/* Frees a local communicator.  The global communicator it refers to is
 * not touched here. */
$atomic_f void $comm_destroy($comm comm){
  $free(comm);
}

/* Returns the place of the local communicator.  This is the same as the
 * place argument used to create the local communicator. */
$atomic_f int $comm_place($comm comm){
  return comm->place;
}

/* Returns the number of processes (nprocs) of the global communicator
 * that this local communicator is attached to. */
$atomic_f int $comm_size($comm comm){
  return comm->gcomm->nprocs;
}

#endif