#ifndef _CIVL_COLLATE_ #define _CIVL_COLLATE_ #include #include #include // Status of a collate state for a process: #define NONARRIVED 0 #define ARRIVED 1 #define DEPARTED 2 // Status of a collate attribute checker: #define UNCHECKED 3 #define CHECKED 4 // The $gcollate_state whose state field is significant: #define _STATE_USED(gstate, place) \ ((gstate)->status[(place)] >= NONARRIVED && (gstate)->status[(place)] <= DEPARTED) #define _ARRIVED(gstate, place) \ ((gstate)->status[(place)] == ARRIVED || (gstate)->status[(place)] == DEPARTED) #define _DEPARTED(gstate, place) \ ((gstate)->status[(place)] == DEPARTED) // The $gcollate_state whose attribute field is significant: #define _ATTR_USED(gstate, place) \ ((gstate)->status[(place)] >= UNCHECKED && (gstate)->status[(place)] <= CHECKED) #define _CHECKED(gstate, place) \ ((gstate)->status[(place)] == CHECKED) /******************* Definition of datatypes: *******************/ struct _gcollator { // The number of participants of a _gcollator object: int nprocs; // $proc array: $proc procs[]; // The length of the queue of collation states: int queue_length; // The queue of collation states. Note that elements in this queue // are references to collate state objects: $gcollate_state queue[]; _Bool entered[]; }; struct _collator { // The place of the process in a _gcollator who holds this handle: int place; // A handle to the _gcollator object: $gcollator gcollator; }; // TODO: don't use one queue for collate states and collective attributes struct _gcollate_state { // An array of markers for whether a process has already arrived // this entry or departed: int status[]; // collate $state: $state state; // A customized attribute, it can be used to check some attribute // that must be equivalent collectively at some point: $bundle attribute; // Note: for one $gcollate_state object, state and attribute cannot // be in use at the same time. }; struct _collate_state { // The place of the process in a _gcollator who holds this handle: int place; // A reference to a _gcollate_state: $gcollate_state gstate; }; /******************* Function definitions ***************************/ /* Creates an global collator object. The object is allocated in the * given scope. It returns a handle $gcollator to the object. * * scope : The scope where the object is allocated. * nprocs: The number of processes included in the global collator. */ $atomic_f $gcollator $gcollator_create($scope scope, int nprocs) { $gcollator gcollator = ($gcollator)$malloc(scope, sizeof(struct _gcollator)); gcollator->nprocs = nprocs; gcollator->procs = ($proc[nprocs])$lambda(int i) $proc_null; gcollator->queue_length = 0; $seq_init(&(gcollator->queue), 0, NULL); gcollator->entered=(_Bool[nprocs])$lambda(int i) $false; return gcollator; } /* Free a $gcollator object. All gcollate states referred by this * object will be freed as well. * * gc : a handle to the $gcollator object */ $atomic_f void $gcollator_destroy($gcollator gc) { $assert(gc->queue_length == 0, "There is at least an entry in the gcollator " "(referred by %p) hasn't been departed by all processes.", gc); int garbage_size = $seq_length(&gc->queue); for (int i = 0; i < garbage_size; i++) $free(gc->queue[i]); $free(gc); } /* Creates a local collate object. The object is allocated in the * given scope. A local collate represents a local handle to part of * the global collate object. * * gcollator: The handle to the global collate object. * scope: The scope where the local collate object will be allocated. * place: The place of the participant process in the global collate * object. Each participant process has an unique place. Place shall * be greater than or equal to 0 and less than the number of all * participants. */ $atomic_f $collator $collator_create($gcollator gcollator, $scope scope, int place) { $collator collator = ($collator)$malloc(scope, sizeof(struct _collator)); collator->place = place; collator->gcollator = gcollator; collator->gcollator->procs[place] = $self; return collator; } $atomic_f void $collator_enters($collator collator){ collator->gcollator->entered[collator->place] = $true; } $atomic_f _Bool $collator_has_entered($collator collator, $range range){ $domain(1) dom=($domain(1)){range}; _Bool *entered=collator->gcollator->entered; $for(int i: dom){ if(!entered[i]) return $false; } return $true; } $atomic_f $collate_state $collate_arrives($collator c, $scope scope) { int place = c->place; int queue_size, nprocs; _Bool first = $true; $gcollate_state * queue; $gcollate_state gcollate_state; $collate_state result = {0, NULL}; nprocs = c->gcollator->nprocs; queue = c->gcollator->queue; queue_size = c->gcollator->queue_length; // Looking for the first unarrived collate state. If no such a collate // state, create and insert one: for (int i = 0; i < queue_size; i++) if (_STATE_USED(queue[i], place) && !_ARRIVED(queue[i], place)) { gcollate_state = queue[i]; first = $false; break; } if (first) { _Bool initValue = $false; $scope root = $scopeof(*c->gcollator); gcollate_state = ($gcollate_state)$malloc(root, sizeof(struct _gcollate_state)); gcollate_state->status = (int[nprocs])$lambda(int i) NONARRIVED; gcollate_state->state = $state_null; // It must use insertion instead of append given the existance of // garbage section: $seq_insert(&c->gcollator->queue, c->gcollator->queue_length, &gcollate_state, 1); c->gcollator->queue_length++; } result.place = place; result.gstate = gcollate_state; $collate_snapshot(result, nprocs, scope); gcollate_state->status[place] = ARRIVED; return result; } $atomic_f void $collate_departs($collator c, $collate_state cs) { int nprocs, queue_size, place, index; $gcollate_state *queue; $gcollate_state gcollate_state = cs.gstate; place = c->place; nprocs = c->gcollator->nprocs; queue_size = c->gcollator->queue_length; queue = c->gcollator->queue; $assert(_STATE_USED(gcollate_state, place) && _ARRIVED(gcollate_state, place), "Only an arrived collate state can be freed"); // Mark the refered gcollate state as departed: gcollate_state->status[place] = DEPARTED; /* For a gcollate state which has been departed by all processes, it * will be move to the tail of queue. Then there forms a garbage * section at the tail of the queue whose length will increase. The * reason of keeping these departed gcollate states is they still * can be read by processes after they are marked as departed. The garbage * section will be freed when the gcollator being destroyed. * * The queue length will not reveal the existance of the garbage * section, i.e. queue length will decrease by 1 after a gcollate * state being moved to the garbage section. */ /* Scan the queue in the gcollator to remove the entry that just has * been marked as DEPARTED by all processes (all-departed). At this * point, there is at most one all-departed entry in the queue. */ index = -1; for (int i = 0; i < queue_size; i++) { _Bool isGarbage = $true; for (int j = 0; j < nprocs; j++) if (!_DEPARTED(queue[i], j)) { isGarbage = $false; break; } if (isGarbage) { index = i; break; } } if (index >= 0) { $seq_remove(&c->gcollator->queue, index, NULL, 1); $seq_append(&c->gcollator->queue, &gcollate_state, 1); c->gcollator->queue_length--; } // set the gcollate state reference to NULL so that the process // cannot access it after the call of departs(): cs.gstate = NULL; } $atomic_f $bundle $collate_check($collator c, $bundle bundle) { int nprocs, queue_size, place, index; _Bool first = $true; $gcollate_state attr; $bundle result; nprocs = c->gcollator->nprocs; place = c->place; queue_size = c->gcollator->queue_length; for (int i = 0; i < queue_size; i++) { if (_ATTR_USED(c->gcollator->queue[i], place) && !_CHECKED(c->gcollator->queue[i], place)) { first = $false; attr = c->gcollator->queue[i]; index = i; break; } } if (first) { $scope gcollate_scope = $scopeof(*c->gcollator); attr = ($gcollate_state)$malloc(gcollate_scope, sizeof(struct _gcollate_state)); attr->status = ((int[nprocs])$lambda(int i)UNCHECKED); attr->attribute = bundle; // It must use insertion instead of append given the existance of // garbage section: $seq_insert(&c->gcollator->queue, c->gcollator->queue_length, &attr, 1); index = c->gcollator->queue_length++; } attr->status[place] = CHECKED; result = attr->attribute; // if complete, dequeue: _Bool complete = $true; for (int i = 0; i < nprocs; i++) if (!_CHECKED(attr, i)) { complete = $false; break; } if (complete) { $free(c->gcollator->queue[index]); $seq_remove(&c->gcollator->queue, index, NULL, 1); c->gcollator->queue_length--; } return result; } $atomic_f $state_f $state * $collate_get_state($collate_state state){ return &state.gstate->state; } #endif