Logo Search packages:      
Sourcecode: virtualbox-ose version File versions  Download package

prtpool.c

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* ***** BEGIN LICENSE BLOCK *****
 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is the Netscape Portable Runtime (NSPR).
 *
 * The Initial Developer of the Original Code is
 * Netscape Communications Corporation.
 * Portions created by the Initial Developer are Copyright (C) 1999-2000
 * the Initial Developer. All Rights Reserved.
 *
 * Contributor(s):
 *
 * Alternatively, the contents of this file may be used under the terms of
 * either the GNU General Public License Version 2 or later (the "GPL"), or
 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
 * in which case the provisions of the GPL or the LGPL are applicable instead
 * of those above. If you wish to allow use of your version of this file only
 * under the terms of either the GPL or the LGPL, and not to allow others to
 * use your version of this file under the terms of the MPL, indicate your
 * decision by deleting the provisions above and replace them with the notice
 * and other provisions required by the GPL or the LGPL. If you do not delete
 * the provisions above, a recipient may use your version of this file under
 * the terms of any one of the MPL, the GPL or the LGPL.
 *
 * ***** END LICENSE BLOCK ***** */

#include "nspr.h"

/*
 * Thread pools
 *    Thread pools create and manage threads to provide support for
 *    scheduling jobs onto one or more threads.
 *
 */
#ifdef OPT_WINNT
#include <windows.h>
#endif

/*
 * worker thread
 */
typedef struct wthread {
      PRCList           links;
      PRThread    *thread;
} wthread;

/*
 * queue of timer jobs
 */
typedef struct timer_jobq {
      PRCList           list;
      PRLock            *lock;
      PRCondVar   *cv;
      PRInt32           cnt;
      PRCList     wthreads;
} timer_jobq;

/*
 * queue of jobs
 */
typedef struct tp_jobq {
      PRCList           list;
      PRInt32           cnt;
      PRLock            *lock;
      PRCondVar   *cv;
      PRCList     wthreads;
#ifdef OPT_WINNT
      HANDLE            nt_completion_port;
#endif
} tp_jobq;

/*
 * queue of IO jobs
 */
typedef struct io_jobq {
      PRCList           list;
      PRPollDesc  *pollfds;
      PRInt32     npollfds;
      PRJob       **polljobs;
      PRLock            *lock;
      PRInt32           cnt;
      PRFileDesc  *notify_fd;
      PRCList     wthreads;
} io_jobq;

/*
 * Threadpool
 */
struct PRThreadPool {
      PRInt32           init_threads;
      PRInt32           max_threads;
      PRInt32           current_threads;
      PRInt32           idle_threads;
      PRUint32    stacksize;
      tp_jobq           jobq;
      io_jobq           ioq;
      timer_jobq  timerq;
      PRLock            *join_lock;       /* used with jobp->join_cv */
      PRCondVar   *shutdown_cv;
      PRBool            shutdown;
};

typedef enum io_op_type
      { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;

#ifdef OPT_WINNT
typedef struct NT_notifier {
      OVERLAPPED overlapped;        /* must be first */
      PRJob *jobp;
} NT_notifier;
#endif

struct PRJob {
      PRCList                 links;            /*    for linking jobs */
      PRBool                  on_ioq;           /* job on ioq */
      PRBool                  on_timerq;  /* job on timerq */
      PRJobFn                 job_func;
      void              *job_arg;
      PRCondVar         *join_cv;
      PRBool                  join_wait;  /* == PR_TRUE, when waiting to join */
      PRCondVar         *cancel_cv; /* for cancelling IO jobs */
      PRBool                  cancel_io;  /* for cancelling IO jobs */
      PRThreadPool      *tpool;           /* back pointer to thread pool */
      PRJobIoDesc       *iod;
      io_op_type        io_op;
      PRInt16                 io_poll_flags;
      PRNetAddr         *netaddr;
      PRIntervalTime    timeout;    /* relative value */
      PRIntervalTime    absolute;
#ifdef OPT_WINNT
      NT_notifier       nt_notifier;      
#endif
};

#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

#define JOIN_NOTIFY(_jobp)                                              \
                        PR_BEGIN_MACRO                                        \
                        PR_Lock(_jobp->tpool->join_lock);         \
                        _jobp->join_wait = PR_FALSE;              \
                        PR_NotifyCondVar(_jobp->join_cv);         \
                        PR_Unlock(_jobp->tpool->join_lock);       \
                        PR_END_MACRO

#define CANCEL_IO_JOB(jobp)                                             \
                        PR_BEGIN_MACRO                                        \
                        jobp->cancel_io = PR_FALSE;                     \
                        jobp->on_ioq = PR_FALSE;                        \
                        PR_REMOVE_AND_INIT_LINK(&jobp->links);    \
                        tp->ioq.cnt--;                                        \
                        PR_NotifyCondVar(jobp->cancel_cv);        \
                        PR_END_MACRO

static void delete_job(PRJob *jobp);
static PRThreadPool * alloc_threadpool(void);
static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
static void notify_ioq(PRThreadPool *tp);
static void notify_timerq(PRThreadPool *tp);

/*
 * locks are acquired in the following order
 *
 *    tp->ioq.lock,tp->timerq.lock
 *                |
 *                V
 *          tp->jobq->lock          
 */

/*
 * worker thread function
 */
static void wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *head;

      /*
       * execute jobs until shutdown
       */
      while (!tp->shutdown) {
            PRJob *jobp;
#ifdef OPT_WINNT
            BOOL rv;
            DWORD unused, shutdown;
            LPOVERLAPPED olp;

            PR_Lock(tp->jobq.lock);
            tp->idle_threads++;
            PR_Unlock(tp->jobq.lock);
            rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
                              &unused, &shutdown, &olp, INFINITE);
            
            PR_ASSERT(rv);
            if (shutdown)
                  break;
            jobp = ((NT_notifier *) olp)->jobp;
            PR_Lock(tp->jobq.lock);
            tp->idle_threads--;
            tp->jobq.cnt--;
            PR_Unlock(tp->jobq.lock);
#else

            PR_Lock(tp->jobq.lock);
            while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
                  tp->idle_threads++;
                  PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
                  tp->idle_threads--;
            }     
            if (tp->shutdown) {
                  PR_Unlock(tp->jobq.lock);
                  break;
            }
            head = PR_LIST_HEAD(&tp->jobq.list);
            /*
             * remove job from queue
             */
            PR_REMOVE_AND_INIT_LINK(head);
            tp->jobq.cnt--;
            jobp = JOB_LINKS_PTR(head);
            PR_Unlock(tp->jobq.lock);
#endif

            jobp->job_func(jobp->job_arg);
            if (!JOINABLE_JOB(jobp)) {
                  delete_job(jobp);
            } else {
                  JOIN_NOTIFY(jobp);
            }
      }
      PR_Lock(tp->jobq.lock);
      tp->current_threads--;
      PR_Unlock(tp->jobq.lock);
}

/*
 * add a job to the work queue
 */
static void
add_to_jobq(PRThreadPool *tp, PRJob *jobp)
{
      /*
       * add to jobq
       */
#ifdef OPT_WINNT
      PR_Lock(tp->jobq.lock);
      tp->jobq.cnt++;
      PR_Unlock(tp->jobq.lock);
      /*
       * notify worker thread(s)
       */
      PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
            FALSE, &jobp->nt_notifier.overlapped);
#else
      PR_Lock(tp->jobq.lock);
      PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
      tp->jobq.cnt++;
      if ((tp->idle_threads < tp->jobq.cnt) &&
                              (tp->current_threads < tp->max_threads)) {
            wthread *wthrp;
            /*
             * increment thread count and unlock the jobq lock
             */
            tp->current_threads++;
            PR_Unlock(tp->jobq.lock);
            /* create new worker thread */
            wthrp = PR_NEWZAP(wthread);
            if (wthrp) {
                  wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
                                    tp, PR_PRIORITY_NORMAL,
                                    PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
                  if (NULL == wthrp->thread) {
                        PR_DELETE(wthrp);  /* this sets wthrp to NULL */
                  }
            }
            PR_Lock(tp->jobq.lock);
            if (NULL == wthrp) {
                  tp->current_threads--;
            } else {
                  PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
            }
      }
      /*
       * wakeup a worker thread
       */
      PR_NotifyCondVar(tp->jobq.cv);
      PR_Unlock(tp->jobq.lock);
#endif
}

/*
 * io worker thread function
 */
static void io_wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
int pollfd_cnt, pollfds_used;
int rv;
PRCList *qp;
PRPollDesc *pollfds;
PRJob **polljobs;
int poll_timeout;
PRIntervalTime now;

      /*
       * scan io_jobq
       * construct poll list
       * call PR_Poll
       * for all fds, for which poll returns true, move the job to
       * jobq and wakeup worker thread.
       */
      while (!tp->shutdown) {
            PRJob *jobp;

            pollfd_cnt = tp->ioq.cnt + 10;
            if (pollfd_cnt > tp->ioq.npollfds) {

                  /*
                   * re-allocate pollfd array if the current one is not large
                   * enough
                   */
                  if (NULL != tp->ioq.pollfds)
                        PR_Free(tp->ioq.pollfds);
                  tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
                                    (sizeof(PRPollDesc) + sizeof(PRJob *)));
                  PR_ASSERT(NULL != tp->ioq.pollfds);
                  /*
                   * array of pollfds
                   */
                  pollfds = tp->ioq.pollfds;
                  tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
                  /*
                   * parallel array of jobs
                   */
                  polljobs = tp->ioq.polljobs;
                  tp->ioq.npollfds = pollfd_cnt;
            }

            pollfds_used = 0;
            /*
             * add the notify fd; used for unblocking io thread(s)
             */
            pollfds[pollfds_used].fd = tp->ioq.notify_fd;
            pollfds[pollfds_used].in_flags = PR_POLL_READ;
            pollfds[pollfds_used].out_flags = 0;
            polljobs[pollfds_used] = NULL;
            pollfds_used++;
            /*
             * fill in the pollfd array
             */
            PR_Lock(tp->ioq.lock);
            for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {
                  jobp = JOB_LINKS_PTR(qp);
                  if (jobp->cancel_io) {
                        CANCEL_IO_JOB(jobp);
                        continue;
                  }
                  if (pollfds_used == (pollfd_cnt))
                        break;
                  pollfds[pollfds_used].fd = jobp->iod->socket;
                  pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
                  pollfds[pollfds_used].out_flags = 0;
                  polljobs[pollfds_used] = jobp;

                  pollfds_used++;
            }
            if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
                  qp = tp->ioq.list.next;
                  jobp = JOB_LINKS_PTR(qp);
                  if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
                        poll_timeout = PR_INTERVAL_NO_TIMEOUT;
                  else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
                        poll_timeout = PR_INTERVAL_NO_WAIT;
                  else {
                        poll_timeout = jobp->absolute - PR_IntervalNow();
                        if (poll_timeout <= 0) /* already timed out */
                              poll_timeout = PR_INTERVAL_NO_WAIT;
                  }
            } else {
                  poll_timeout = PR_INTERVAL_NO_TIMEOUT;
            }
            PR_Unlock(tp->ioq.lock);

            /*
             * XXXX
             * should retry if more jobs have been added to the queue?
             *
             */
            PR_ASSERT(pollfds_used <= pollfd_cnt);
            rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);

            if (tp->shutdown) {
                  break;
            }

            if (rv > 0) {
                  /*
                   * at least one io event is set
                   */
                  PRStatus rval_status;
                  PRInt32 index;

                  PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
                  /*
                   * reset the pollable event, if notified
                   */
                  if (pollfds[0].out_flags & PR_POLL_READ) {
                        rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
                        PR_ASSERT(PR_SUCCESS == rval_status);
                  }

                  for(index = 1; index < (pollfds_used); index++) {
                PRInt16 events = pollfds[index].in_flags;
                PRInt16 revents = pollfds[index].out_flags; 
                        jobp = polljobs[index]; 

                if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
                  (revents & PR_POLL_ERR) ||
                              ((events & PR_POLL_WRITE) &&
                                          (revents & PR_POLL_HUP))) { /* write op & hup */
                              PR_Lock(tp->ioq.lock);
                              if (jobp->cancel_io) {
                                    CANCEL_IO_JOB(jobp);
                                    PR_Unlock(tp->ioq.lock);
                                    continue;
                              }
                              PR_REMOVE_AND_INIT_LINK(&jobp->links);
                              tp->ioq.cnt--;
                              jobp->on_ioq = PR_FALSE;
                              PR_Unlock(tp->ioq.lock);

                              /* set error */
                    if (PR_POLL_NVAL & revents)
                                    jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
                    else if (PR_POLL_HUP & revents)
                                    jobp->iod->error = PR_CONNECT_RESET_ERROR;
                    else 
                                    jobp->iod->error = PR_IO_ERROR;

                              /*
                               * add to jobq
                               */
                              add_to_jobq(tp, jobp);
                        } else if (revents) {
                              /*
                               * add to jobq
                               */
                              PR_Lock(tp->ioq.lock);
                              if (jobp->cancel_io) {
                                    CANCEL_IO_JOB(jobp);
                                    PR_Unlock(tp->ioq.lock);
                                    continue;
                              }
                              PR_REMOVE_AND_INIT_LINK(&jobp->links);
                              tp->ioq.cnt--;
                              jobp->on_ioq = PR_FALSE;
                              PR_Unlock(tp->ioq.lock);

                              if (jobp->io_op == JOB_IO_CONNECT) {
                                    if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
                                          jobp->iod->error = 0;
                                    else
                                          jobp->iod->error = PR_GetError();
                              } else
                                    jobp->iod->error = 0;

                              add_to_jobq(tp, jobp);
                        }
                  }
            }
            /*
             * timeout processing
             */
            now = PR_IntervalNow();
            PR_Lock(tp->ioq.lock);
            for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {
                  jobp = JOB_LINKS_PTR(qp);
                  if (jobp->cancel_io) {
                        CANCEL_IO_JOB(jobp);
                        continue;
                  }
                  if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
                        break;
                  if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
                                                ((PRInt32)(jobp->absolute - now) > 0))
                        break;
                  PR_REMOVE_AND_INIT_LINK(&jobp->links);
                  tp->ioq.cnt--;
                  jobp->on_ioq = PR_FALSE;
                  jobp->iod->error = PR_IO_TIMEOUT_ERROR;
                  add_to_jobq(tp, jobp);
            }
            PR_Unlock(tp->ioq.lock);
      }
}

/*
 * timer worker thread function
 */
static void timer_wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *qp;
PRIntervalTime timeout;
PRIntervalTime now;

      /*
       * call PR_WaitCondVar with minimum value of all timeouts
       */
      while (!tp->shutdown) {
            PRJob *jobp;

            PR_Lock(tp->timerq.lock);
            if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
                  timeout = PR_INTERVAL_NO_TIMEOUT;
            } else {
                  PRCList *qp;

                  qp = tp->timerq.list.next;
                  jobp = JOB_LINKS_PTR(qp);

                  timeout = jobp->absolute - PR_IntervalNow();
            if (timeout <= 0)
                        timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
            }
            if (PR_INTERVAL_NO_WAIT != timeout)
                  PR_WaitCondVar(tp->timerq.cv, timeout);
            if (tp->shutdown) {
                  PR_Unlock(tp->timerq.lock);
                  break;
            }
            /*
             * move expired-timer jobs to jobq
             */
            now = PR_IntervalNow(); 
            while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
                  qp = tp->timerq.list.next;
                  jobp = JOB_LINKS_PTR(qp);

                  if ((PRInt32)(jobp->absolute - now) > 0) {
                        break;
                  }
                  /*
                   * job timed out
                   */
                  PR_REMOVE_AND_INIT_LINK(&jobp->links);
                  tp->timerq.cnt--;
                  jobp->on_timerq = PR_FALSE;
                  add_to_jobq(tp, jobp);
            }
            PR_Unlock(tp->timerq.lock);
      }
}

static void
delete_threadpool(PRThreadPool *tp)
{
      if (NULL != tp) {
            if (NULL != tp->shutdown_cv)
                  PR_DestroyCondVar(tp->shutdown_cv);
            if (NULL != tp->jobq.cv)
                  PR_DestroyCondVar(tp->jobq.cv);
            if (NULL != tp->jobq.lock)
                  PR_DestroyLock(tp->jobq.lock);
            if (NULL != tp->join_lock)
                  PR_DestroyLock(tp->join_lock);
#ifdef OPT_WINNT
            if (NULL != tp->jobq.nt_completion_port)
                  CloseHandle(tp->jobq.nt_completion_port);
#endif
            /* Timer queue */
            if (NULL != tp->timerq.cv)
                  PR_DestroyCondVar(tp->timerq.cv);
            if (NULL != tp->timerq.lock)
                  PR_DestroyLock(tp->timerq.lock);

            if (NULL != tp->ioq.lock)
                  PR_DestroyLock(tp->ioq.lock);
            if (NULL != tp->ioq.pollfds)
                  PR_Free(tp->ioq.pollfds);
            if (NULL != tp->ioq.notify_fd)
                  PR_DestroyPollableEvent(tp->ioq.notify_fd);
            PR_Free(tp);
      }
      return;
}

static PRThreadPool *
alloc_threadpool(void)
{
PRThreadPool *tp;

      tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
      if (NULL == tp)
            goto failed;
      tp->jobq.lock = PR_NewLock();
      if (NULL == tp->jobq.lock)
            goto failed;
      tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
      if (NULL == tp->jobq.cv)
            goto failed;
      tp->join_lock = PR_NewLock();
      if (NULL == tp->join_lock)
            goto failed;
#ifdef OPT_WINNT
      tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                                      NULL, 0, 0);
      if (NULL == tp->jobq.nt_completion_port)
            goto failed;
#endif

      tp->ioq.lock = PR_NewLock();
      if (NULL == tp->ioq.lock)
            goto failed;

      /* Timer queue */

      tp->timerq.lock = PR_NewLock();
      if (NULL == tp->timerq.lock)
            goto failed;
      tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
      if (NULL == tp->timerq.cv)
            goto failed;

      tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
      if (NULL == tp->shutdown_cv)
            goto failed;
      tp->ioq.notify_fd = PR_NewPollableEvent();
      if (NULL == tp->ioq.notify_fd)
            goto failed;
      return tp;
failed:
      delete_threadpool(tp);
      PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
      return NULL;
}

/* Create thread pool */
PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
                                PRUint32 stacksize)
{
PRThreadPool *tp;
PRThread *thr;
int i;
wthread *wthrp;

      tp = alloc_threadpool();
      if (NULL == tp)
            return NULL;

      tp->init_threads = initial_threads;
      tp->max_threads = max_threads;
      tp->stacksize = stacksize;
      PR_INIT_CLIST(&tp->jobq.list);
      PR_INIT_CLIST(&tp->ioq.list);
      PR_INIT_CLIST(&tp->timerq.list);
      PR_INIT_CLIST(&tp->jobq.wthreads);
      PR_INIT_CLIST(&tp->ioq.wthreads);
      PR_INIT_CLIST(&tp->timerq.wthreads);
      tp->shutdown = PR_FALSE;

      PR_Lock(tp->jobq.lock);
      for(i=0; i < initial_threads; ++i) {

            thr = PR_CreateThread(PR_USER_THREAD, wstart,
                                    tp, PR_PRIORITY_NORMAL,
                                    PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
            PR_ASSERT(thr);
            wthrp = PR_NEWZAP(wthread);
            PR_ASSERT(wthrp);
            wthrp->thread = thr;
            PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
      }
      tp->current_threads = initial_threads;

      thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
                              tp, PR_PRIORITY_NORMAL,
                              PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
      PR_ASSERT(thr);
      wthrp = PR_NEWZAP(wthread);
      PR_ASSERT(wthrp);
      wthrp->thread = thr;
      PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);

      thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
                              tp, PR_PRIORITY_NORMAL,
                              PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
      PR_ASSERT(thr);
      wthrp = PR_NEWZAP(wthread);
      PR_ASSERT(wthrp);
      wthrp->thread = thr;
      PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);

      PR_Unlock(tp->jobq.lock);
      return tp;
}

static void
delete_job(PRJob *jobp)
{
      if (NULL != jobp) {
            if (NULL != jobp->join_cv) {
                  PR_DestroyCondVar(jobp->join_cv);
                  jobp->join_cv = NULL;
            }
            if (NULL != jobp->cancel_cv) {
                  PR_DestroyCondVar(jobp->cancel_cv);
                  jobp->cancel_cv = NULL;
            }
            PR_DELETE(jobp);
      }
}

static PRJob *
alloc_job(PRBool joinable, PRThreadPool *tp)
{
      PRJob *jobp;

      jobp = PR_NEWZAP(PRJob);
      if (NULL == jobp) 
            goto failed;
      if (joinable) {
            jobp->join_cv = PR_NewCondVar(tp->join_lock);
            jobp->join_wait = PR_TRUE;
            if (NULL == jobp->join_cv)
                  goto failed;
      } else {
            jobp->join_cv = NULL;
      }
#ifdef OPT_WINNT
      jobp->nt_notifier.jobp = jobp;
#endif
      return jobp;
failed:
      delete_job(jobp);
      PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
      return NULL;
}

/* queue a job */
PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
{
      PRJob *jobp;

      jobp = alloc_job(joinable, tpool);
      if (NULL == jobp)
            return NULL;

      jobp->job_func = fn;
      jobp->job_arg = arg;
      jobp->tpool = tpool;

      add_to_jobq(tpool, jobp);
      return jobp;
}

/* queue a job, when a socket is readable or writeable */
static PRJob *
queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
                        PRBool joinable, io_op_type op)
{
      PRJob *jobp;
      PRIntervalTime now;

      jobp = alloc_job(joinable, tpool);
      if (NULL == jobp) {
            return NULL;
      }

      /*
       * Add a new job to io_jobq
       * wakeup io worker thread
       */

      jobp->job_func = fn;
      jobp->job_arg = arg;
      jobp->tpool = tpool;
      jobp->iod = iod;
      if (JOB_IO_READ == op) {
            jobp->io_op = JOB_IO_READ;
            jobp->io_poll_flags = PR_POLL_READ;
      } else if (JOB_IO_WRITE == op) {
            jobp->io_op = JOB_IO_WRITE;
            jobp->io_poll_flags = PR_POLL_WRITE;
      } else if (JOB_IO_ACCEPT == op) {
            jobp->io_op = JOB_IO_ACCEPT;
            jobp->io_poll_flags = PR_POLL_READ;
      } else if (JOB_IO_CONNECT == op) {
            jobp->io_op = JOB_IO_CONNECT;
            jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
      } else {
            delete_job(jobp);
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            return NULL;
      }

      jobp->timeout = iod->timeout;
      if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
                  (PR_INTERVAL_NO_WAIT == iod->timeout)) {
            jobp->absolute = iod->timeout;
      } else {
            now = PR_IntervalNow();
            jobp->absolute = now + iod->timeout;
      }


      PR_Lock(tpool->ioq.lock);

      if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
                  (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
            PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
      } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
            PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
      } else {
            PRCList *qp;
            PRJob *tmp_jobp;
            /*
             * insert into the timeout-sorted ioq
             */
            for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
                                          qp = qp->prev) {
                  tmp_jobp = JOB_LINKS_PTR(qp);
                  if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
                        break;
                  }
            }
            PR_INSERT_AFTER(&jobp->links,qp);
      }

      jobp->on_ioq = PR_TRUE;
      tpool->ioq.cnt++;
      /*
       * notify io worker thread(s)
       */
      PR_Unlock(tpool->ioq.lock);
      notify_ioq(tpool);
      return jobp;
}

/* queue a job, when a socket is readable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
                                                                  PRBool joinable)
{
      return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}

/* queue a job, when a socket is writeable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
                                                            PRBool joinable)
{
      return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}


/* queue a job, when a socket has a pending connection */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
                                                void * arg, PRBool joinable)
{
      return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}

/* queue a job, when a socket can be connected */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
                  const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
{
      PRStatus rv;
      PRErrorCode err;

      rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
      if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
            /* connection pending */
            return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
      } else {
            /*
             * connection succeeded or failed; add to jobq right away
             */
            if (rv == PR_FAILURE)
                  iod->error = err;
            else
                  iod->error = 0;
            return(PR_QueueJob(tpool, fn, arg, joinable));
      }
}

/* queue a job, when a timer expires */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
                                          PRJobFn fn, void * arg, PRBool joinable)
{
      PRIntervalTime now;
      PRJob *jobp;

      if (PR_INTERVAL_NO_TIMEOUT == timeout) {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            return NULL;
      }
      if (PR_INTERVAL_NO_WAIT == timeout) {
            /*
             * no waiting; add to jobq right away
             */
            return(PR_QueueJob(tpool, fn, arg, joinable));
      }
      jobp = alloc_job(joinable, tpool);
      if (NULL == jobp) {
            return NULL;
      }

      /*
       * Add a new job to timer_jobq
       * wakeup timer worker thread
       */

      jobp->job_func = fn;
      jobp->job_arg = arg;
      jobp->tpool = tpool;
      jobp->timeout = timeout;

      now = PR_IntervalNow();
      jobp->absolute = now + timeout;


      PR_Lock(tpool->timerq.lock);
      jobp->on_timerq = PR_TRUE;
      if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
            PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
      else {
            PRCList *qp;
            PRJob *tmp_jobp;
            /*
             * insert into the sorted timer jobq
             */
            for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
                                          qp = qp->prev) {
                  tmp_jobp = JOB_LINKS_PTR(qp);
                  if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
                        break;
                  }
            }
            PR_INSERT_AFTER(&jobp->links,qp);
      }
      tpool->timerq.cnt++;
      /*
       * notify timer worker thread(s)
       */
      notify_timerq(tpool);
      PR_Unlock(tpool->timerq.lock);
      return jobp;
}

static void
notify_timerq(PRThreadPool *tp)
{
      /*
       * wakeup the timer thread(s)
       */
      PR_NotifyCondVar(tp->timerq.cv);
}

static void
notify_ioq(PRThreadPool *tp)
{
PRStatus rval_status;

      /*
       * wakeup the io thread(s)
       */
      rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
      PR_ASSERT(PR_SUCCESS == rval_status);
}

/*
 * cancel a job
 *
 *    XXXX: is this needed? likely to be removed
 */
PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob *jobp) {

      PRStatus rval = PR_FAILURE;
      PRThreadPool *tp;

      if (jobp->on_timerq) {
            /*
             * now, check again while holding the timerq lock
             */
            tp = jobp->tpool;
            PR_Lock(tp->timerq.lock);
            if (jobp->on_timerq) {
                  jobp->on_timerq = PR_FALSE;
                  PR_REMOVE_AND_INIT_LINK(&jobp->links);
                  tp->timerq.cnt--;
                  PR_Unlock(tp->timerq.lock);
                  if (!JOINABLE_JOB(jobp)) {
                        delete_job(jobp);
                  } else {
                        JOIN_NOTIFY(jobp);
                  }
                  rval = PR_SUCCESS;
            } else
                  PR_Unlock(tp->timerq.lock);
      } else if (jobp->on_ioq) {
            /*
             * now, check again while holding the ioq lock
             */
            tp = jobp->tpool;
            PR_Lock(tp->ioq.lock);
            if (jobp->on_ioq) {
                  jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
                  if (NULL == jobp->cancel_cv) {
                        PR_Unlock(tp->ioq.lock);
                        PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
                        return PR_FAILURE;
                  }
                  /*
                   * mark job 'cancelled' and notify io thread(s)
                   * XXXX:
                   *          this assumes there is only one io thread; when there
                   *          are multiple threads, the io thread processing this job
                   *          must be notified.
                   */
                  jobp->cancel_io = PR_TRUE;
                  PR_Unlock(tp->ioq.lock);      /* release, reacquire ioq lock */
                  notify_ioq(tp);
                  PR_Lock(tp->ioq.lock);
                  while (jobp->cancel_io)
                        PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
                  PR_Unlock(tp->ioq.lock);
                  PR_ASSERT(!jobp->on_ioq);
                  if (!JOINABLE_JOB(jobp)) {
                        delete_job(jobp);
                  } else {
                        JOIN_NOTIFY(jobp);
                  }
                  rval = PR_SUCCESS;
            } else
                  PR_Unlock(tp->ioq.lock);
      }
      if (PR_FAILURE == rval)
            PR_SetError(PR_INVALID_STATE_ERROR, 0);
      return rval;
}

/* join a job, wait until completion */
PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob *jobp)
{
      if (!JOINABLE_JOB(jobp)) {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            return PR_FAILURE;
      }
      PR_Lock(jobp->tpool->join_lock);
      while(jobp->join_wait)
            PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
      PR_Unlock(jobp->tpool->join_lock);
      delete_job(jobp);
      return PR_SUCCESS;
}

/* shutdown threadpool */
PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool *tpool)
{
PRStatus rval = PR_SUCCESS;

      PR_Lock(tpool->jobq.lock);
      tpool->shutdown = PR_TRUE;
      PR_NotifyAllCondVar(tpool->shutdown_cv);
      PR_Unlock(tpool->jobq.lock);

      return rval;
}

/*
 * join thread pool
 *    wait for termination of worker threads
 *    reclaim threadpool resources
 */
PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool *tpool)
{
PRStatus rval = PR_SUCCESS;
PRCList *head;
PRStatus rval_status;

      PR_Lock(tpool->jobq.lock);
      while (!tpool->shutdown)
            PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);

      /*
       * wakeup worker threads
       */
#ifdef OPT_WINNT
      /*
       * post shutdown notification for all threads
       */
      {
            int i;
            for(i=0; i < tpool->current_threads; i++) {
                  PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
                                                                        TRUE, NULL);
            }
      }
#else
      PR_NotifyAllCondVar(tpool->jobq.cv);
#endif

      /*
       * wakeup io thread(s)
       */
      notify_ioq(tpool);

      /*
       * wakeup timer thread(s)
       */
      PR_Lock(tpool->timerq.lock);
      notify_timerq(tpool);
      PR_Unlock(tpool->timerq.lock);

      while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
            wthread *wthrp;

            head = PR_LIST_HEAD(&tpool->jobq.wthreads);
            PR_REMOVE_AND_INIT_LINK(head);
            PR_Unlock(tpool->jobq.lock);
            wthrp = WTHREAD_LINKS_PTR(head);
            rval_status = PR_JoinThread(wthrp->thread);
            PR_ASSERT(PR_SUCCESS == rval_status);
            PR_DELETE(wthrp);
            PR_Lock(tpool->jobq.lock);
      }
      PR_Unlock(tpool->jobq.lock);
      while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
            wthread *wthrp;

            head = PR_LIST_HEAD(&tpool->ioq.wthreads);
            PR_REMOVE_AND_INIT_LINK(head);
            wthrp = WTHREAD_LINKS_PTR(head);
            rval_status = PR_JoinThread(wthrp->thread);
            PR_ASSERT(PR_SUCCESS == rval_status);
            PR_DELETE(wthrp);
      }

      while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
            wthread *wthrp;

            head = PR_LIST_HEAD(&tpool->timerq.wthreads);
            PR_REMOVE_AND_INIT_LINK(head);
            wthrp = WTHREAD_LINKS_PTR(head);
            rval_status = PR_JoinThread(wthrp->thread);
            PR_ASSERT(PR_SUCCESS == rval_status);
            PR_DELETE(wthrp);
      }

      /*
       * Delete queued jobs
       */
      while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
            PRJob *jobp;

            head = PR_LIST_HEAD(&tpool->jobq.list);
            PR_REMOVE_AND_INIT_LINK(head);
            jobp = JOB_LINKS_PTR(head);
            tpool->jobq.cnt--;
            delete_job(jobp);
      }

      /* delete io jobs */
      while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
            PRJob *jobp;

            head = PR_LIST_HEAD(&tpool->ioq.list);
            PR_REMOVE_AND_INIT_LINK(head);
            tpool->ioq.cnt--;
            jobp = JOB_LINKS_PTR(head);
            delete_job(jobp);
      }

      /* delete timer jobs */
      while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
            PRJob *jobp;

            head = PR_LIST_HEAD(&tpool->timerq.list);
            PR_REMOVE_AND_INIT_LINK(head);
            tpool->timerq.cnt--;
            jobp = JOB_LINKS_PTR(head);
            delete_job(jobp);
      }

      PR_ASSERT(0 == tpool->jobq.cnt);
      PR_ASSERT(0 == tpool->ioq.cnt);
      PR_ASSERT(0 == tpool->timerq.cnt);

      delete_threadpool(tpool);
      return rval;
}

Generated by  Doxygen 1.6.0   Back to index