// civlc-omp.cvl: implementations of functions and types in civlc-omp.cvh #include #include #include #include #include #include #include #pragma CIVL ACSL /* Completes the definition of struct OMP_gshared given in civlc-omp.cvh. */ struct OMP_gshared { int nthreads; _Bool init[]; // which threads have joined $omp_shared shares[]; void * original; // pointer to original variable }; /* Completes the definition of struct OMP_shared given in civlc-omp.cvh. */ struct OMP_shared { $omp_gshared gshared; /* The thread id */ int tid; /* Pointer to the local copy of the shared variable. * This provides the thread's "private view" of the variable. */ void * local; /* Pointer to the local status variable */ void * status; }; /* Completes the definition of struct OMP_work_record * given in civlc-omp.cvh. */ struct OMP_work_record { int kind; // loop, barrier, sections, or single int location; // location in model of construct _Bool arrived; // has this thread arrived yet? $domain loop_domain; // full loop domain; null if not loop $domain subdomain; // tasks this thread must do // reduction operation? }; struct OMP_gteam { /* scope in which data is allocated in heap */ $scope scope; /* number of threads in team */ int nthreads; /* which threads have joined this gteam */ _Bool init[]; /* work queues. Length nthreads. For each thread, * a FIFO queue of work records */ /* work queues. Width nthreads. For each worksharing region, * one entry of size nthreads is added. */ $omp_work_record work[][]; /* the shared object data. */ $omp_gshared shared[]; $gbarrier gbarrier; /* r/w sets for each thread */ $mem read_sets[]; $mem write_sets[]; }; struct OMP_team { $omp_gteam gteam; $scope scope; int tid; $omp_shared shared[]; $barrier barrier; }; struct OMP_HELPER_SIGNAL { int value; }; /* *********************** Functions *********************** */ $atomic_f $omp_gteam $omp_gteam_create($scope scope, int nthreads) { $omp_gteam result = ($omp_gteam)$malloc(scope, sizeof(struct OMP_gteam)); _Bool f = $false; $mem empty = $mem_empty(); //$seq_init(&empty, 0, NULL); result->scope = scope; result->nthreads = nthreads; $seq_init(&result->init, nthreads, &f); $seq_init(&result->work, 0, NULL); $seq_init(&result->shared, 0, NULL); $seq_init(&result->read_sets, nthreads, &empty); $seq_init(&result->write_sets, nthreads, &empty); result->gbarrier = $gbarrier_create(scope, nthreads); return result; } $atomic_f void $omp_gteam_destroy($omp_gteam gteam) { int length = $seq_length(>eam->work); $assert(length == 0, "There are still %d queued worksharing events.", length); $free(gteam->gbarrier); $free(gteam); } /* creates new local team object for a specific thread. */ $atomic_f $omp_team $omp_team_create($scope scope, $omp_gteam gteam, int tid) { $omp_team result = ($omp_team)$malloc(scope, sizeof(struct OMP_team)); $assert(!gteam->init[tid], "Thread %d has already joined gteam", tid); gteam->init[tid] = $true; result->gteam = gteam; result->scope = scope; result->tid = tid; $seq_init(&result->shared, 0, NULL); result->barrier = $barrier_create(scope, gteam->gbarrier, tid); return result; } /* destroys the local team object */ $atomic_f void $omp_team_destroy($omp_team team) { /*int numShared = $seq_length(&team->shared); for(int i = 0; i < numShared; i++){ $free(team->shared[i]); }*/ $free(team->barrier); $free(team); } /* creates new global shared object, associated to the given * global team. A pointer to the shared variable that this * object corresponds to is given. */ $atomic_f $omp_gshared $omp_gshared_create($omp_gteam gteam, void *original) { $omp_gshared result = ($omp_gshared)$malloc(gteam->scope, sizeof(struct OMP_gshared)); _Bool f = $false; $omp_shared null = NULL; result->nthreads = gteam->nthreads; $seq_init(&result->init, gteam->nthreads, &f); result->original = original; $seq_init(&result->shares, gteam->nthreads, &null); //result->status = status; return result; } /* destroys the global shared object, copying the content * to the original variable. */ $atomic_f void $omp_gshared_destroy($omp_gshared gshared) { $free(gshared); } $atomic_f $omp_shared $omp_shared_create($omp_team team, $omp_gshared gshared, void *local, void *status) { $omp_shared result = ($omp_shared)$malloc(team->scope, sizeof(struct OMP_shared)); //void *statusRefs[]; //int numStatusRefs; int sharedLength; $assert(!gshared->init[team->tid], "Thread %d has already created its local copy for %p.\n", team->tid, gshared); result->gshared = gshared; result->tid = team->tid; result->local = local; // copies the shared data to the local copy $copy(local, gshared->original); result->status = status; // set all leaf nodes of status to FULL $set_leaf_nodes(status, FULL); //$leaf_node_ptrs(&statusRefs, status); //numStatusRefs = $seq_length(&statusRefs); //for(int i = 0; i < numStatusRefs; i++) //*((int*)(statusRefs[i])) = FULL; sharedLength = $seq_length(&team->shared); $seq_insert(&team->shared, sharedLength, &result, 1); //$seq_append(&team->shared, &result, 1); gshared->shares[team->tid] = result; return result; } $atomic_f void $omp_shared_destroy($omp_shared shared) { $free(shared); } int $omp_modified_tid($omp_shared shared, void *ref){ int tid = shared->tid; $omp_gshared gshared = shared->gshared; int nthreads = gshared->nthreads; _Bool isIdentityRef = $is_identity_ref(ref); for(int i=0; i < nthreads;i++){ if(i == tid) continue; else{ $omp_shared other = gshared->shares[i]; $omp_var_status other_status; _Bool segFault = $false; if(!$is_derefable_pointer(other)) continue; if(isIdentityRef){ segFault = $has_leaf_node_equal_to(other->status, MODIFIED); }else{ int *other_status_ref = (int*)$translate_ptr(ref, other->status); segFault = (*other_status_ref) == MODIFIED; } if(segFault) return i; } } return -1; } void $omp_read($omp_shared shared, void *result, void *ref) { $atomic{ int otherTid = $omp_modified_tid(shared, ref); if (otherTid >= 0) { void * p = $translate_ptr(ref, shared->gshared->original); $assert($false, "Thread %d can not safely read memory location %p, " "because thread %d has written to\nthat memory location and hasn't flushed yet.", shared->tid, p, otherTid); } if($is_identity_ref(ref)){ if($leaf_nodes_equal_to(shared->status, EMPTY)){ $copy(ref, shared->gshared->original); // copy shared to local $set_leaf_nodes(shared->status, FULL); } }else{ int *status_ref = (int*)$translate_ptr(ref, shared->status); int status = *status_ref; if(status == EMPTY){ void *global = $translate_ptr(ref, shared->gshared->original); $copy(ref, global); // copy shared to local *status_ref = FULL; // set status to FULL } } // read local $copy(result, ref); } } void $omp_write($omp_shared shared, void *ref, void *value) { $atomic{ int otherTid = $omp_modified_tid(shared, ref); if (otherTid >= 0) { void *p = $translate_ptr(ref, shared->gshared->original); $assert($false, "Thread %d can not safely write to memory location %p, because thread %d has\n" "written to that memory location and hasn't flushed yet.", shared->tid, p, otherTid); } $copy(ref, value); if($is_identity_ref(ref)){ $set_leaf_nodes(shared->status, MODIFIED); }else{ int *status_ref = (int*)$translate_ptr(ref, shared->status); *status_ref = MODIFIED; } } } /* Only applicable to scalar type? */ /*@ depends_on \access(shared, local, shared->gshared->original); */ $atomic_f void $omp_apply_assoc($omp_shared shared, $operation op, void *local){ void *shared_ref = $translate_ptr(local, shared->gshared->original); $apply(shared_ref, op, local, shared_ref); } $atomic_f void $omp_reduction_combine( $operation op, void *reduction_final, void *reduction_local ) { $read_set_pop(); $write_set_pop(); $apply(reduction_final, op, reduction_local, reduction_final); $read_set_push(); $write_set_push(); } void $omp_flush($omp_shared shared, void *ref) { // need to drill down into all leaf nodes of the object // being flushed... // also, it should be ok to flush a memory unit if you are not // the owner but you also have no reads or writes to that variable // TODO: assert there is at most one thread for which this memory unit has status MODIFIED; $omp_gshared gshared = shared->gshared; int tid = shared->tid; void *refs[]; int numRefs; // get all leaf node pointers $leaf_node_ptrs(&refs, ref); numRefs = $seq_length(&refs); for(int i = 0; i < numRefs; i++){ void *leaf = refs[i]; int *leaf_status = (int *)$translate_ptr(leaf, shared->status); //void *leaf_local = (int *)$translate_ptr(leaf, shared->local); void *leaf_shared = (int *)$translate_ptr(leaf, gshared->original); switch(*leaf_status){ case EMPTY: break; case MODIFIED: $copy(leaf_shared, leaf); case FULL: *leaf_status = EMPTY; $set_default(leaf); break; default: $assert(0, "Unknown omp shared variable status: %d", *leaf_status); } } } void $omp_flush_all($omp_team team) { int num_shared = $seq_length(&team->shared); for (int i=0; ishared[i]; $omp_flush(shared, shared->local); } } $atomic_f void $check_data_race($omp_team team) { int cur_tid = team->tid; $mem out_s0 = $mem_empty(); $mem out_s1 = $mem_empty(); $mem cur_mw = $write_set_pop(); $mem cur_mr = $read_set_pop(); // Update current R/W sets team->gteam->write_sets[cur_tid] = cur_mw; team->gteam->read_sets[cur_tid] = cur_mr; // Check data race. for (int tmp_tid = 0; tmp_tid < team->gteam->nthreads; tmp_tid++) { if (tmp_tid == cur_tid) continue; $mem tmp_mr = team->gteam->read_sets[tmp_tid]; $mem tmp_mw = team->gteam->write_sets[tmp_tid]; $assert($mem_no_intersect(cur_mr, tmp_mw, &out_s0, &out_s1), "Data-race detected: %p read by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); $assert($mem_no_intersect(cur_mw, tmp_mr, &out_s0, &out_s1), "Data-race detected: %p read by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); $assert($mem_no_intersect(cur_mw, tmp_mw, &out_s0, &out_s1), "Data-race detected: %p written by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); } // Update current R/W sets team->gteam->write_sets[cur_tid] = $mem_empty(); team->gteam->read_sets[cur_tid] = $mem_empty(); $read_set_push(); $write_set_push(); } void $read_and_write_set_update($omp_team team) { team->gteam->write_sets[team->tid] = $write_set_peek(); team->gteam->read_sets[team->tid] = $read_set_peek(); } void $omp_barrier($omp_team team) { $mem cur_mw = $write_set_pop(); $mem cur_mr = $read_set_pop(); $mem out_s0 = $mem_empty(); $mem out_s1 = $mem_empty(); int cur_tid = team->tid; team->gteam->write_sets[cur_tid] = cur_mw; team->gteam->read_sets[cur_tid] = cur_mr; for (int tmp_tid = 0; tmp_tid < team->gteam->nthreads; tmp_tid++) { if (tmp_tid == cur_tid) continue; $mem tmp_mr = team->gteam->read_sets[tmp_tid]; $mem tmp_mw = team->gteam->write_sets[tmp_tid]; $assert($mem_no_intersect(cur_mr, tmp_mw, &out_s0, &out_s1), "Data-race detected: %p read by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); $assert($mem_no_intersect(cur_mw, tmp_mr, &out_s0, &out_s1), "Data-race detected: %p read by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); $assert($mem_no_intersect(cur_mw, tmp_mw, &out_s0, &out_s1), "Data-race detected: %p written by thread %d intersects %p written by thread %d\n", out_s0, cur_tid, out_s1, tmp_tid); } $barrier_call(team->barrier); $read_set_push(); $write_set_push(); } /** * performs the checking of workrecords when the last thread arrives the workshared region, * including: * number of workrecords in the entry should be nthreads * all workrecords are arrived * all workrecords have the same location as specified * all workrecords are of the same kind */ void $omp_check_workrecords($omp_gteam gteam, int location, $omp_worksharing_kind kind, int index){ int nthreads = gteam->nthreads; int numWorkrecords; numWorkrecords = $seq_length(>eam->work[index]); $assert(numWorkrecords == nthreads, "The %d-th workshared entry has length %d while the expected length should be %d.", index, numWorkrecords, nthreads); for(int i = 0; i < nthreads; i++){ $omp_work_record workrecord = gteam->work[index][i]; $assert(workrecord.arrived, "The workshared event of location %d for thread %d hasn't been arrived yet.", location, i); $assert(workrecord.location == location, "Wrong workshared location of thread %d: expected %d, but saw %d.", i, location, workrecord.location); $assert(workrecord.kind == kind, "Wrong workshared kind of thread %d: expected %d but saw %d", i, kind, workrecord.kind); } } /* finds the workrecord of the given location for a certain thread in a team, * stores the workrecord in the buffer provided, * returns the index of the workrecod entry in the work queue * or -1 if the workrecord is absent. * * gteam: the global team object * tid: the id of the thread * location: the location of the workrecord being searched for * result: the buffer for store the workrecord found */ int find_work_record($omp_gteam gteam, int tid, int location, $omp_work_record *result){ int numEntires = $seq_length(>eam->work); for(int i = 0; i < numEntires; i++){ $omp_work_record workrecord = gteam->work[i][tid]; if(workrecord.location == location){ *result = workrecord; return i; } } return -1; } /* checks if all threads have arrived the workrecord at the given index */ _Bool all_arrived($omp_gteam gteam, int index){ for(int tid = 0; tid < gteam->nthreads; tid++){ if(!gteam->work[index][tid].arrived) return $false; } return $true; } void arrive($omp_gteam gteam, int tid, int index, $omp_work_record workrecord, $omp_worksharing_kind kind, int location){ $assert(!workrecord.arrived, "The loop at location %d has been arrived before by thread %d!", location, tid); $assert(workrecord.kind == kind, "Wrong workshare kind of thread %d: expected LOOP", tid); (gteam->work[index][tid]).arrived = $true; if(all_arrived(gteam, index)){//this is the last thread $assert(index == 0, "Inconsistent order or worksharing events."); $omp_check_workrecords(gteam, location, kind, index); $seq_remove(>eam->work, index, NULL, 1); } } /* TODO: checks if loop_dom gets changed if the current thread is NOT the * first thread entering the loop. */ $domain $omp_arrive_loop($omp_team team, int location, $domain loop_dom, $domain_strategy strategy){ $atomic{ $omp_gteam gteam = team->gteam; int tid = team->tid; int nthreads = gteam->nthreads; int index; $omp_work_record workrecord; $assert(gteam->init[tid], "The current thread %d has not joined the gteam!", tid); if(nthreads == 1) return loop_dom; index = find_work_record(gteam, tid, location, &workrecord); if(index >= 0){//workrecord found $assert($equals(>eam->work[index][0].loop_domain, &loop_dom), "The loop domain at location %d is changed when thread %d arrives it. \n Expected %s but saw %s.", location, tid, gteam->work[index][0].loop_domain, loop_dom); arrive(gteam, tid, index, workrecord, LOOP, location); return workrecord.subdomain; }else{//workrecord not found, this should be the first thread $omp_work_record workrecords[nthreads]; $domain_decomposition decomposition; int numEntries = $seq_length(>eam->work); decomposition = $domain_partition(loop_dom, strategy, nthreads); workrecords[0].loop_domain = loop_dom; for(int i = 0; i< nthreads; i++){ workrecords[i].kind = LOOP; workrecords[i].location = location; workrecords[i].arrived = (i == tid); workrecords[i].subdomain = decomposition.subdomains[i]; } $seq_insert(>eam->work, numEntries, &workrecords, 1); return workrecords[tid].subdomain; } } } /* TODO: need strategies? */ $domain(1) $omp_arrive_sections($omp_team team, int location, int numSections){ $atomic{ $omp_gteam gteam = team->gteam; int tid = team->tid; int nthreads = gteam->nthreads; int index; $omp_work_record workrecord; $assert(gteam->init[tid], "The current thread %d has not joined the gteam!", tid); if(nthreads == 1) return ($domain(1)) {0 .. numSections - 1 # 1}; index = find_work_record(gteam, tid, location, &workrecord); if(index >= 0){//workrecord found arrive(gteam, tid, index, workrecord, SECTIONS, location); return workrecord.subdomain; }else{//workrecord not found, this should be the first thread $omp_work_record workrecords[nthreads]; int numEntries = $seq_length(>eam->work); for(int i = 0; i < nthreads; i++){ int low = i, high = numSections - 1, step = nthreads; $range range = low .. high # step; $domain(1) dom = ($domain(1)) {range}; workrecords[i].kind = SECTIONS; workrecords[i].location = location; workrecords[i].arrived = (i == tid); workrecords[i].subdomain = dom; } $seq_insert(>eam->work, numEntries, &workrecords, 1); return workrecords[tid].subdomain; } } } /* TODO: need strategies? */ int $omp_arrive_single($omp_team team, int location){ $atomic{ $omp_gteam gteam = team->gteam; int tid = team->tid; int nthreads = gteam->nthreads; int index; $omp_work_record workrecord; $assert(gteam->init[tid], "The current thread %d has not joined the gteam!", tid); if(nthreads == 1) return tid; index = find_work_record(gteam, tid, location, &workrecord); if(index >= 0){//workrecord found arrive(gteam, tid, index, workrecord, SINGLE, location); }else{//workrecord not found, this should be the first thread $omp_work_record workrecords[nthreads]; int numEntries = $seq_length(>eam->work); for(int i = 0; i < nthreads; i++){ workrecords[i].kind = SINGLE; workrecords[i].location = location; workrecords[i].arrived = (i == tid); } $seq_insert(>eam->work, numEntries, &workrecords, 1); } return 0; } } $atomic_f $omp_helper_signal $omp_helper_signal_create( int initValue ){ $omp_helper_signal result; result.value = initValue; return result; } $atomic_f void $omp_helper_signal_wait( $omp_helper_signal *signal, int waitValue ){ $read_set_push(); $write_set_push(); $when(signal->value == waitValue) signal->value = -(signal->value)-1 /* do nothing */; $read_set_pop(); $write_set_pop(); } $atomic_f void $omp_helper_signal_send( $omp_helper_signal *signal, int sendValue ){ $read_set_push(); $write_set_push(); signal->value = sendValue; $read_set_pop(); $write_set_pop(); }