ThreadPoolBase Class Reference

ThreadPoolBase defines the non-templated portion of ThreadPool. More...

#include <ThreadPool.h>

Inheritance diagram for ThreadPoolBase:

SynchMonitoredObject ThreadPool< Task > List of all members.

Public Member Functions

void start (uint nThreads)
 Starts the given number of threads in the pool.
void stop ()
 Shuts down the pool, waiting for any pending tasks to complete.
void setThreadTracker (ThreadTracker &threadTracker)
 Sets a tracker to use for created threads.

Protected Types

enum  State { STATE_STARTED, STATE_STOPPING, STATE_STOPPED }

Protected Member Functions

 ThreadPoolBase ()
virtual ~ThreadPoolBase ()
virtual bool isQueueEmpty ()=0
virtual void runOneTask (StrictMutexGuard &)=0

Protected Attributes

std::vector< PooledThread * > threads
State state
LocalCondition stoppingCondition
ThreadTrackerpThreadTracker
StrictMutex mutex
LocalCondition condition

Private Member Functions

void runPooledThread ()

Friends

class PooledThread

Detailed Description

ThreadPoolBase defines the non-templated portion of ThreadPool.

Definition at line 40 of file ThreadPool.h.


Member Enumeration Documentation

enum ThreadPoolBase::State [protected]

Enumerator:
STATE_STARTED 
STATE_STOPPING 
STATE_STOPPED 

Definition at line 46 of file ThreadPool.h.

00046                {
00047         STATE_STARTED,
00048         STATE_STOPPING,
00049         STATE_STOPPED
00050     };


Constructor & Destructor Documentation

ThreadPoolBase::ThreadPoolBase (  )  [explicit, protected]

Definition at line 50 of file ThreadPool.cpp.

References state, and STATE_STOPPED.

00051 {
00052     state = STATE_STOPPED;
00053 }

ThreadPoolBase::~ThreadPoolBase (  )  [protected, virtual]

Definition at line 55 of file ThreadPool.cpp.

References state, and STATE_STOPPED.

00056 {
00057     assert(state == STATE_STOPPED);
00058 }


Member Function Documentation

void ThreadPoolBase::runPooledThread (  )  [private]

Definition at line 101 of file ThreadPool.cpp.

References SynchMonitoredObject::condition, isQueueEmpty(), SynchMonitoredObject::mutex, ThreadTracker::onThreadEnd(), ThreadTracker::onThreadStart(), pThreadTracker, runOneTask(), state, STATE_STOPPED, and stoppingCondition.

Referenced by PooledThread::run().

00102 {
00103     // TODO jvs 28-Jul-2008:  resource acquisition as initialization
00104     if (pThreadTracker) {
00105         pThreadTracker->onThreadStart();
00106     }
00107     try {
00108         StrictMutexGuard guard(mutex);
00109         for (;;) {
00110             while ((state != STATE_STOPPED) && isQueueEmpty()) {
00111                 condition.wait(guard);
00112             }
00113             if (state == STATE_STOPPED) {
00114                 break;
00115             }
00116             runOneTask(guard);
00117             stoppingCondition.notify_one();
00118         }
00119     } catch (...) {
00120         if (pThreadTracker) {
00121             pThreadTracker->onThreadEnd();
00122         }
00123         throw;
00124     }
00125     if (pThreadTracker) {
00126         pThreadTracker->onThreadEnd();
00127     }
00128 }

virtual bool ThreadPoolBase::isQueueEmpty (  )  [protected, pure virtual]

Implemented in ThreadPool< Task >, ThreadPool< ExternalSortTask >, ThreadPool< ParallelExecTask >, and ThreadPool< RandomAccessRequest >.

Referenced by runPooledThread(), and stop().

virtual void ThreadPoolBase::runOneTask ( StrictMutexGuard  )  [protected, pure virtual]

Implemented in ThreadPool< Task >, ThreadPool< ExternalSortTask >, ThreadPool< ParallelExecTask >, and ThreadPool< RandomAccessRequest >.

Referenced by runPooledThread().

void ThreadPoolBase::start ( uint  nThreads  ) 

Starts the given number of threads in the pool.

Parameters:
nThreads number of threads to start

Definition at line 60 of file ThreadPool.cpp.

References SynchMonitoredObject::mutex, PooledThread, state, STATE_STARTED, STATE_STOPPED, and threads.

Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::start(), and ThreadPoolScheduler::ThreadPoolScheduler().

00061 {
00062     StrictMutexGuard guard(mutex);
00063     assert(state == STATE_STOPPED);
00064     assert(nThreads > 0);
00065     state = STATE_STARTED;
00066     for (uint i = 0; i < nThreads; ++i) {
00067         PooledThread *pThread = new PooledThread(*this);
00068         pThread->start();
00069         threads.push_back(pThread);
00070     }
00071 }

void ThreadPoolBase::stop (  ) 

Shuts down the pool, waiting for any pending tasks to complete.

The start/stop calls should never be invoked from more than one thread simultaneously.

Definition at line 73 of file ThreadPool.cpp.

References SynchMonitoredObject::condition, deleteAndNullify(), isQueueEmpty(), SynchMonitoredObject::mutex, state, STATE_STOPPED, STATE_STOPPING, stoppingCondition, and threads.

Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::stop(), and ThreadPoolScheduler::stop().

00074 {
00075     StrictMutexGuard guard(mutex);
00076     assert(state != STATE_STOPPING);
00077     if (state == STATE_STOPPED) {
00078         return;
00079     }
00080     state = STATE_STOPPING;
00081 
00082     while (!isQueueEmpty()) {
00083         stoppingCondition.wait(guard);
00084     }
00085 
00086     state = STATE_STOPPED;
00087     condition.notify_all();
00088     guard.unlock();
00089 
00090     for (uint i = 0; i < threads.size(); ++i) {
00091         threads[i]->join();
00092     }
00093 
00094     guard.lock();
00095     for (uint i = 0; i < threads.size(); ++i) {
00096         deleteAndNullify(threads[i]);
00097     }
00098     threads.clear();
00099 }

void ThreadPoolBase::setThreadTracker ( ThreadTracker threadTracker  ) 

Sets a tracker to use for created threads.

Parameters:
threadTracker tracker to use

Definition at line 130 of file ThreadPool.cpp.

References pThreadTracker.

Referenced by ParallelExecStreamScheduler::ParallelExecStreamScheduler().

00131 {
00132     pThreadTracker = &threadTracker;
00133 }


Friends And Related Function Documentation

friend class PooledThread [friend]

Definition at line 42 of file ThreadPool.h.

Referenced by start().


Member Data Documentation

std::vector<PooledThread *> ThreadPoolBase::threads [protected]

Definition at line 52 of file ThreadPool.h.

Referenced by start(), and stop().

State ThreadPoolBase::state [protected]

Definition at line 53 of file ThreadPool.h.

Referenced by runPooledThread(), start(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), ThreadPoolBase(), and ~ThreadPoolBase().

LocalCondition ThreadPoolBase::stoppingCondition [protected]

Definition at line 54 of file ThreadPool.h.

Referenced by runPooledThread(), and stop().

ThreadTracker* ThreadPoolBase::pThreadTracker [protected]

Definition at line 55 of file ThreadPool.h.

Referenced by runPooledThread(), setThreadTracker(), and ThreadPool< RandomAccessRequest >::ThreadPool().

StrictMutex SynchMonitoredObject::mutex [protected, inherited]

Definition at line 38 of file SynchMonitoredObject.h.

Referenced by ParallelExecStreamScheduler::abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), ParallelExecStreamScheduler::executeManager(), ParallelExecStreamScheduler::executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), ParallelExecStreamScheduler::signalSentinel(), start(), TimerThread::stop(), stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().

LocalCondition SynchMonitoredObject::condition [protected, inherited]

Definition at line 39 of file SynchMonitoredObject.h.

Referenced by ParallelExecStreamScheduler::abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), ParallelExecStreamScheduler::executeTask(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:48 2009 for Fennel by  doxygen 1.5.1