ThreadPool.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/synch/ThreadPool.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/synch/ThreadPool.h"
00026 #include "fennel/synch/Thread.h"
00027 #include "fennel/synch/ThreadTracker.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/synch/ThreadPool.cpp#11 $");
00030 
00034 class PooledThread : public Thread
00035 {
00036     ThreadPoolBase &pool;
00037 
00038 public:
00039     explicit PooledThread(ThreadPoolBase &poolInit)
00040         : pool(poolInit)
00041     {
00042     }
00043 
00044     virtual void run()
00045     {
00046         pool.runPooledThread();
00047     }
00048 };
00049 
00050 ThreadPoolBase::ThreadPoolBase()
00051 {
00052     state = STATE_STOPPED;
00053 }
00054 
00055 ThreadPoolBase::~ThreadPoolBase()
00056 {
00057     assert(state == STATE_STOPPED);
00058 }
00059 
00060 void ThreadPoolBase::start(uint nThreads)
00061 {
00062     StrictMutexGuard guard(mutex);
00063     assert(state == STATE_STOPPED);
00064     assert(nThreads > 0);
00065     state = STATE_STARTED;
00066     for (uint i = 0; i < nThreads; ++i) {
00067         PooledThread *pThread = new PooledThread(*this);
00068         pThread->start();
00069         threads.push_back(pThread);
00070     }
00071 }
00072 
00073 void ThreadPoolBase::stop()
00074 {
00075     StrictMutexGuard guard(mutex);
00076     assert(state != STATE_STOPPING);
00077     if (state == STATE_STOPPED) {
00078         return;
00079     }
00080     state = STATE_STOPPING;
00081 
00082     while (!isQueueEmpty()) {
00083         stoppingCondition.wait(guard);
00084     }
00085 
00086     state = STATE_STOPPED;
00087     condition.notify_all();
00088     guard.unlock();
00089 
00090     for (uint i = 0; i < threads.size(); ++i) {
00091         threads[i]->join();
00092     }
00093 
00094     guard.lock();
00095     for (uint i = 0; i < threads.size(); ++i) {
00096         deleteAndNullify(threads[i]);
00097     }
00098     threads.clear();
00099 }
00100 
00101 void ThreadPoolBase::runPooledThread()
00102 {
00103     // TODO jvs 28-Jul-2008:  resource acquisition as initialization
00104     if (pThreadTracker) {
00105         pThreadTracker->onThreadStart();
00106     }
00107     try {
00108         StrictMutexGuard guard(mutex);
00109         for (;;) {
00110             while ((state != STATE_STOPPED) && isQueueEmpty()) {
00111                 condition.wait(guard);
00112             }
00113             if (state == STATE_STOPPED) {
00114                 break;
00115             }
00116             runOneTask(guard);
00117             stoppingCondition.notify_one();
00118         }
00119     } catch (...) {
00120         if (pThreadTracker) {
00121             pThreadTracker->onThreadEnd();
00122         }
00123         throw;
00124     }
00125     if (pThreadTracker) {
00126         pThreadTracker->onThreadEnd();
00127     }
00128 }
00129 
00130 void ThreadPoolBase::setThreadTracker(ThreadTracker &threadTracker)
00131 {
00132     pThreadTracker = &threadTracker;
00133 }
00134 
00135 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/synch/ThreadPool.cpp#11 $");
00136 
00137 // End ThreadPool.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1