/*******************************************************************************
 * (C) Xenadyne Inc, 2001.	All Rights Reserved
 * 
 * Permission to use, copy, modify and distribute this software for
 * any purpose and without fee is hereby granted, provided that the 
 * above copyright notice appears in all copies.
 * 
 * XENADYNE INC DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, 
 * INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS.  
 * IN NO EVENT SHALL XENADYNE BE LIABLE FOR ANY SPECIAL, INDIRECT OR 
 * CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM THE 
 * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 
 * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 
 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 * 
 * File: nft_queue.c
 *
 * Description:
 *
 * Provides synchronized message queues for use in threaded applications.
 * Please refer to tmi_queue.h for a detailed description of the package.
 * 
 * TODO
 * ====
 *
 * We could add an _apply() method to enable users to safely iterate over
 * the queue. It may also be useful to provide _search() and _delete() methods.
 * A problem with _delete() is that it cannot be called from _apply() since it
 * must lock the list, which the _apply() call already has done. We could
 * define a _filter() method that would delete items as indicated by a 
 * caller-provided filter function. 
 *
 * $Id: nft_queue.c,v 1.3 2005/10/09 22:12:57 drechsau Exp $
 *******************************************************************************
 */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#ifndef WIN32
#include <strings.h>
#include <unistd.h>
#endif

#include <nft_gettime.h>
#include <nft_queue.h>

/* struct nft_queue
 *
 * The queued items are stored in a "circular array", meaning that
 * we access it using modulo arithmetic. This array is allocated
 * initially to the minimum size defined below, and grows by doubling.
 * Whenever the queue is less than one quarter full, the array size
 * is halved to free memory.
 */
#define MIN_SIZE 8

struct nft_queue
{
    unsigned		valid;	/* Was Q initialized/destroyed? */
    int			first;	/* First item in array.		*/
    int			next;	/* Next free item in array.	*/
    int			size;	/* Size of array.		*/
    int			limit;	/* Maximum number of items.	*/
    void	     ** array;	/* Array holding queued items.	*/

    pthread_mutex_t	mutex;	/* Lock to protect queue. 	*/

    unsigned		nwait;	/* Number of waiting threads.	*/
    pthread_cond_t	cond;	/* Signals waiting threads.	*/

    void     		(*destroy_func)(void * user_data);
};


/* Live nft_q's should have valid equal to this.
 */
#define  NFTQ_VALID 0x2468ACDC

/* Here are some macros to help us manage the circular array.
 * Note that an empty queue is indicated by setting q->first = -1,
 * and this also requires that q->next = 0;
 * Also note the distinction between FULL(q) and LIMIT(q).
 */
#define EMPTY(q) (q->first < 0)
#define FULL(q)  (q->next == q->first)
#define NEXT(i)  ((i + 1)  % q->size)
#define COUNT(q) (EMPTY(q) ? 0 : ((q->next + ((q->next <= q->first) ? q->size : 0)) - q->first))
#define LIMIT(q) ((q->limit > 0) && (COUNT(q) >= q->limit))
#define VALID(q) ((q != NULL) && (q->valid == NFTQ_VALID))

/* Define when to grow and shrink the queue's array.
 * Don't grow past the queue limit, if there is one.
 * We shrink the array by halves, but only when the count
 * has dropped to a quarter of the size, to prevent thrashing.
 */
#define GROW(q)   (FULL(q) && (!q->limit || (q->size < q->limit)))
#define SHRINK(q) ((COUNT(q) < (q->size/4)) && (MIN_SIZE <= (q->size/2)))

/*----------------------------------------------------------------------
 *
 *  nft_queue_validate()
 *
 *  Validate the consistency of the queue.
 *
 *----------------------------------------------------------------------
 */
static int
queue_validate( nft_queue_t * q)
{

    assert(q->first < q->size);
    assert(q->next  < q->size);
    assert((q->first != -1) || (q->next == 0));

    /* This test assumes that all queue entries are nonnull, which is not 
     * true in general. It may be a useful check for certain tests, though.
     */
#if 0
    if (!EMPTY(q))
    {
	int i = q->first;
	do {
	    assert(q->array[i] != NULL);
	    i = NEXT(i);
	} while (i != q->next);
    }
#endif

    return 1;
}

/*----------------------------------------------------------------------
 *
 *  queue_grow() - Allocate more space for q->array.
 *		   Returns 0 on success, else ENOMEM on malloc failure.
 *
 *----------------------------------------------------------------------
 */
static int
queue_grow(nft_queue_t * q)
{
    /* This has to be void** for pointer arithmetic to work!
     */
    void ** new;  

    /* Double the size of the array - the logic below assumes this.
     */
    int     nsize = 2 * q->size;

    assert(queue_validate(q));
    assert(GROW(q));

    /* Don't let new size exceed max signed integer.
     */
    if ((nsize < 0) ||
	((new = realloc(q->array,  nsize * sizeof(void*))) == NULL))
	return ENOMEM;

    /* If the items are "wrapped" we need to move the tail (the part of the
     * queue that wrapped around to the start of array) to the end of the head.
     * We are sure the new queue won't wrap, since we have doubled the array size.
     */
    if (q->next <= q->first)
    {
	/* Copy the items in new[0...next-1] to new[size..(size+next-1)].
	 */
	memcpy(new + q->size, new, q->next * sizeof(void*));
	q->next += q->size;
    }

    q->array = new;
    q->size  = nsize;

    return 0;
}


/*----------------------------------------------------------------------
 *
 *  queue_shrink() - Reduce the space allocated for the array.
 *
 *----------------------------------------------------------------------
 */
static void
queue_shrink(nft_queue_t * q)
{
    int count = COUNT(q);

    assert(queue_validate(q));
    assert(SHRINK(q));

    if (!EMPTY(q))
    {
	/* Rearrange the queue items to fit in the smaller area.
	 * If the items aren't wrapped, move them to the start of array.
	 */
	if (q->first < q->next)
	{
	    memmove(q->array, q->array + q->first, count * sizeof(void*));
	}
	else
	{
	    /* When wrapped, shift the tail back, and the head to the front of array.
	     * This is safe because the queue is less than one quarter full.
	     */
	    memmove(q->array + (q->size - q->first), q->array, q->next   * sizeof(void*));
	    memmove(q->array,  q->array + q->first, (q->size - q->first) * sizeof(void*));
	}

	q->first = 0;
	q->next  = count;
    }

    q->size /= 2;
    q->array = realloc(q->array, q->size * sizeof(void*));

    assert(COUNT(q) == count);
}


/*----------------------------------------------------------------------
 *
 *  queue_cleanup() - cancellation cleanup handler.
 *
 *----------------------------------------------------------------------
 */
static void
queue_cleanup(void * arg)
{
    nft_queue_t * q = arg;

    q->nwait--;

    /* If queue was shutdown while we were waiting, and we are
     * the last waiter, signal the nft_queue_shutdown() thread,
     * which will block until all waiters have left the queue.
     */
    if (!VALID(q) && (q->nwait == 0))
	pthread_cond_broadcast(&q->cond);

    pthread_mutex_unlock(&q->mutex);
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_create()
 *
 *  Allocate the base struct which contains head and tail list pointers,
 *  and various other data. Returns queue pointer to the caller,
 *  or NULL if malloc fails.
 *
 *----------------------------------------------------------------------
 */
nft_queue_t *
nft_queue_create(int limit,  void  (*destroy_func)(void *))
{
    nft_queue_t * q = malloc(sizeof(nft_queue_t));
    void        * b = malloc(MIN_SIZE * sizeof(void*));

    if (!q || !b || (limit < 0))
	return NULL;

    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init (&q->cond,  NULL);

    q->first = -1;
    q->next  = 0;
    q->size  = MIN_SIZE;
    q->limit = limit;
    q->array = b;
    q->nwait = 0;
    q->destroy_func = destroy_func;

    q->valid = NFTQ_VALID;
    
    return q;
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_destroy()
 *
 *  Destroy the list and any queued items if the destroy_func is set.
 *
 *----------------------------------------------------------------------
 */
void
nft_queue_destroy( nft_queue_t * q)
{
    int rc;

    /* If the destroy_func is set, apply it to each item in the queue.
     * The iteration logic is funky because when the queue is full,
     * q->first == q->next.
     */
    if (q->destroy_func && !EMPTY(q))
    {
	int i = q->first;
	do {
	    q->destroy_func(q->array[i]);
	    i = NEXT(i);
	} while (i != q->next);
    }

    rc = pthread_mutex_destroy(&q->mutex);
    assert(rc == 0);
    rc = pthread_cond_destroy (&q->cond);
    assert(rc == 0);

    free(q->array);
    free(q);
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_append() - Add one entry on the end of the queue.
 *
 *  This function will block indefinitely if the queue limit is exceeded.
 *
 *  Returns zero on success, otherwise:
 *  EINVAL	- not a valid queue.
 *  ENOMEM	- malloc failed
 *
 *----------------------------------------------------------------------
 */
int
nft_queue_append(nft_queue_t * q, void * item)
{
    return nft_queue_append_wait(q, item, -1);
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_append_wait() - Add one entry on the end of the queue.
 *
 *  This function will wait for up to timeout seconds if the queue
 *  limit has been reached, or indefinitely when timeout is -1.
 *  The queue limit is ignored if timeout is zero.
 *
 *  Returns zero on success, otherwise:
 *  EINVAL	- not a valid queue.
 *  ENOMEM	- malloc failed
 *  ETIMEDOUT	- timeout reached
 *
 *----------------------------------------------------------------------
 */
int
nft_queue_append_wait(nft_queue_t * q,  void * item,  int timeout)
{
    int      result = 0;
    int      rc;

    if (!VALID(q))
	return EINVAL;

    rc = pthread_mutex_lock(&q->mutex);		/* Lock the queue */
    assert(rc == 0);

    assert((q->first != -1) || (q->next == 0));

    /* If a limit is set, and the limit has been reached, and timeout
     * is nonzero, wait for an item to be popped from the list.
     */
    if (VALID(q) && LIMIT(q) && timeout)
    {
	/* Wait for an item to be popped.
	 */
	q->nwait++;

	/* Push a cancellation cleanup handler in case we get cancelled.
	 */
	pthread_cleanup_push(queue_cleanup, q);

	/* If timeout is positive, do a timed wait, else wait indefinitely.
	 */
	if (timeout > 0)
	{
	    struct timespec abstime;
	    gettime(&abstime);
	    abstime.tv_sec += timeout;

	    /* pthread_cond_timed_wait returns ETIMEDOUT on timeout.
	     */
	    while (q->valid && LIMIT(q))
		if ((rc = pthread_cond_timedwait(&q->cond, &q->mutex, &abstime)) != 0)
		    break;
	    assert(rc == 0 || rc == ETIMEDOUT);
	}
	else if (timeout < 0)
	{
	    while (q->valid && LIMIT(q))
		if ((rc = pthread_cond_wait(&q->cond, &q->mutex)) != 0)
		    break;
	    assert(rc == 0);
	}

	/* Pop cleanup without executing it.
	 */
	pthread_cleanup_pop(0);

	/* Done waiting.
	 */
	q->nwait--;

	/* If queue was destroyed while we were waiting, and we are
	 * the last waiter, signal the nft_queue_destroy() thread.
	 */
	if (!q->valid && !q->nwait)
	{
	    rc = pthread_cond_broadcast(&q->cond);
	    assert(rc == 0);
	}
    }

    if (VALID(q))
    {
	/* Attempt to grow the queue if necessary.
	 */
	if (GROW(q) && ((rc = queue_grow(q)) != 0))
	{
	    pthread_mutex_unlock(&q->mutex); /* Unlock the queue */
	    return rc;
	}

	/* Now the queue should not be full unless it is at the limit.
	 */
	assert(!FULL(q) || LIMIT(q));

	/* If there is no limit, or the limit is not reached, append item.
	 */
	if (!LIMIT(q))
	{
	    q->array[q->next] = item;
	    q->next = NEXT(q->next);

	    /* Is queue no longer empty?
	     */
	    if (EMPTY(q))
	    {
		q->first = 0;
		assert(q->next == 1);
	    }

	    /* If threads are waiting in nft_queue_pop_wait, wake them.
	     * Since the condition is shared between append and pop threads,
	     * we need to do a broadcast.
	     */
	    if (q->nwait > 0)
	    {
		rc = pthread_cond_broadcast(&q->cond);
		assert(rc == 0);
	    }
	}
	else
	    result = ETIMEDOUT;
    }
    else
	result = EINVAL;

    assert((q->first != -1) || (q->next == 0));

    rc = pthread_mutex_unlock(&q->mutex);	/* Unlock the queue */
    assert(rc == 0);

    return result;
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_pop() - Remove and return the head item on the queue.
 *
 *  This call blocks indefinitely while the queue is empty.
 *
 *----------------------------------------------------------------------
 */
void *
nft_queue_pop( nft_queue_t * q)
{
    return nft_queue_pop_wait(q, -1);
}
	

/*----------------------------------------------------------------------
 *
 *  nft_queue_pop_wait() - Remove and return the head item on the queue.
 *
 *  If the queue is empty, this call will block for up to timeout
 *  seconds, and return NULL if no item is queued in that time.
 *  Blocks indefinitely if the timeout is -1.
 *
 *----------------------------------------------------------------------
 */
void *
nft_queue_pop_wait(nft_queue_t * q, int timeout)
{
    void * item = NULL;
    int    rc;

    if (!VALID(q))
	return NULL;

    rc = pthread_mutex_lock(&q->mutex);		/* Lock the queue */
    assert(rc == 0);

    if (VALID(q) && EMPTY(q) && timeout)
    {
	/* Wait for an item to be queued.
	 */
	q->nwait++;

	/* Push a cancellation cleanup handler in case we get cancelled.
	 */
	pthread_cleanup_push(queue_cleanup, q);

	/* If timeout is positive, do a timed wait, else wait indefinitely.
	 */
	if (timeout > 0)
	{
	    struct timespec abstime;
	    gettime(&abstime);
	    abstime.tv_sec += timeout;

	    /* pthread_cond_timed_wait returns ETIMEDOUT on timeout.
	     */
	    while (VALID(q) && EMPTY(q))
		if ((rc = pthread_cond_timedwait(&q->cond, &q->mutex, &abstime)) != 0)
		    break;
	    assert(rc == 0 || rc == ETIMEDOUT);
	}
	else if (timeout < 0)
	{
	    /* Wait indefinitely.
	     */
	    while (VALID(q) && EMPTY(q))
		if ((rc = pthread_cond_wait(&q->cond, &q->mutex)) != 0)
		    break;
	    assert(rc == 0);
	}

	/* Pop cleanup without executing it.
	 */
	pthread_cleanup_pop(0);

	/* Done waiting.
	 */
	q->nwait--;

	/* If queue was shut down while we were waiting, and we are
	 * the last waiter, signal the nft_queue_shutdown() thread.
	 */
	if (!VALID(q) && (q->nwait == 0))
	{
	    rc = pthread_cond_broadcast(&q->cond);
	    assert(rc == 0);
	}
    }

    /* If the wait timed out, the list may still be empty.
     */
    if (VALID(q) && !EMPTY(q))
    {
	item     = q->array[q->first];
	q->first = NEXT(q->first);

	/* If the queue appears to be FULL after popping this item,
	 * set queue to empty state.
	 */
	if (FULL(q))
	{
	    q->first = -1;
	    q->next  =  0;
	}

	/* If there could be threads blocked in nft_queue_append_wait(),
	 * wake them now. Since the condition is shared between append
	 * and pop threads, we need to do a broadcast.
	 */
	if (q->nwait && q->limit)
	{
	    rc = pthread_cond_broadcast(&q->cond);
	    assert(rc == 0);
	}
    }

    /* If the queue is less than one quarter full, shrink it by half.
     */
    if (SHRINK(q))
	queue_shrink(q);

    rc = pthread_mutex_unlock(&q->mutex);	/* Unlock the queue */
    assert(rc == 0);

    return item;
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_shutdown()
 *
 *  Invalidate the queue and awaken blocked threads.
 *  Returns when all blocked threads have released the queue.
 *
 *  Returns zero 	- on success.
 *  	    EINVAL	- not a valid queue.
 *
 *----------------------------------------------------------------------
 */
int
nft_queue_shutdown( nft_queue_t * q)
{
    int      rc;

    if (!VALID(q))
	return EINVAL;

    rc = pthread_mutex_lock(&q->mutex);
    assert(rc == 0);

    q->valid = 0;	/* Mark as a dead queue. */

    /* Flush any threads waiting on the queue.
     */
    if (q->nwait > 0)
    {
	rc = pthread_cond_broadcast(&q->cond);
	assert(rc == 0);

	/* Wait for all of the threads to exit.
	 */
	while (q->nwait > 0)
	{
	    rc = pthread_cond_wait(&q->cond, &q->mutex);
	    assert(rc == 0);
	}
    }

    rc = pthread_mutex_unlock(&q->mutex);	/* Unlock the queue */
    assert(rc == 0);

    return 0;
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_peek() -	Return the first item on the queue.
 *			Returns NULL on empty or invalid queue.
 *
 *----------------------------------------------------------------------
 */
void *
nft_queue_peek( nft_queue_t * q)
{
    void * result = NULL;

    if (VALID(q))
    {
	pthread_mutex_lock(&q->mutex);		/* Lock the queue */

	if (!EMPTY(q))
	    result = q->array[q->first];

	pthread_mutex_unlock(&q->mutex);	/* Unlock the queue */
    }

    return result;
}


/*----------------------------------------------------------------------
 *
 *  nft_queue_count() - Return the number of items in the list.
 *			Returns -1 for an invalid list.
 *
 *----------------------------------------------------------------------
 */
int
nft_queue_count( nft_queue_t * q)
{
    int result = -1;

    if (VALID(q))
    {
	pthread_mutex_lock(&q->mutex);		/* Lock the queue */

	result = COUNT(q);

	pthread_mutex_unlock(&q->mutex);	/* Unlock the queue */
    }

    return result;
}


/*******************************************************************************
 *******************************************************************************
 *
 *				TEST DRIVER
 *
 *******************************************************************************
 *******************************************************************************
 */
#ifdef MAIN

#include <assert.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * These strings are used for the simple tests.
 */
static char * Strings[] =
{
    "and",	"the",	"shorter",	"of",	"the",	"porter's",	"daughters",
    "dips",	"her",	"hand",		"in",	"the",	"deadly",	"waters",
    NULL
};

static void t1( void);
static void t2( void);
static void t3( void);
static void t4( void);
static void t5( void);
static void t6( void);
static void t7( void);


#define BUFFSZ		120
#define NUM_WORKERS	3
#define Q_LIMIT		32

nft_queue_t	*input_Q  = NULL;
nft_queue_t	*output_Q = NULL;

int		countin   = 0;
int		countout  = 0;

pthread_t	input_thread  = 0;
pthread_t	output_thread = 0;
pthread_t	worker_threads[NUM_WORKERS];


/* This is spawned as a single thread that reads lines from
 * standard input, and passes them to the raw message queue.
 */
static void *
poll_input(void *arg)
{
    char * buff;
    
    while (1)
    {
	buff = malloc(BUFFSZ);

	if (fgets(buff, BUFFSZ, stdin) == NULL)
	    break;

	if (nft_queue_append(input_Q, buff) != 0)
	    break;

	countin++;
    }

    fputs("input thread done\n", stderr);
    return NULL;
}


/* Multiple worker threads are spawned that read messages
 * from the raw message queue, convert them to upper case,
 * and append them to the output queue.
 */
static void *
worker_thread(void * index)
{
    while (1)
    {
	char * msg = nft_queue_pop(input_Q);
	char * s   = msg;

	if (!msg)
	    break;

	while ((*s = toupper(*s))) s++;

	if (nft_queue_append(output_Q, msg) != 0)
	    break;
    }

    fprintf(stderr, "worker_thread[%d] done\n", (int) index);
    return NULL;
}


/* This function is spawned as a single thread that reads messages
 * from the output queue and prints them to standard output.
 */
static void *
poll_output(void * arg)
{
    while (1)
    {
	char * msg = nft_queue_pop(output_Q);

	if (!msg)
	    break;

	free(msg);

	countout++;
    }

    fputs("output thread done\n", stderr);
    return NULL;
}

int
main()
{
    pthread_attr_t attr;
    int rc, i;

    /* First, do the basic single-threaded tests.
     */
    t1();
    t2();
    t3();
    t4();
    t5();
    t6();
    t7();

    /*
     * Multithreaded test - best run on a multprocessor machine.
     *
     * In this test, we create a work pipeline consisting of
     * two queues - an input queue and an output queue. The
     * input thread reads lines of text from standard input,
     * and appends them to the input queue.
     *
     * A number of worker threads pop strings from the input
     * queue, convert them to upper case, and append them to
     * the output queue.
     *
     * An output thread pops strings from the output queue and
     * prints them to standard output. The test concludes when
     * the user presses Ctrl-C, at which time the queues are
     * shutdown and then destroyed, and the program exits.
     */

    /* Create the input and output queues.
     */
    input_Q  = nft_queue_create(Q_LIMIT, free);
    output_Q = nft_queue_create(Q_LIMIT, free);

    /* Create the input, output, and worker threads.
     */
    pthread_attr_init(&attr);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    pthread_create(&input_thread,  NULL, poll_input,  0);
    pthread_create(&output_thread, NULL, poll_output, 0);
    for (i = 0; i < NUM_WORKERS; i++)
	pthread_create(&worker_threads[i], NULL, worker_thread, (void *) i);
    
    /* Wait for the input thread to finish.
     */
    pthread_join(input_thread, NULL);

    /* Write nulls to the input queue, and wait for workers to finish.
     */
    for (i = 0; i < NUM_WORKERS; i++)
	nft_queue_append(input_Q, NULL);
    for (i = 0; i < NUM_WORKERS; i++)
	pthread_join(worker_threads[i], NULL);

    /* Now finish the output queue.
     */
    rc = nft_queue_append(output_Q, NULL);
    assert(rc == 0);
    rc = pthread_join(output_thread, NULL);
    assert(rc == 0);
    
    assert(countin == countout);
    assert(nft_queue_count(input_Q)  == 0);
    assert(nft_queue_count(output_Q) == 0);

    fprintf(stderr, "words in: %d	words out: %d\n", countin, countout);

    /* Shutdown pipeline.
     */
    rc = nft_queue_shutdown(input_Q);
    assert(rc == 0);
    rc = nft_queue_shutdown(output_Q);
    assert(rc == 0);

    /* Destroy queues.
     */
    nft_queue_destroy(input_Q);
    nft_queue_destroy(output_Q);

#ifdef NDEBUG
    fprintf(stderr, "You must recompile this test driver without NDEBUG!\n");
#else
    fprintf(stderr, "All tests passed.\n");
#endif
    exit(0);
}

/*
 * t1 - Test basic API calls.
 */
static void
t1( void)
{
    nft_queue_t * q;
    int i;

    fprintf(stderr, "t1 (create/destroy):");

    /* Create an unlimited queue.
     */
    q = nft_queue_create(0, free);

    /* With the queue empty, test the pop timeout.
     */
    nft_queue_pop_wait(q, 1);

    /* Test the _append(), _count() and _peek() operations.
     */
    for ( i = 0 ; Strings[ i] != NULL ; i++)
    {
	nft_queue_append( q, strdup( Strings[ i]));
	assert(nft_queue_count(q) == (i + 1));
    }
    assert(strcmp(nft_queue_peek(q), Strings[0]) == 0);

    /* Set the queue limit, and test the append timeout.
     */
    q->limit = i;
    assert(nft_queue_append_wait(q, "", 0) == ETIMEDOUT);

    /* Shutdown the queue, and verify invalid queue returns.
     */
    nft_queue_shutdown(q);
    assert(nft_queue_append(q, 0) == EINVAL);
    assert(nft_queue_pop(q)       == NULL);
    assert(nft_queue_peek(q)      == NULL);
    assert(nft_queue_count(q)     == -1);
    
    /* Destroy the queue.
     */
    nft_queue_destroy(q);

    fprintf(stderr, "passed.\n");
}

/*
 * t2 - Test append/pop operations.
 */
static void
t2( void)
{
    nft_queue_t * q;
    int i;
    void * ss;

    fprintf(stderr, "t2 (append/pop): ");

    q = nft_queue_create(0, free);

    for ( i = 0 ; Strings[ i] != NULL ; i++)
	nft_queue_append( q, strdup(Strings[i]));

    i = 0;
    while ((ss = nft_queue_pop_wait(q, 0)) != NULL)
    {
	if (strcmp(ss, Strings[i++]))
	{
	    fprintf(stderr, " failed.\n");
	    return;
	}
	free(ss);
    }
    nft_queue_destroy( q);

    fprintf(stderr, " passed.\n");
}

/*
 * t3 - Test destroy on nonempty queue.
 */
static void
t3( void)
{
    nft_queue_t * q;
    int i;

    fprintf(stderr, "t3 (append/destroy): ");

    q = nft_queue_create(0, free);

    for ( i = 0 ; Strings[ i] != NULL ; i++)
	nft_queue_append( q, strdup( Strings[ i]));

    nft_queue_destroy( q);

    fprintf(stderr, "passed.\n");
}


static void *
append_thread(void * arg)
{
    nft_queue_t * q = arg;
    return (void*) nft_queue_append(q, "second");
}
			      
static void *
pop_thread(void * arg)
{
    nft_queue_t * q = arg;
    return nft_queue_pop_wait(q, -1);
}


/*
 * t4 - Test queue shutdown during append.
 */
static void
t4( void)
{
    pthread_t     th;
    nft_queue_t * q;
    void	* value;
    int           rc;

    fprintf(stderr, "t4 (append/shutdown): ");

    q = nft_queue_create(1, NULL);

    /* The queue limit is one, so the thread will block.
     */
    nft_queue_append(q, "first");
    pthread_create(&th, 0, append_thread, q);
    sleep(1);
    assert(1 == q->nwait);

    /* Shut down the queue, and join the append_thread.
     */
    nft_queue_shutdown(q);
    rc = pthread_join(th, &value);
    assert(0      == rc);
    assert(EINVAL == (int) value);
    assert(0      == q->nwait);

    nft_queue_destroy(q);

    fprintf(stderr, "passed.\n");
}


/*
 * t5 - Test queue shutdown during pop.
 */
static void
t5( void)
{
    pthread_t     th;
    nft_queue_t * q;
    void	* value;
    int           rc;

    fprintf(stderr, "t5(pop/shutdown): ");

    q = nft_queue_create(1, NULL);

    /* The queue is empty, so the thread will block.
     */
    pthread_create(&th, 0, pop_thread, q);
    sleep(1);
    assert(1 == q->nwait);

    /* Shut down the queue, and join the pop_thread.
     */
    nft_queue_shutdown(q);
    rc = pthread_join(th, &value);
    assert(0    == rc);
    assert(NULL == (int) value);
    assert(0    == q->nwait);

    nft_queue_destroy(q);

    fprintf(stderr, "passed.\n");
}


/*
 * t6 - Test cancellation during append.
 */
static void
t6( void)
{
#ifndef WIN32	/* no pthread_cancel() on WIN32 */
    pthread_t     th;
    nft_queue_t * q;

    fprintf(stderr, "t6 (append/cancel): ");

    q = nft_queue_create(1, NULL);

    /* The queue limit is one, so the thread will block.
     */
    nft_queue_append(q, "first");
    pthread_create(&th, 0, append_thread, q);
    sleep(1);
    assert(q->nwait == 1);
    pthread_cancel(th);
    sleep(1);
    assert(q->nwait == 0);
    assert(strcmp("first", (char*) nft_queue_pop(q)) == 0);

    nft_queue_destroy(q);

    fprintf(stderr, "passed.\n");
#endif /* WIN32 */
}


/*
 * t7 - Test cancellation during pop.
 */
static void
t7( void)
{
#ifndef WIN32	/* no pthread_cancel() on WIN32 */
    pthread_t     th;
    nft_queue_t * q;

    fprintf(stderr, "t7(pop/cancel): ");

    q = nft_queue_create(1, NULL);

    /* The queue is empty, so the thread will block.
     */
    pthread_create(&th, 0, pop_thread, q);
    sleep(1);
    assert(q->nwait == 1);
    pthread_cancel(th);
    sleep(1);
    assert(q->nwait == 0);
    assert(nft_queue_pop_wait(q, 0) == NULL);

    nft_queue_destroy(q);

    fprintf(stderr, "passed.\n");
#endif /* WIN32 */
}

#endif	/* MAIN */
