/******************************************************************************* * (C) Xenadyne Inc, 2001. All Rights Reserved * * Permission to use, copy, modify and distribute this software for * any purpose and without fee is hereby granted, provided that the * above copyright notice appears in all copies. * * XENADYNE INC DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, * INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. * IN NO EVENT SHALL XENADYNE BE LIABLE FOR ANY SPECIAL, INDIRECT OR * CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM THE * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. * * File: nft_queue.c * * Description: * * Provides synchronized message queues for use in threaded applications. * Please refer to tmi_queue.h for a detailed description of the package. * * TODO * ==== * * We could add an _apply() method to enable users to safely iterate over * the queue. It may also be useful to provide _search() and _delete() methods. * A problem with _delete() is that it cannot be called from _apply() since it * must lock the list, which the _apply() call already has done. We could * define a _filter() method that would delete items as indicated by a * caller-provided filter function. * * $Id: nft_queue.c,v 1.3 2005/10/09 22:12:57 drechsau Exp $ ******************************************************************************* */ #include #include #include #ifndef WIN32 #include #include #endif #include #include /* struct nft_queue * * The queued items are stored in a "circular array", meaning that * we access it using modulo arithmetic. This array is allocated * initially to the minimum size defined below, and grows by doubling. * Whenever the queue is less than one quarter full, the array size * is halved to free memory. */ #define MIN_SIZE 8 struct nft_queue { unsigned valid; /* Was Q initialized/destroyed? */ int first; /* First item in array. */ int next; /* Next free item in array. */ int size; /* Size of array. */ int limit; /* Maximum number of items. */ void ** array; /* Array holding queued items. */ pthread_mutex_t mutex; /* Lock to protect queue. */ unsigned nwait; /* Number of waiting threads. */ pthread_cond_t cond; /* Signals waiting threads. */ void (*destroy_func)(void * user_data); }; /* Live nft_q's should have valid equal to this. */ #define NFTQ_VALID 0x2468ACDC /* Here are some macros to help us manage the circular array. * Note that an empty queue is indicated by setting q->first = -1, * and this also requires that q->next = 0; * Also note the distinction between FULL(q) and LIMIT(q). */ #define EMPTY(q) (q->first < 0) #define FULL(q) (q->next == q->first) #define NEXT(i) ((i + 1) % q->size) #define COUNT(q) (EMPTY(q) ? 0 : ((q->next + ((q->next <= q->first) ? q->size : 0)) - q->first)) #define LIMIT(q) ((q->limit > 0) && (COUNT(q) >= q->limit)) #define VALID(q) ((q != NULL) && (q->valid == NFTQ_VALID)) /* Define when to grow and shrink the queue's array. * Don't grow past the queue limit, if there is one. * We shrink the array by halves, but only when the count * has dropped to a quarter of the size, to prevent thrashing. */ #define GROW(q) (FULL(q) && (!q->limit || (q->size < q->limit))) #define SHRINK(q) ((COUNT(q) < (q->size/4)) && (MIN_SIZE <= (q->size/2))) /*---------------------------------------------------------------------- * * nft_queue_validate() * * Validate the consistency of the queue. * *---------------------------------------------------------------------- */ static int queue_validate( nft_queue_t * q) { assert(q->first < q->size); assert(q->next < q->size); assert((q->first != -1) || (q->next == 0)); /* This test assumes that all queue entries are nonnull, which is not * true in general. It may be a useful check for certain tests, though. */ #if 0 if (!EMPTY(q)) { int i = q->first; do { assert(q->array[i] != NULL); i = NEXT(i); } while (i != q->next); } #endif return 1; } /*---------------------------------------------------------------------- * * queue_grow() - Allocate more space for q->array. * Returns 0 on success, else ENOMEM on malloc failure. * *---------------------------------------------------------------------- */ static int queue_grow(nft_queue_t * q) { /* This has to be void** for pointer arithmetic to work! */ void ** new; /* Double the size of the array - the logic below assumes this. */ int nsize = 2 * q->size; assert(queue_validate(q)); assert(GROW(q)); /* Don't let new size exceed max signed integer. */ if ((nsize < 0) || ((new = realloc(q->array, nsize * sizeof(void*))) == NULL)) return ENOMEM; /* If the items are "wrapped" we need to move the tail (the part of the * queue that wrapped around to the start of array) to the end of the head. * We are sure the new queue won't wrap, since we have doubled the array size. */ if (q->next <= q->first) { /* Copy the items in new[0...next-1] to new[size..(size+next-1)]. */ memcpy(new + q->size, new, q->next * sizeof(void*)); q->next += q->size; } q->array = new; q->size = nsize; return 0; } /*---------------------------------------------------------------------- * * queue_shrink() - Reduce the space allocated for the array. * *---------------------------------------------------------------------- */ static void queue_shrink(nft_queue_t * q) { int count = COUNT(q); assert(queue_validate(q)); assert(SHRINK(q)); if (!EMPTY(q)) { /* Rearrange the queue items to fit in the smaller area. * If the items aren't wrapped, move them to the start of array. */ if (q->first < q->next) { memmove(q->array, q->array + q->first, count * sizeof(void*)); } else { /* When wrapped, shift the tail back, and the head to the front of array. * This is safe because the queue is less than one quarter full. */ memmove(q->array + (q->size - q->first), q->array, q->next * sizeof(void*)); memmove(q->array, q->array + q->first, (q->size - q->first) * sizeof(void*)); } q->first = 0; q->next = count; } q->size /= 2; q->array = realloc(q->array, q->size * sizeof(void*)); assert(COUNT(q) == count); } /*---------------------------------------------------------------------- * * queue_cleanup() - cancellation cleanup handler. * *---------------------------------------------------------------------- */ static void queue_cleanup(void * arg) { nft_queue_t * q = arg; q->nwait--; /* If queue was shutdown while we were waiting, and we are * the last waiter, signal the nft_queue_shutdown() thread, * which will block until all waiters have left the queue. */ if (!VALID(q) && (q->nwait == 0)) pthread_cond_broadcast(&q->cond); pthread_mutex_unlock(&q->mutex); } /*---------------------------------------------------------------------- * * nft_queue_create() * * Allocate the base struct which contains head and tail list pointers, * and various other data. Returns queue pointer to the caller, * or NULL if malloc fails. * *---------------------------------------------------------------------- */ nft_queue_t * nft_queue_create(int limit, void (*destroy_func)(void *)) { nft_queue_t * q = malloc(sizeof(nft_queue_t)); void * b = malloc(MIN_SIZE * sizeof(void*)); if (!q || !b || (limit < 0)) return NULL; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init (&q->cond, NULL); q->first = -1; q->next = 0; q->size = MIN_SIZE; q->limit = limit; q->array = b; q->nwait = 0; q->destroy_func = destroy_func; q->valid = NFTQ_VALID; return q; } /*---------------------------------------------------------------------- * * nft_queue_destroy() * * Destroy the list and any queued items if the destroy_func is set. * *---------------------------------------------------------------------- */ void nft_queue_destroy( nft_queue_t * q) { int rc; /* If the destroy_func is set, apply it to each item in the queue. * The iteration logic is funky because when the queue is full, * q->first == q->next. */ if (q->destroy_func && !EMPTY(q)) { int i = q->first; do { q->destroy_func(q->array[i]); i = NEXT(i); } while (i != q->next); } rc = pthread_mutex_destroy(&q->mutex); assert(rc == 0); rc = pthread_cond_destroy (&q->cond); assert(rc == 0); free(q->array); free(q); } /*---------------------------------------------------------------------- * * nft_queue_append() - Add one entry on the end of the queue. * * This function will block indefinitely if the queue limit is exceeded. * * Returns zero on success, otherwise: * EINVAL - not a valid queue. * ENOMEM - malloc failed * *---------------------------------------------------------------------- */ int nft_queue_append(nft_queue_t * q, void * item) { return nft_queue_append_wait(q, item, -1); } /*---------------------------------------------------------------------- * * nft_queue_append_wait() - Add one entry on the end of the queue. * * This function will wait for up to timeout seconds if the queue * limit has been reached, or indefinitely when timeout is -1. * The queue limit is ignored if timeout is zero. * * Returns zero on success, otherwise: * EINVAL - not a valid queue. * ENOMEM - malloc failed * ETIMEDOUT - timeout reached * *---------------------------------------------------------------------- */ int nft_queue_append_wait(nft_queue_t * q, void * item, int timeout) { int result = 0; int rc; if (!VALID(q)) return EINVAL; rc = pthread_mutex_lock(&q->mutex); /* Lock the queue */ assert(rc == 0); assert((q->first != -1) || (q->next == 0)); /* If a limit is set, and the limit has been reached, and timeout * is nonzero, wait for an item to be popped from the list. */ if (VALID(q) && LIMIT(q) && timeout) { /* Wait for an item to be popped. */ q->nwait++; /* Push a cancellation cleanup handler in case we get cancelled. */ pthread_cleanup_push(queue_cleanup, q); /* If timeout is positive, do a timed wait, else wait indefinitely. */ if (timeout > 0) { struct timespec abstime; gettime(&abstime); abstime.tv_sec += timeout; /* pthread_cond_timed_wait returns ETIMEDOUT on timeout. */ while (q->valid && LIMIT(q)) if ((rc = pthread_cond_timedwait(&q->cond, &q->mutex, &abstime)) != 0) break; assert(rc == 0 || rc == ETIMEDOUT); } else if (timeout < 0) { while (q->valid && LIMIT(q)) if ((rc = pthread_cond_wait(&q->cond, &q->mutex)) != 0) break; assert(rc == 0); } /* Pop cleanup without executing it. */ pthread_cleanup_pop(0); /* Done waiting. */ q->nwait--; /* If queue was destroyed while we were waiting, and we are * the last waiter, signal the nft_queue_destroy() thread. */ if (!q->valid && !q->nwait) { rc = pthread_cond_broadcast(&q->cond); assert(rc == 0); } } if (VALID(q)) { /* Attempt to grow the queue if necessary. */ if (GROW(q) && ((rc = queue_grow(q)) != 0)) { pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ return rc; } /* Now the queue should not be full unless it is at the limit. */ assert(!FULL(q) || LIMIT(q)); /* If there is no limit, or the limit is not reached, append item. */ if (!LIMIT(q)) { q->array[q->next] = item; q->next = NEXT(q->next); /* Is queue no longer empty? */ if (EMPTY(q)) { q->first = 0; assert(q->next == 1); } /* If threads are waiting in nft_queue_pop_wait, wake them. * Since the condition is shared between append and pop threads, * we need to do a broadcast. */ if (q->nwait > 0) { rc = pthread_cond_broadcast(&q->cond); assert(rc == 0); } } else result = ETIMEDOUT; } else result = EINVAL; assert((q->first != -1) || (q->next == 0)); rc = pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ assert(rc == 0); return result; } /*---------------------------------------------------------------------- * * nft_queue_pop() - Remove and return the head item on the queue. * * This call blocks indefinitely while the queue is empty. * *---------------------------------------------------------------------- */ void * nft_queue_pop( nft_queue_t * q) { return nft_queue_pop_wait(q, -1); } /*---------------------------------------------------------------------- * * nft_queue_pop_wait() - Remove and return the head item on the queue. * * If the queue is empty, this call will block for up to timeout * seconds, and return NULL if no item is queued in that time. * Blocks indefinitely if the timeout is -1. * *---------------------------------------------------------------------- */ void * nft_queue_pop_wait(nft_queue_t * q, int timeout) { void * item = NULL; int rc; if (!VALID(q)) return NULL; rc = pthread_mutex_lock(&q->mutex); /* Lock the queue */ assert(rc == 0); if (VALID(q) && EMPTY(q) && timeout) { /* Wait for an item to be queued. */ q->nwait++; /* Push a cancellation cleanup handler in case we get cancelled. */ pthread_cleanup_push(queue_cleanup, q); /* If timeout is positive, do a timed wait, else wait indefinitely. */ if (timeout > 0) { struct timespec abstime; gettime(&abstime); abstime.tv_sec += timeout; /* pthread_cond_timed_wait returns ETIMEDOUT on timeout. */ while (VALID(q) && EMPTY(q)) if ((rc = pthread_cond_timedwait(&q->cond, &q->mutex, &abstime)) != 0) break; assert(rc == 0 || rc == ETIMEDOUT); } else if (timeout < 0) { /* Wait indefinitely. */ while (VALID(q) && EMPTY(q)) if ((rc = pthread_cond_wait(&q->cond, &q->mutex)) != 0) break; assert(rc == 0); } /* Pop cleanup without executing it. */ pthread_cleanup_pop(0); /* Done waiting. */ q->nwait--; /* If queue was shut down while we were waiting, and we are * the last waiter, signal the nft_queue_shutdown() thread. */ if (!VALID(q) && (q->nwait == 0)) { rc = pthread_cond_broadcast(&q->cond); assert(rc == 0); } } /* If the wait timed out, the list may still be empty. */ if (VALID(q) && !EMPTY(q)) { item = q->array[q->first]; q->first = NEXT(q->first); /* If the queue appears to be FULL after popping this item, * set queue to empty state. */ if (FULL(q)) { q->first = -1; q->next = 0; } /* If there could be threads blocked in nft_queue_append_wait(), * wake them now. Since the condition is shared between append * and pop threads, we need to do a broadcast. */ if (q->nwait && q->limit) { rc = pthread_cond_broadcast(&q->cond); assert(rc == 0); } } /* If the queue is less than one quarter full, shrink it by half. */ if (SHRINK(q)) queue_shrink(q); rc = pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ assert(rc == 0); return item; } /*---------------------------------------------------------------------- * * nft_queue_shutdown() * * Invalidate the queue and awaken blocked threads. * Returns when all blocked threads have released the queue. * * Returns zero - on success. * EINVAL - not a valid queue. * *---------------------------------------------------------------------- */ int nft_queue_shutdown( nft_queue_t * q) { int rc; if (!VALID(q)) return EINVAL; rc = pthread_mutex_lock(&q->mutex); assert(rc == 0); q->valid = 0; /* Mark as a dead queue. */ /* Flush any threads waiting on the queue. */ if (q->nwait > 0) { rc = pthread_cond_broadcast(&q->cond); assert(rc == 0); /* Wait for all of the threads to exit. */ while (q->nwait > 0) { rc = pthread_cond_wait(&q->cond, &q->mutex); assert(rc == 0); } } rc = pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ assert(rc == 0); return 0; } /*---------------------------------------------------------------------- * * nft_queue_peek() - Return the first item on the queue. * Returns NULL on empty or invalid queue. * *---------------------------------------------------------------------- */ void * nft_queue_peek( nft_queue_t * q) { void * result = NULL; if (VALID(q)) { pthread_mutex_lock(&q->mutex); /* Lock the queue */ if (!EMPTY(q)) result = q->array[q->first]; pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ } return result; } /*---------------------------------------------------------------------- * * nft_queue_count() - Return the number of items in the list. * Returns -1 for an invalid list. * *---------------------------------------------------------------------- */ int nft_queue_count( nft_queue_t * q) { int result = -1; if (VALID(q)) { pthread_mutex_lock(&q->mutex); /* Lock the queue */ result = COUNT(q); pthread_mutex_unlock(&q->mutex); /* Unlock the queue */ } return result; } /******************************************************************************* ******************************************************************************* * * TEST DRIVER * ******************************************************************************* ******************************************************************************* */ #ifdef MAIN #include #include #include #include #include /* * These strings are used for the simple tests. */ static char * Strings[] = { "and", "the", "shorter", "of", "the", "porter's", "daughters", "dips", "her", "hand", "in", "the", "deadly", "waters", NULL }; static void t1( void); static void t2( void); static void t3( void); static void t4( void); static void t5( void); static void t6( void); static void t7( void); #define BUFFSZ 120 #define NUM_WORKERS 3 #define Q_LIMIT 32 nft_queue_t *input_Q = NULL; nft_queue_t *output_Q = NULL; int countin = 0; int countout = 0; pthread_t input_thread = 0; pthread_t output_thread = 0; pthread_t worker_threads[NUM_WORKERS]; /* This is spawned as a single thread that reads lines from * standard input, and passes them to the raw message queue. */ static void * poll_input(void *arg) { char * buff; while (1) { buff = malloc(BUFFSZ); if (fgets(buff, BUFFSZ, stdin) == NULL) break; if (nft_queue_append(input_Q, buff) != 0) break; countin++; } fputs("input thread done\n", stderr); return NULL; } /* Multiple worker threads are spawned that read messages * from the raw message queue, convert them to upper case, * and append them to the output queue. */ static void * worker_thread(void * index) { while (1) { char * msg = nft_queue_pop(input_Q); char * s = msg; if (!msg) break; while ((*s = toupper(*s))) s++; if (nft_queue_append(output_Q, msg) != 0) break; } fprintf(stderr, "worker_thread[%d] done\n", (int) index); return NULL; } /* This function is spawned as a single thread that reads messages * from the output queue and prints them to standard output. */ static void * poll_output(void * arg) { while (1) { char * msg = nft_queue_pop(output_Q); if (!msg) break; free(msg); countout++; } fputs("output thread done\n", stderr); return NULL; } int main() { pthread_attr_t attr; int rc, i; /* First, do the basic single-threaded tests. */ t1(); t2(); t3(); t4(); t5(); t6(); t7(); /* * Multithreaded test - best run on a multprocessor machine. * * In this test, we create a work pipeline consisting of * two queues - an input queue and an output queue. The * input thread reads lines of text from standard input, * and appends them to the input queue. * * A number of worker threads pop strings from the input * queue, convert them to upper case, and append them to * the output queue. * * An output thread pops strings from the output queue and * prints them to standard output. The test concludes when * the user presses Ctrl-C, at which time the queues are * shutdown and then destroyed, and the program exits. */ /* Create the input and output queues. */ input_Q = nft_queue_create(Q_LIMIT, free); output_Q = nft_queue_create(Q_LIMIT, free); /* Create the input, output, and worker threads. */ pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); pthread_create(&input_thread, NULL, poll_input, 0); pthread_create(&output_thread, NULL, poll_output, 0); for (i = 0; i < NUM_WORKERS; i++) pthread_create(&worker_threads[i], NULL, worker_thread, (void *) i); /* Wait for the input thread to finish. */ pthread_join(input_thread, NULL); /* Write nulls to the input queue, and wait for workers to finish. */ for (i = 0; i < NUM_WORKERS; i++) nft_queue_append(input_Q, NULL); for (i = 0; i < NUM_WORKERS; i++) pthread_join(worker_threads[i], NULL); /* Now finish the output queue. */ rc = nft_queue_append(output_Q, NULL); assert(rc == 0); rc = pthread_join(output_thread, NULL); assert(rc == 0); assert(countin == countout); assert(nft_queue_count(input_Q) == 0); assert(nft_queue_count(output_Q) == 0); fprintf(stderr, "words in: %d words out: %d\n", countin, countout); /* Shutdown pipeline. */ rc = nft_queue_shutdown(input_Q); assert(rc == 0); rc = nft_queue_shutdown(output_Q); assert(rc == 0); /* Destroy queues. */ nft_queue_destroy(input_Q); nft_queue_destroy(output_Q); #ifdef NDEBUG fprintf(stderr, "You must recompile this test driver without NDEBUG!\n"); #else fprintf(stderr, "All tests passed.\n"); #endif exit(0); } /* * t1 - Test basic API calls. */ static void t1( void) { nft_queue_t * q; int i; fprintf(stderr, "t1 (create/destroy):"); /* Create an unlimited queue. */ q = nft_queue_create(0, free); /* With the queue empty, test the pop timeout. */ nft_queue_pop_wait(q, 1); /* Test the _append(), _count() and _peek() operations. */ for ( i = 0 ; Strings[ i] != NULL ; i++) { nft_queue_append( q, strdup( Strings[ i])); assert(nft_queue_count(q) == (i + 1)); } assert(strcmp(nft_queue_peek(q), Strings[0]) == 0); /* Set the queue limit, and test the append timeout. */ q->limit = i; assert(nft_queue_append_wait(q, "", 0) == ETIMEDOUT); /* Shutdown the queue, and verify invalid queue returns. */ nft_queue_shutdown(q); assert(nft_queue_append(q, 0) == EINVAL); assert(nft_queue_pop(q) == NULL); assert(nft_queue_peek(q) == NULL); assert(nft_queue_count(q) == -1); /* Destroy the queue. */ nft_queue_destroy(q); fprintf(stderr, "passed.\n"); } /* * t2 - Test append/pop operations. */ static void t2( void) { nft_queue_t * q; int i; void * ss; fprintf(stderr, "t2 (append/pop): "); q = nft_queue_create(0, free); for ( i = 0 ; Strings[ i] != NULL ; i++) nft_queue_append( q, strdup(Strings[i])); i = 0; while ((ss = nft_queue_pop_wait(q, 0)) != NULL) { if (strcmp(ss, Strings[i++])) { fprintf(stderr, " failed.\n"); return; } free(ss); } nft_queue_destroy( q); fprintf(stderr, " passed.\n"); } /* * t3 - Test destroy on nonempty queue. */ static void t3( void) { nft_queue_t * q; int i; fprintf(stderr, "t3 (append/destroy): "); q = nft_queue_create(0, free); for ( i = 0 ; Strings[ i] != NULL ; i++) nft_queue_append( q, strdup( Strings[ i])); nft_queue_destroy( q); fprintf(stderr, "passed.\n"); } static void * append_thread(void * arg) { nft_queue_t * q = arg; return (void*) nft_queue_append(q, "second"); } static void * pop_thread(void * arg) { nft_queue_t * q = arg; return nft_queue_pop_wait(q, -1); } /* * t4 - Test queue shutdown during append. */ static void t4( void) { pthread_t th; nft_queue_t * q; void * value; int rc; fprintf(stderr, "t4 (append/shutdown): "); q = nft_queue_create(1, NULL); /* The queue limit is one, so the thread will block. */ nft_queue_append(q, "first"); pthread_create(&th, 0, append_thread, q); sleep(1); assert(1 == q->nwait); /* Shut down the queue, and join the append_thread. */ nft_queue_shutdown(q); rc = pthread_join(th, &value); assert(0 == rc); assert(EINVAL == (int) value); assert(0 == q->nwait); nft_queue_destroy(q); fprintf(stderr, "passed.\n"); } /* * t5 - Test queue shutdown during pop. */ static void t5( void) { pthread_t th; nft_queue_t * q; void * value; int rc; fprintf(stderr, "t5(pop/shutdown): "); q = nft_queue_create(1, NULL); /* The queue is empty, so the thread will block. */ pthread_create(&th, 0, pop_thread, q); sleep(1); assert(1 == q->nwait); /* Shut down the queue, and join the pop_thread. */ nft_queue_shutdown(q); rc = pthread_join(th, &value); assert(0 == rc); assert(NULL == (int) value); assert(0 == q->nwait); nft_queue_destroy(q); fprintf(stderr, "passed.\n"); } /* * t6 - Test cancellation during append. */ static void t6( void) { #ifndef WIN32 /* no pthread_cancel() on WIN32 */ pthread_t th; nft_queue_t * q; fprintf(stderr, "t6 (append/cancel): "); q = nft_queue_create(1, NULL); /* The queue limit is one, so the thread will block. */ nft_queue_append(q, "first"); pthread_create(&th, 0, append_thread, q); sleep(1); assert(q->nwait == 1); pthread_cancel(th); sleep(1); assert(q->nwait == 0); assert(strcmp("first", (char*) nft_queue_pop(q)) == 0); nft_queue_destroy(q); fprintf(stderr, "passed.\n"); #endif /* WIN32 */ } /* * t7 - Test cancellation during pop. */ static void t7( void) { #ifndef WIN32 /* no pthread_cancel() on WIN32 */ pthread_t th; nft_queue_t * q; fprintf(stderr, "t7(pop/cancel): "); q = nft_queue_create(1, NULL); /* The queue is empty, so the thread will block. */ pthread_create(&th, 0, pop_thread, q); sleep(1); assert(q->nwait == 1); pthread_cancel(th); sleep(1); assert(q->nwait == 0); assert(nft_queue_pop_wait(q, 0) == NULL); nft_queue_destroy(q); fprintf(stderr, "passed.\n"); #endif /* WIN32 */ } #endif /* MAIN */