/** Java NIO  - Ron Hitchens, O'Reilly Media*/

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;

/**
 * Specialization of the SelectSockets class which uses a thread pool
 * to service channels.  The thread pool is an ad-hoc implementation
 * quicky lashed togther in a few hours for demonstration purposes.
 * It's definitely not production quality.
 *
 * @author Ron Hitchens (ron@ronsoft.com)
 */
public class SelectSocketsThreadPool extends SelectSockets
{
    private static final int MAX_THREADS = 5;

    private ThreadPool pool = new ThreadPool (MAX_THREADS);

    // -------------------------------------------------------------

    public static void main (String [] argv)
        throws Exception
    {
        new SelectSocketsThreadPool(  ).go (argv);
    }

    // -------------------------------------------------------------

    /**
     * Sample data handler method for a channel with data ready to read.
     * This method is invoked from the go(  ) method in the parent class.
     * This handler delegates to a worker thread in a thread pool to
     * service the channel, then returns immediately.
     * @param key A SelectionKey object representing a channel
     *  determined by the selector to be ready for reading.  If the
     *  channel returns an EOF condition, it is closed here, which
     *  automatically invalidates the associated key.  The selector
     *  will then de-register the channel on the next select call.
     */
    protected void readDataFromSocket (SelectionKey key)
        throws Exception
    {
        WorkerThread worker = pool.getWorker(  );

        if (worker == null) {
            // No threads available. Do nothing. The selection
            // loop will keep calling this method until a
            // thread becomes available.  This design could
            // be improved.
            return;    
        }

        // Invoking this wakes up the worker thread, then returns
        worker.serviceChannel (key);
    }

    // ---------------------------------------------------------------

    /**
     * A very simple thread pool class.  The pool size is set at
     * construction time and remains fixed.  Threads are cycled
     * through a FIFO idle queue.
     */
    private class ThreadPool
    {
        List<Thread> idle = new LinkedList<Thread>(  );

        ThreadPool (int poolSize)
        {
            // Fill up the pool with worker threads
            for (int i = 0; i < poolSize; i++) {
                WorkerThread thread = new WorkerThread (this);

                // Set thread name for debugging. Start it.
                thread.setName ("Worker" + (i + 1));
                thread.start(  );

                idle.add (thread);
            }
        }

        /**
         * Find an idle worker thread, if any.  Could return null.
         */
        WorkerThread getWorker(  )
        {
            WorkerThread worker = null;

            synchronized (idle) {
                if (idle.size(  ) > 0) {
                    worker = (WorkerThread) idle.remove (0);
                }
            }

            return (worker);
        }

        /**
         * Called by the worker thread to return itself to the
         * idle pool.
         */
        void returnWorker (WorkerThread worker)
        {
            synchronized (idle) {
                idle.add (worker);
            }
        }
    }

    /**
     * A worker thread class which can drain channels and echo-back
     * the input.  Each instance is constructed with a reference to
     * the owning thread pool object. When started, the thread loops
     * forever waiting to be awakened to service the channel associated
     * with a SelectionKey object.
     * The worker is tasked by calling its serviceChannel(  ) method
     * with a SelectionKey object.  The serviceChannel(  ) method stores
     * the key reference in the thread object then calls notify(  )
     * to wake it up.  When the channel has been drained, the worker
     * thread returns itself to its parent pool.
     */
    private class WorkerThread extends Thread
    {
        private ByteBuffer buffer = ByteBuffer.allocate (1024);
        private ThreadPool pool;
        private SelectionKey key;

        WorkerThread (ThreadPool pool)
        {
            this.pool = pool;
        }

        // Loop forever waiting for work to do
        public synchronized void run(  )
        {
            System.out.println (this.getName(  ) + " is ready");

            while (true) {
                try {
                    // Sleep and release object lock
                    this.wait(  );
                } catch (InterruptedException e) {
                    e.printStackTrace(  );
                    // Clear interrupt status
                    this.interrupted(  );
                }

                if (key == null) {
                    continue;    // just in case
                }

                System.out.println (this.getName(  )
                    + " has been awakened");

                try {
                    drainChannel (key);
                } catch (Exception e) {
                    System.out.println ("Caught '"
                        + e + "' closing channel");

                    // Close channel and nudge selector
                    try {
                        key.channel().close(  );
                    } catch (IOException ex) {
                        ex.printStackTrace(  );
                    }

                    key.selector().wakeup(  );
                }

                key = null;

                // Done. Ready for more. Return to pool
                this.pool.returnWorker (this);
            }
        }

        /**
         * Called to initiate a unit of work by this worker thread
         * on the provided SelectionKey object.  This method is
         * synchronized, as is the run(  ) method, so only one key
         * can be serviced at a given time.
         * Before waking the worker thread, and before returning
         * to the main selection loop, this key's interest set is
         * updated to remove OP_READ.  This will cause the selector
         * to ignore read-readiness for this channel while the
         * worker thread is servicing it.
         */
        synchronized void serviceChannel (SelectionKey key)
        {
            this.key = key;

            key.interestOps (key.interestOps(  ) & 
(~SelectionKey.OP_READ));

            this.notify(  );        // Awaken the thread
        }

        /**
         * The actual code which drains the channel associated with
         * the given key.  This method assumes the key has been
         * modified prior to invocation to turn off selection
         * interest in OP_READ.  When this method completes it
         * re-enables OP_READ and calls wakeup(  ) on the selector
         * so the selector will resume watching this channel.
         */
        void drainChannel (SelectionKey key)
            throws Exception
        {
            SocketChannel channel = (SocketChannel) key.channel(  );
            int count;

            buffer.clear(  );            // Empty buffer

            // Loop while data is available; channel is nonblocking
            while ((count = channel.read (buffer)) > 0) {
                buffer.flip(  );         // make buffer readable

                // Send the data; may not go all at once
                while (buffer.hasRemaining(  )) {
                    channel.write (buffer);
                }
                // WARNING: the above loop is evil.
                // See comments in superclass.

                buffer.clear(  );        // Empty buffer
            }

            if (count < 0) {
                // Close channel on EOF; invalidates the key
                channel.close(  );
                return;
            }

            // Resume interest in OP_READ
            key.interestOps (key.interestOps(  ) | SelectionKey.OP_READ);

            // Cycle the selector so this key is active again
            key.selector().wakeup(  );
        }
    }
}

					  

