source: CIVL/examples/experimental/non_blocking_queue_final.cvl@ e669378

1.23 2.0 main test-branch
Last change on this file since e669378 was 8265082, checked in by Si Li <sili@…>, 11 years ago

add non_blocking_queue_final.cvl to experimental folder

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@2635 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 6.6 KB
Line 
/* Non-Blocking Concurrent Queue Algorithm
 * from Michael and Scott
 * https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html.
 * Originally from "Simple, Fast, and
 * Practical Non-Blocking and Blocking
 * Concurrent Queue Algorithms", PODC96.
 *
 * The free in the algorithm (setFree method
 * in Dequeue in this code) is meant to
 * represent a function putting the node back
 * on to a locally-maintained special-use free
 * list and not the partner to malloc.
 * http://blog.shealevy.com/2015/04/23/use-after-free-bug-in-maged-m-michael-and-michael-l-scotts-non-blocking-concurrent-queue-algorithm/#up1
 */
15#include <civlc.cvh>
16#include <stdio.h>
17#include <stdbool.h>
18#include <stdlib.h>
19#include <assert.h>
20
/* Forward declarations so the structures below can refer to one another
 * by their short names before their bodies are defined. */
typedef struct node_t node_t;
typedef struct pointer_t pointer_t;
typedef struct queue_t queue_t;
typedef struct freeList freeList;

struct node_t;
27
28struct pointer_t {
29 node_t* ptr;
30 int count;
31};
32
33struct node_t {
34 int value;
35 pointer_t next;
36};
37
38struct queue_t {
39 pointer_t Head;
40 pointer_t Tail;
41};
42
43struct freeList{ //sepcial-use free list for putting node
44 node_t *node;
45 freeList *next;
46};
47
48freeList* list; //declare global list
49
50void initialize(queue_t *Q) {
51 node_t *node = (node_t*)malloc(sizeof(node_t));
52
53 list=(freeList*)malloc(sizeof(freeList));
54 list->node = NULL;
55 list->next = NULL;
56 node->next.ptr = NULL;
57 node->next.count = 0;
58 Q->Head.ptr = Q->Tail.ptr = node;
59}
60
61void setFree(node_t* freeNode){
62 $atomic{
63 freeList *temp = (freeList*)malloc(sizeof(freeList));
64
65 temp->node = freeNode;
66 temp->next = list->next;
67 list->next = temp;
68 }
69}
70
71void deallocate(freeList *list){
72 freeList *q;
73
74 while(list != NULL){
75 q = list->next;
76 free(list->node);
77 free(list);
78 list = q;
79 }
80}
81
82_Bool equal(pointer_t p1, pointer_t p2){
83 return (p1.ptr == p2.ptr) && (p1.count == p2.count);
84}
85
86_Bool CAS(pointer_t *dest, pointer_t oldval, pointer_t newval){
87 $atomic {
88 if (equal(*dest, oldval)) {
89 *dest = newval;
90 return true;
91 }
92 return false;
93 }
94}
95
96void enqueue(queue_t *Q, int value) {
97 pointer_t tail, next;
98 node_t *node = (node_t*)malloc(sizeof(node_t));
99 node->value = value;
100 node->next.ptr = NULL;
101
102 while (true){
103 tail = Q->Tail; // Read Tail.ptr and Tail.count together
104 next = tail.ptr->next; // Read next ptr and count fields together
105 if (equal(tail, Q->Tail)) // Are tail and next consistent?
106 // Was Tail pointing to the last node?
107 if (next.ptr == NULL){
108 // Try to link node at the end of the linked list
109 if (CAS(&tail.ptr->next, next, (pointer_t){ node, next.count + 1 }))
110 break; // **Enqueue is done. Exit loop
111 }
112 else{ // Tail was not pointing to the last node
113 // Try to swing Tail to the next node
114 CAS(&Q->Tail, tail, (pointer_t){ next.ptr, tail.count + 1 });
115 }
116 }
117 // Enqueue is done. Try to swing Tail to the inserted node
118 CAS(&Q->Tail, tail, (pointer_t){ node, tail.count + 1 });
119}
120
121_Bool dequeue(queue_t *Q, int *pvalue) {
122 pointer_t head, tail, next;
123
124 while (true){
125 head = Q->Head; // Read Head
126 tail = Q->Tail; // Read Tail
127 next = head.ptr->next; // Read Head.ptr->next
128 if (equal(head, Q->Head)) // Are head, tail, and next consistent?
129 if (head.ptr == tail.ptr){
130 if (next.ptr == NULL) // Is queue empty?
131 return false; // Queue is empty, couldn't dequeue
132 // Tail is falling behind. Try to advance it
133 CAS(&Q->Tail, tail, (pointer_t){ next.ptr, tail.count + 1 });
134 }
135 else{
136 // Read value before CAS
137 // Otherwise, another dequeue might free the next node
138 *pvalue = next.ptr->value;
139 if (CAS(&Q->Head, head, (pointer_t){ next.ptr, head.count + 1 }))
140 break;// **Dequeue is done. Exit loop
141 }
142 }
143 setFree(head.ptr); // It is safe now to "free" the old node
144 return true; // Queue was not empty, dequeue succeeded
145}
146
/*****************************************************/
/******************** Tests **************************/
/*****************************************************/
150
/*
 * Determines whether an array of n integers is a permutation of the
 * numbers 0..n-1.
 *
 * n:    number of elements in data.
 * data: the values to check; only read when n > 0.
 *
 * Returns true if every value in 0..n-1 occurs exactly once.  An empty
 * array (n == 0) is a permutation of the empty range; a negative n is
 * rejected.  Both cases are handled before the scratch array is declared,
 * because a variable-length array must have a positive size.
 */
_Bool is_permutation(int n, int *data) {
  if (n < 0)
    return false;
  if (n == 0)
    return true;

  _Bool seen[n];

  for (int i = 0; i < n; i++)
    seen[i] = false;
  for (int i = 0; i < n; i++) {
    int value = data[i];

    /* Out of range, or already encountered: not a permutation. */
    if (value < 0 || value >= n || seen[value])
      return false;
    seen[value] = true;
  }
  return true;
}
169
170void test1() {
171 int d;
172 queue_t sq;
173
174 initialize(&sq);
175 for (int i = 0; i < 10; i++) {
176 enqueue(&sq, i);
177 }
178 for (int i = 0; i < 10; i++) {
179 _Bool result = dequeue(&sq, &d);
180
181 assert(result);
182 assert(d == i);
183 }
184 deallocate(list);
185 free(sq.Head.ptr);
186}
187
188void test2(int n) { //Test whether dequeued array is a permutation
189 queue_t sq;
190 int array[n];
191
192 initialize(&sq);
193 $parfor(int i: 0 .. (n-1)) {
194 enqueue(&sq, i);
195 dequeue(&sq, &array[i]);
196 }
197 assert(is_permutation(n, array));
198 deallocate(list);
199 free(sq.Head.ptr);
200}
201
202void test3(int t, int n) { //t is the number of threads,
203 int RESULT[t*n]; //n is the number of enqueued values
204 int R[t][n]; //global array to store scaned result
205 int counter[t]; //global array, each thread has a counter;
206 queue_t sq;
207
208 void thread(int tid){
209 for(int i=0; i<n; i++) {
210 enqueue(&sq, i+1+tid*n);
211 }
212 }
213 void scan(int x){ // helper method for assertFIFO()
214 int tid = 0; // thread id
215
216 tid = (x-1)/n; //calculate the id of thread;
217 R[tid][counter[tid]++] = x; //store scaned elements
218 }
219 void assertFIFO(int *data) { // assert method for testing FIFO
220 for(int i=0; i<t*n; i++)
221 scan(data[i]); //scan dequeued RESULT[t*n];
222 for(int i=0; i<t; i++) //assert FIFO for each thread
223 for(int j=0; j<n-1; j++)
224 assert(R[i][j] < R[i][j+1]);
225 }
226 for(int i=0; i<t; i++) //initialize global R[t][n]
227 for(int j=0; j<n; j++)
228 R[i][j] = 0;
229 for(int i=0; i<t;i++) ///initialize global counter[t]
230 counter[i] = 0;
231 for(int i=0; i<t*n; i++)
232 RESULT[i] = 0;
233
234 initialize(&sq);
235 $parfor(int i: 0 .. t-1)
236 thread(i);
237 for(int i=0; i<t*n; i++)
238 dequeue(&sq, &RESULT[i]);
239 for(int i=0; i<t*n; i++){ //print dequeued result
240 if(i%(t*n)==0)
241 printf("dequeue: ");
242 printf("%d\t", RESULT[i]);
243 }
244 printf("\n");
245 assertFIFO(RESULT);
246 deallocate(list);
247 free(sq.Head.ptr);
248}
249
/* Entry point: runs the three queue tests in order. */
void main() {
  test1();      /* sequential FIFO order */
  test2(2);     /* 2 concurrent enqueue+dequeue workers */
  test3(2, 3);  /* 2 enqueuing threads, 3 values each */
}
255
256
257
258
259
260
Note: See TracBrowser for help on using the repository browser.