| 1 | #ifndef _CIVL_COLLATE_
|
|---|
| 2 | #define _CIVL_COLLATE_
|
|---|
| 3 |
|
|---|
| 4 | #include<collate.cvh>
|
|---|
| 5 | #include<civlc.cvh>
|
|---|
| 6 | #include<seq.cvh>
|
|---|
| 7 |
|
|---|
| 8 | // Status of a collate state for a process:
|
|---|
| 9 | #define NONARRIVED 0
|
|---|
| 10 | #define ARRIVED 1
|
|---|
| 11 | #define DEPARTED 2
|
|---|
| 12 | // Status of a collate attribute checker:
|
|---|
| 13 | #define UNCHECKED 3
|
|---|
| 14 | #define CHECKED 4
|
|---|
| 15 | // The $gcollate_state whose state field is significant:
|
|---|
| 16 | #define _STATE_USED(gstate, place) \
|
|---|
| 17 | ((gstate)->status[(place)] >= NONARRIVED && (gstate)->status[(place)] <= DEPARTED)
|
|---|
| 18 | #define _ARRIVED(gstate, place) \
|
|---|
| 19 | ((gstate)->status[(place)] == ARRIVED || (gstate)->status[(place)] == DEPARTED)
|
|---|
| 20 | #define _DEPARTED(gstate, place) \
|
|---|
| 21 | ((gstate)->status[(place)] == DEPARTED)
|
|---|
| 22 | // The $gcollate_state whose attribute field is significant:
|
|---|
| 23 | #define _ATTR_USED(gstate, place) \
|
|---|
| 24 | ((gstate)->status[(place)] >= UNCHECKED && (gstate)->status[(place)] <= CHECKED)
|
|---|
| 25 | #define _CHECKED(gstate, place) \
|
|---|
| 26 | ((gstate)->status[(place)] == CHECKED)
|
|---|
| 27 |
|
|---|
| 28 | /******************* Definition of datatypes: *******************/
|
|---|
| 29 | struct _gcollator {
|
|---|
| 30 | // The number of participants of a _gcollator object:
|
|---|
| 31 | int nprocs;
|
|---|
| 32 | // $proc array:
|
|---|
| 33 | $proc procs[];
|
|---|
| 34 | // The length of the queue of collation states:
|
|---|
| 35 | int queue_length;
|
|---|
| 36 | // The queue of collation states. Note that elements in this queue
|
|---|
| 37 | // are references to collate state objects:
|
|---|
| 38 | $gcollate_state queue[];
|
|---|
| 39 | _Bool entered[];
|
|---|
| 40 | };
|
|---|
| 41 |
|
|---|
| 42 | struct _collator {
|
|---|
| 43 | // The place of the process in a _gcollator who holds this handle:
|
|---|
| 44 | int place;
|
|---|
| 45 | // A handle to the _gcollator object:
|
|---|
| 46 | $gcollator gcollator;
|
|---|
| 47 | };
|
|---|
| 48 |
|
|---|
| 49 | // TODO: don't use one queue for collate states and collective attributes
|
|---|
| 50 | struct _gcollate_state {
|
|---|
| 51 | // An array of markers for whether a process has already arrived
|
|---|
| 52 | // this entry or departed:
|
|---|
| 53 | int status[];
|
|---|
| 54 | // collate $state:
|
|---|
| 55 | $state state;
|
|---|
| 56 | // A customized attribute, it can be used to check some attribute
|
|---|
| 57 | // that must be equivalent collectively at some point:
|
|---|
| 58 | $bundle attribute;
|
|---|
| 59 | // Note: for one $gcollate_state object, state and attribute cannot
|
|---|
| 60 | // be in use at the same time.
|
|---|
| 61 | };
|
|---|
| 62 |
|
|---|
| 63 | struct _collate_state {
|
|---|
| 64 | // The place of the process in a _gcollator who holds this handle:
|
|---|
| 65 | int place;
|
|---|
| 66 | // A reference to a _gcollate_state:
|
|---|
| 67 | $gcollate_state gstate;
|
|---|
| 68 | };
|
|---|
| 69 |
|
|---|
| 70 | /******************* Function definitions ***************************/
|
|---|
| 71 | /* Creates an global collator object. The object is allocated in the
|
|---|
| 72 | * given scope. It returns a handle $gcollator to the object.
|
|---|
| 73 | *
|
|---|
| 74 | * scope : The scope where the object is allocated.
|
|---|
| 75 | * nprocs: The number of processes included in the global collator.
|
|---|
| 76 | */
|
|---|
| 77 | $atomic_f $gcollator $gcollator_create($scope scope, int nprocs) {
|
|---|
| 78 | $gcollator gcollator = ($gcollator)$malloc(scope, sizeof(struct _gcollator));
|
|---|
| 79 |
|
|---|
| 80 | gcollator->nprocs = nprocs;
|
|---|
| 81 | gcollator->procs = ($proc[nprocs])$lambda(int i) $proc_null;
|
|---|
| 82 | gcollator->queue_length = 0;
|
|---|
| 83 | $seq_init(&(gcollator->queue), 0, NULL);
|
|---|
| 84 | gcollator->entered=(_Bool[nprocs])$lambda(int i) $false;
|
|---|
| 85 | return gcollator;
|
|---|
| 86 | }
|
|---|
| 87 |
|
|---|
| 88 | /* Free a $gcollator object. All gcollate states referred by this
|
|---|
| 89 | * object will be freed as well.
|
|---|
| 90 | *
|
|---|
| 91 | * gc : a handle to the $gcollator object
|
|---|
| 92 | */
|
|---|
| 93 | $atomic_f void $gcollator_destroy($gcollator gc) {
|
|---|
| 94 | $assert(gc->queue_length == 0, "There is at least an entry in the gcollator "
|
|---|
| 95 | "(referred by %p) hasn't been departed by all processes.", gc);
|
|---|
| 96 | int garbage_size = $seq_length(&gc->queue);
|
|---|
| 97 |
|
|---|
| 98 | for (int i = 0; i < garbage_size; i++)
|
|---|
| 99 | $free(gc->queue[i]);
|
|---|
| 100 | $free(gc);
|
|---|
| 101 | }
|
|---|
| 102 |
|
|---|
| 103 | /* Creates a local collate object. The object is allocated in the
|
|---|
| 104 | * given scope. A local collate represents a local handle to part of
|
|---|
| 105 | * the global collate object.
|
|---|
| 106 | *
|
|---|
| 107 | * gcollator: The handle to the global collate object.
|
|---|
| 108 | * scope: The scope where the local collate object will be allocated.
|
|---|
| 109 | * place: The place of the participant process in the global collate
|
|---|
| 110 | * object. Each participant process has an unique place. Place shall
|
|---|
| 111 | * be greater than or equal to 0 and less than the number of all
|
|---|
| 112 | * participants.
|
|---|
| 113 | */
|
|---|
| 114 | $atomic_f $collator $collator_create($gcollator gcollator, $scope scope, int place) {
|
|---|
| 115 | $collator collator = ($collator)$malloc(scope, sizeof(struct _collator));
|
|---|
| 116 |
|
|---|
| 117 | collator->place = place;
|
|---|
| 118 | collator->gcollator = gcollator;
|
|---|
| 119 | collator->gcollator->procs[place] = $self;
|
|---|
| 120 | return collator;
|
|---|
| 121 | }
|
|---|
| 122 |
|
|---|
| 123 | $atomic_f void $collator_enters($collator collator){
|
|---|
| 124 | collator->gcollator->entered[collator->place] = $true;
|
|---|
| 125 | }
|
|---|
| 126 |
|
|---|
| 127 | $atomic_f _Bool $collator_has_entered($collator collator, $range range){
|
|---|
| 128 | $domain(1) dom=($domain(1)){range};
|
|---|
| 129 | _Bool *entered=collator->gcollator->entered;
|
|---|
| 130 |
|
|---|
| 131 | $for(int i: dom){
|
|---|
| 132 | if(!entered[i])
|
|---|
| 133 | return $false;
|
|---|
| 134 | }
|
|---|
| 135 | return $true;
|
|---|
| 136 | }
|
|---|
| 137 |
|
|---|
| 138 | $atomic_f $collate_state $collate_arrives($collator c, $scope scope) {
|
|---|
| 139 | int place = c->place;
|
|---|
| 140 | int queue_size, nprocs;
|
|---|
| 141 | _Bool first = $true;
|
|---|
| 142 | $gcollate_state * queue;
|
|---|
| 143 | $gcollate_state gcollate_state;
|
|---|
| 144 | $collate_state result = {0, NULL};
|
|---|
| 145 |
|
|---|
| 146 | nprocs = c->gcollator->nprocs;
|
|---|
| 147 | queue = c->gcollator->queue;
|
|---|
| 148 | queue_size = c->gcollator->queue_length;
|
|---|
| 149 |
|
|---|
| 150 | // Looking for the first unarrived collate state. If no such a collate
|
|---|
| 151 | // state, create and insert one:
|
|---|
| 152 | for (int i = 0; i < queue_size; i++)
|
|---|
| 153 | if (_STATE_USED(queue[i], place) && !_ARRIVED(queue[i], place)) {
|
|---|
| 154 | gcollate_state = queue[i];
|
|---|
| 155 | first = $false;
|
|---|
| 156 | break;
|
|---|
| 157 | }
|
|---|
| 158 | if (first) {
|
|---|
| 159 | _Bool initValue = $false;
|
|---|
| 160 | $scope root = $scopeof(*c->gcollator);
|
|---|
| 161 |
|
|---|
| 162 | gcollate_state = ($gcollate_state)$malloc(root, sizeof(struct _gcollate_state));
|
|---|
| 163 | gcollate_state->status = (int[nprocs])$lambda(int i) NONARRIVED;
|
|---|
| 164 | gcollate_state->state = $state_null;
|
|---|
| 165 | // It must use insertion instead of append given the existance of
|
|---|
| 166 | // garbage section:
|
|---|
| 167 | $seq_insert(&c->gcollator->queue, c->gcollator->queue_length, &gcollate_state, 1);
|
|---|
| 168 | c->gcollator->queue_length++;
|
|---|
| 169 | }
|
|---|
| 170 | result.place = place;
|
|---|
| 171 | result.gstate = gcollate_state;
|
|---|
| 172 | $collate_snapshot(result, nprocs, scope);
|
|---|
| 173 | gcollate_state->status[place] = ARRIVED;
|
|---|
| 174 | return result;
|
|---|
| 175 | }
|
|---|
| 176 |
|
|---|
| 177 | $atomic_f void $collate_departs($collator c, $collate_state cs) {
|
|---|
| 178 | int nprocs, queue_size, place, index;
|
|---|
| 179 | $gcollate_state *queue;
|
|---|
| 180 | $gcollate_state gcollate_state = cs.gstate;
|
|---|
| 181 |
|
|---|
| 182 | place = c->place;
|
|---|
| 183 | nprocs = c->gcollator->nprocs;
|
|---|
| 184 | queue_size = c->gcollator->queue_length;
|
|---|
| 185 | queue = c->gcollator->queue;
|
|---|
| 186 | $assert(_STATE_USED(gcollate_state, place)
|
|---|
| 187 | && _ARRIVED(gcollate_state, place),
|
|---|
| 188 | "Only an arrived collate state can be freed");
|
|---|
| 189 | // Mark the refered gcollate state as departed:
|
|---|
| 190 | gcollate_state->status[place] = DEPARTED;
|
|---|
| 191 |
|
|---|
| 192 | /* For a gcollate state which has been departed by all processes, it
|
|---|
| 193 | * will be move to the tail of queue. Then there forms a garbage
|
|---|
| 194 | * section at the tail of the queue whose length will increase. The
|
|---|
| 195 | * reason of keeping these departed gcollate states is they still
|
|---|
| 196 | * can be read by processes after they are marked as departed. The garbage
|
|---|
| 197 | * section will be freed when the gcollator being destroyed.
|
|---|
| 198 | *
|
|---|
| 199 | * The queue length will not reveal the existance of the garbage
|
|---|
| 200 | * section, i.e. queue length will decrease by 1 after a gcollate
|
|---|
| 201 | * state being moved to the garbage section.
|
|---|
| 202 | */
|
|---|
| 203 |
|
|---|
| 204 | /* Scan the queue in the gcollator to remove the entry that just has
|
|---|
| 205 | * been marked as DEPARTED by all processes (all-departed). At this
|
|---|
| 206 | * point, there is at most one all-departed entry in the queue. */
|
|---|
| 207 | index = -1;
|
|---|
| 208 | for (int i = 0; i < queue_size; i++) {
|
|---|
| 209 | _Bool isGarbage = $true;
|
|---|
| 210 |
|
|---|
| 211 | for (int j = 0; j < nprocs; j++)
|
|---|
| 212 | if (!_DEPARTED(queue[i], j)) {
|
|---|
| 213 | isGarbage = $false;
|
|---|
| 214 | break;
|
|---|
| 215 | }
|
|---|
| 216 | if (isGarbage) {
|
|---|
| 217 | index = i;
|
|---|
| 218 | break;
|
|---|
| 219 | }
|
|---|
| 220 | }
|
|---|
| 221 | if (index >= 0) {
|
|---|
| 222 | $seq_remove(&c->gcollator->queue, index, NULL, 1);
|
|---|
| 223 | $seq_append(&c->gcollator->queue, &gcollate_state, 1);
|
|---|
| 224 | c->gcollator->queue_length--;
|
|---|
| 225 | }
|
|---|
| 226 | // set the gcollate state reference to NULL so that the process
|
|---|
| 227 | // cannot access it after the call of departs():
|
|---|
| 228 | cs.gstate = NULL;
|
|---|
| 229 | }
|
|---|
| 230 |
|
|---|
| 231 | $atomic_f $bundle $collate_check($collator c, $bundle bundle) {
|
|---|
| 232 | int nprocs, queue_size, place, index;
|
|---|
| 233 | _Bool first = $true;
|
|---|
| 234 | $gcollate_state attr;
|
|---|
| 235 | $bundle result;
|
|---|
| 236 |
|
|---|
| 237 | nprocs = c->gcollator->nprocs;
|
|---|
| 238 | place = c->place;
|
|---|
| 239 | queue_size = c->gcollator->queue_length;
|
|---|
| 240 | for (int i = 0; i < queue_size; i++) {
|
|---|
| 241 | if (_ATTR_USED(c->gcollator->queue[i], place)
|
|---|
| 242 | && !_CHECKED(c->gcollator->queue[i], place)) {
|
|---|
| 243 | first = $false;
|
|---|
| 244 | attr = c->gcollator->queue[i];
|
|---|
| 245 | index = i;
|
|---|
| 246 | break;
|
|---|
| 247 | }
|
|---|
| 248 | }
|
|---|
| 249 |
|
|---|
| 250 | if (first) {
|
|---|
| 251 | $scope gcollate_scope = $scopeof(*c->gcollator);
|
|---|
| 252 |
|
|---|
| 253 | attr = ($gcollate_state)$malloc(gcollate_scope, sizeof(struct _gcollate_state));
|
|---|
| 254 | attr->status = ((int[nprocs])$lambda(int i)UNCHECKED);
|
|---|
| 255 | attr->attribute = bundle;
|
|---|
| 256 | // It must use insertion instead of append given the existance of
|
|---|
| 257 | // garbage section:
|
|---|
| 258 | $seq_insert(&c->gcollator->queue, c->gcollator->queue_length, &attr, 1);
|
|---|
| 259 | index = c->gcollator->queue_length++;
|
|---|
| 260 | }
|
|---|
| 261 | attr->status[place] = CHECKED;
|
|---|
| 262 | result = attr->attribute;
|
|---|
| 263 |
|
|---|
| 264 | // if complete, dequeue:
|
|---|
| 265 | _Bool complete = $true;
|
|---|
| 266 |
|
|---|
| 267 | for (int i = 0; i < nprocs; i++)
|
|---|
| 268 | if (!_CHECKED(attr, i)) {
|
|---|
| 269 | complete = $false;
|
|---|
| 270 | break;
|
|---|
| 271 | }
|
|---|
| 272 | if (complete) {
|
|---|
| 273 | $free(c->gcollator->queue[index]);
|
|---|
| 274 | $seq_remove(&c->gcollator->queue, index, NULL, 1);
|
|---|
| 275 | c->gcollator->queue_length--;
|
|---|
| 276 | }
|
|---|
| 277 | return result;
|
|---|
| 278 | }
|
|---|
| 279 |
|
|---|
| 280 | $atomic_f $state_f $state * $collate_get_state($collate_state state){
|
|---|
| 281 | return &state.gstate->state;
|
|---|
| 282 | }
|
|---|
| 283 |
|
|---|
| 284 | #endif
|
|---|