/* Non-Blocking Concurrent Queue Algorithm * from Michael and Scott * https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html. * Originally from "Simple, Fast, and * Practical Non-Blocking and Blocking * Concurrent Queue Algorithms", PODC96. * * The free in the algorithm (setFree method * in Dequeue in this code) is meant to * represent a function putting the node back * on to a locally-maintained special-use free * list and not the partner to malloc. * http://blog.shealevy.com/2015/04/23/use-after-free-bug-in-maged-m-michael-and-michael-l-scotts-non-blocking-concurrent-queue-algorithm/#up1 */ #include #include #include #include #include typedef struct pointer_t pointer_t; typedef struct queue_t queue_t; typedef struct node_t node_t; typedef struct free_t free_t; struct pointer_t { node_t* ptr; int count; }; struct node_t { int value; pointer_t next; }; struct queue_t { pointer_t Head; pointer_t Tail; }; /* Structure for linked list consisting of nodes * that we want to free eventually */ struct free_t { node_t *node; free_t *next; }; /* Global list of nodes to be freed */ free_t* free_list; void initialize(queue_t *Q) { node_t *node = (node_t*)malloc(sizeof(node_t)); // why isn't this NULL? (isn't the free_list empty???) free_list=(free_t*)malloc(sizeof(free_t)); free_list->node = NULL; free_list->next = NULL; node->next.ptr = NULL; node->next.count = 0; Q->Head.ptr = Q->Tail.ptr = node; } /* WHAT DOES THIS DO???? Adds the node to the free_list */ /* free_later ? */ void setFree(node_t* freeNode) { $atomic { free_t *temp = (free_t*)malloc(sizeof(free_t)); temp->node = freeNode; temp->next = free_list->next; free_list->next = temp; } } /* what does this do ? free_all(); // deallocates all nodes in free_list */ void deallocate() { free_t *list = free_list; while (list != NULL) { free_t *tmp = list->next; free(list->node); free(list); list = tmp; } } /* ??? */ _Bool ptr_equal(pointer_t p1, pointer_t p2) { return (p1.ptr == p2.ptr) && (p1.count == p2.count); } /* ??? */ _Bool CAS(pointer_t *dest, pointer_t oldval, pointer_t newval){ $atomic { if (ptr_equal(*dest, oldval)) { *dest = newval; return true; } return false; } } void enqueue(queue_t *Q, int value) { pointer_t tail, next; node_t *node = (node_t*)malloc(sizeof(node_t)); node->value = value; node->next.ptr = NULL; while (true) { tail = Q->Tail; next = tail.ptr->next; if (ptr_equal(tail, Q->Tail)) { if (next.ptr == NULL) { if (CAS(&tail.ptr->next, next, (pointer_t){node, next.count+1})) break; } else { CAS(&Q->Tail, tail, (pointer_t){next.ptr, tail.count+1}); } } } CAS(&Q->Tail, tail, (pointer_t){node, tail.count+1}); } _Bool dequeue(queue_t *Q, int *pvalue) { pointer_t head, tail, next; while (true) { head = Q->Head; tail = Q->Tail; next = head.ptr->next; if (ptr_equal(head, Q->Head)) { if (head.ptr == tail.ptr) { if (next.ptr == NULL) return false; // Tail is falling behind. Try to advance it: CAS(&Q->Tail, tail, (pointer_t){next.ptr, tail.count+1}); } else { *pvalue = next.ptr->value; if (CAS(&Q->Head, head, (pointer_t){next.ptr, head.count+1})) break; } } } setFree(head.ptr); return true; } /******************** Tests **************************/ /* Auxiliary function to determine whether an array of * n integers is a permutation of the numbers 0..n-1. */ _Bool is_permutation(int n, int *data) { _Bool seen[n]; for (int i=0; i= n) return 0; if (seen[value]) return 0; seen[value] = 1; } return 1; } /* A single thread enqueues 10 items and then dequeues * 10 times. Checks that the dequeued sequence is the * same as the enqueued sequence. */ void sequentialTest() { int d; queue_t sq; initialize(&sq); for (int i = 0; i < 10; i++) { enqueue(&sq, i); } for (int i = 0; i < 10; i++) { _Bool result = dequeue(&sq, &d); assert(result); assert(d == i); } deallocate(); free(sq.Head.ptr); } /* n threads executing concurrently; each does one enqueue * and one dequeue. Check that the result is a permutation * of the numbers 0..n-1. */ void permuteTest(int n) { queue_t sq; int array[n]; initialize(&sq); $parfor (int i: 0 .. (n-1)) { enqueue(&sq, i); dequeue(&sq, &array[i]); } assert(is_permutation(n, array)); deallocate(); free(sq.Head.ptr); } /* Checks that a sequence of integers is obtained * by interleaving blocks of integers. * It is assumed that there are nthreads threads, * each of which generates the integers * nvals*tid, ..., nvals*(tid+1)-1. */ void assertFIFO(int nthreads, int nvals, int *data) { // for each thread, the next value you expect to see // from that thread: int expect[nthreads]; for (int tid=0; tid