ThreadPool< Task > Class Template Reference

ThreadPool is a very simple thread-pooling implementation.

#include <ThreadPool.h>

Inheritance diagram for ThreadPool< Task >:

ThreadPool< Task > inherits ThreadPoolBase, which inherits SynchMonitoredObject.

Public Member Functions

 ThreadPool ()
 Constructor.
virtual ~ThreadPool ()
 Destructor: stop() must already have been called.
void submitTask (Task &task)
 Submits a task to the pool.
void start (uint nThreads)
 Starts the given number of threads in the pool.
void stop ()
 Shuts down the pool, waiting for any pending tasks to complete.
void setThreadTracker (ThreadTracker &threadTracker)
 Sets a tracker to use for created threads.

Protected Types

enum  State { STATE_STARTED, STATE_STOPPING, STATE_STOPPED }

Protected Attributes

std::vector< PooledThread * > threads
State state
LocalCondition stoppingCondition
ThreadTracker * pThreadTracker
StrictMutex mutex
LocalCondition condition

Private Member Functions

virtual bool isQueueEmpty ()
virtual void runOneTask (StrictMutexGuard &guard)

Private Attributes

std::deque< Task > queue

Detailed Description

template<class Task>
class ThreadPool< Task >

ThreadPool is a very simple thread-pooling implementation.

It's a template to avoid requiring task queue entries to be dynamically allocated.

The Task template parameter must behave as a concrete data type, and must have a method execute().

Definition at line 95 of file ThreadPool.h.
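As an illustration of the Task requirements above, the following is a minimal sketch of a value-type task. The names IncrementTask and runAll are hypothetical and do not appear in Fennel; runAll only imitates the copy-out-and-execute handling shown in runOneTask(), without any threading.

```cpp
#include <cassert>
#include <deque>

// Hypothetical task type: a plain copyable value with an execute() method.
struct IncrementTask
{
    int *pCounter;  // target to modify; not owned by the task
    int amount;     // how much to add when executed

    void execute()
    {
        *pCounter += amount;
    }
};

// Hypothetical helper: stores tasks by value in a deque and runs them in
// FIFO order, copying each one out before executing it.
template <class Task>
void runAll(std::deque<Task> &queue)
{
    while (!queue.empty()) {
        Task task = queue.front();  // copy out of the queue
        queue.pop_front();
        task.execute();
    }
}
```

Because the queue holds Task objects directly, no per-task heap allocation is needed, but each task is copied at least twice, so a task type should be cheap to copy.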


Member Enumeration Documentation

enum ThreadPoolBase::State [protected, inherited]

Enumerator:
STATE_STARTED 
STATE_STOPPING 
STATE_STOPPED 

Definition at line 46 of file ThreadPool.h.

00046                {
00047         STATE_STARTED,
00048         STATE_STOPPING,
00049         STATE_STOPPED
00050     };


Constructor & Destructor Documentation

template<class Task>
ThreadPool< Task >::ThreadPool (  )  [inline, explicit]

Constructor.

Definition at line 117 of file ThreadPool.h.

00118     {
00119         pThreadTracker = NULL;
00120     }

template<class Task>
virtual ThreadPool< Task >::~ThreadPool (  )  [inline, virtual]

Destructor: stop() must already have been called.

Definition at line 125 of file ThreadPool.h.

00126     {
00127     }


Member Function Documentation

template<class Task>
virtual bool ThreadPool< Task >::isQueueEmpty (  )  [inline, private, virtual]

Implements ThreadPoolBase.

Definition at line 99 of file ThreadPool.h.

00100     {
00101         return queue.empty();
00102     }

template<class Task>
virtual void ThreadPool< Task >::runOneTask ( StrictMutexGuard &  guard  )  [inline, private, virtual]

Implements ThreadPoolBase.

Definition at line 104 of file ThreadPool.h.

00105     {
00106         Task task = queue.front();
00107         queue.pop_front();
00108         guard.unlock();
00109         task.execute();
00110         guard.lock();
00111     }
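The listing above copies the task out of the queue, releases the caller's guard while the task runs, and reacquires it before returning. The sketch below reproduces that pattern with std::unique_lock standing in for StrictMutexGuard, which is an assumption about equivalent behavior; ProbeTask is a hypothetical task that records whether the lock was held while it executed.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Hypothetical task: records whether the queue lock is held during execute().
struct ProbeTask
{
    const std::unique_lock<std::mutex> *pGuard;
    bool *pLockHeldDuringExecute;

    void execute()
    {
        *pLockHeldDuringExecute = pGuard->owns_lock();
    }
};

// Same shape as runOneTask(): the caller passes in a guard it already holds.
template <class Task>
void runOneTask(std::deque<Task> &queue, std::unique_lock<std::mutex> &guard)
{
    Task task = queue.front();  // copy, so the queue slot can be released
    queue.pop_front();
    guard.unlock();             // let other threads submit or take tasks
    task.execute();
    guard.lock();               // caller expects the lock to be held again
}
```

Running the task outside the lock keeps a long task from blocking submitTask() and the other pool threads.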

template<class Task>
void ThreadPool< Task >::submitTask ( Task &  task  )  [inline]

Submits a task to the pool.

It will be executed as soon as a thread is available.

Parameters:
task the task to execute, expressed as a function object

Definition at line 135 of file ThreadPool.h.

Referenced by ParallelExecStreamScheduler::addToQueue(), ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ThreadPoolScheduler::schedule(), and ParallelExecStreamScheduler::start().

00136     {
00137         StrictMutexGuard guard(mutex);
00138         assert(state == STATE_STARTED);
00139         queue.push_back(task);
00140         condition.notify_one();
00141     }
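The submit path is the producer half of a condition-variable handoff: push under the mutex, then wake one waiting thread. A minimal sketch of both halves follows, using std::mutex and std::condition_variable in place of StrictMutex and LocalCondition. MiniQueue and take() are hypothetical names, and the consumer loop is an assumption, since the pool's worker loop is not shown on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical single-queue handoff between a producer and a consumer.
struct MiniQueue
{
    std::mutex mutex;
    std::condition_variable condition;
    std::deque<int> queue;

    // Producer side, shaped like submitTask().
    void submit(int value)
    {
        std::unique_lock<std::mutex> guard(mutex);
        queue.push_back(value);
        condition.notify_one();  // wake at most one waiting consumer
    }

    // Consumer side: wait in a loop to tolerate spurious wakeups.
    int take()
    {
        std::unique_lock<std::mutex> guard(mutex);
        while (queue.empty()) {
            condition.wait(guard);
        }
        int value = queue.front();
        queue.pop_front();
        return value;
    }
};
```

notify_one() is sufficient here because each submitted item can be consumed by only one thread.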

void ThreadPoolBase::start ( uint  nThreads  )  [inherited]

Starts the given number of threads in the pool.

Parameters:
nThreads number of threads to start

Definition at line 60 of file ThreadPool.cpp.

References SynchMonitoredObject::mutex, ThreadPoolBase::PooledThread, ThreadPoolBase::state, ThreadPoolBase::STATE_STARTED, ThreadPoolBase::STATE_STOPPED, and ThreadPoolBase::threads.

Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::start(), and ThreadPoolScheduler::ThreadPoolScheduler().

00061 {
00062     StrictMutexGuard guard(mutex);
00063     assert(state == STATE_STOPPED);
00064     assert(nThreads > 0);
00065     state = STATE_STARTED;
00066     for (uint i = 0; i < nThreads; ++i) {
00067         PooledThread *pThread = new PooledThread(*this);
00068         pThread->start();
00069         threads.push_back(pThread);
00070     }
00071 }

void ThreadPoolBase::stop (  )  [inherited]

Shuts down the pool, waiting for any pending tasks to complete.

The start/stop calls should never be invoked from more than one thread simultaneously.

Definition at line 73 of file ThreadPool.cpp.

References SynchMonitoredObject::condition, deleteAndNullify(), ThreadPoolBase::isQueueEmpty(), SynchMonitoredObject::mutex, ThreadPoolBase::state, ThreadPoolBase::STATE_STOPPED, ThreadPoolBase::STATE_STOPPING, ThreadPoolBase::stoppingCondition, and ThreadPoolBase::threads.

Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::stop(), and ThreadPoolScheduler::stop().

00074 {
00075     StrictMutexGuard guard(mutex);
00076     assert(state != STATE_STOPPING);
00077     if (state == STATE_STOPPED) {
00078         return;
00079     }
00080     state = STATE_STOPPING;
00081 
00082     while (!isQueueEmpty()) {
00083         stoppingCondition.wait(guard);
00084     }
00085 
00086     state = STATE_STOPPED;
00087     condition.notify_all();
00088     guard.unlock();
00089 
00090     for (uint i = 0; i < threads.size(); ++i) {
00091         threads[i]->join();
00092     }
00093 
00094     guard.lock();
00095     for (uint i = 0; i < threads.size(); ++i) {
00096         deleteAndNullify(threads[i]);
00097     }
00098     threads.clear();
00099 }
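The shutdown sequence above has three phases: wait for the queue to drain, wake idle threads so they can exit, then join them with the lock released. The following standard-library sketch mirrors start(), submitTask(), and stop() to show the phases working together. SketchPool is a hypothetical stand-in, not Fennel code, and its runPooledThread() is an assumed worker loop, since the real one is not reproduced on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative stand-in built on the standard library; not Fennel code.
class SketchPool
{
public:
    SketchPool() : state(STATE_STOPPED) {}

    void start(unsigned nThreads)
    {
        std::unique_lock<std::mutex> guard(mutex);
        assert(state == STATE_STOPPED);
        assert(nThreads > 0);
        state = STATE_STARTED;
        for (unsigned i = 0; i < nThreads; ++i) {
            threads.emplace_back(&SketchPool::runPooledThread, this);
        }
    }

    void submitTask(const std::function<void()> &task)
    {
        std::unique_lock<std::mutex> guard(mutex);
        assert(state == STATE_STARTED);
        queue.push_back(task);
        condition.notify_one();
    }

    void stop()
    {
        std::unique_lock<std::mutex> guard(mutex);
        assert(state != STATE_STOPPING);
        if (state == STATE_STOPPED) {
            return;
        }
        state = STATE_STOPPING;

        // Phase 1: block until the workers have emptied the queue.
        while (!queue.empty()) {
            stoppingCondition.wait(guard);
        }

        // Phase 2: wake idle workers so they see STATE_STOPPED and exit.
        state = STATE_STOPPED;
        condition.notify_all();
        guard.unlock();

        // Phase 3: join without the lock; workers need it to finish up.
        for (std::size_t i = 0; i < threads.size(); ++i) {
            threads[i].join();
        }

        guard.lock();
        threads.clear();
    }

private:
    enum State { STATE_STARTED, STATE_STOPPING, STATE_STOPPED };

    // Assumed worker loop; the real runPooledThread() is not shown above.
    void runPooledThread()
    {
        std::unique_lock<std::mutex> guard(mutex);
        for (;;) {
            while (queue.empty() && state != STATE_STOPPED) {
                condition.wait(guard);
            }
            if (queue.empty()) {
                return;  // state is STATE_STOPPED and nothing is left to run
            }
            std::function<void()> task = queue.front();
            queue.pop_front();
            guard.unlock();
            task();
            guard.lock();
            if (queue.empty()) {
                stoppingCondition.notify_all();  // lets stop() leave phase 1
            }
        }
    }

    std::mutex mutex;
    std::condition_variable condition;          // signals new work or shutdown
    std::condition_variable stoppingCondition;  // signals a drained queue
    std::deque<std::function<void()>> queue;
    std::vector<std::thread> threads;
    State state;
};
```

In this sketch a task that is still running when the queue empties is covered by the join in phase 3, so stop() returns only after every submitted task has finished. As with ThreadPool, destroying a SketchPool without calling stop() first is an error.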

void ThreadPoolBase::setThreadTracker ( ThreadTracker &  threadTracker  )  [inherited]

Sets a tracker to use for created threads.

Parameters:
threadTracker tracker to use

Definition at line 130 of file ThreadPool.cpp.

References ThreadPoolBase::pThreadTracker.

Referenced by ParallelExecStreamScheduler::ParallelExecStreamScheduler().

00131 {
00132     pThreadTracker = &threadTracker;
00133 }


Member Data Documentation

template<class Task>
std::deque<Task> ThreadPool< Task >::queue [private]

Definition at line 97 of file ThreadPool.h.

Referenced by ThreadPool< RandomAccessRequest >::isQueueEmpty(), ThreadPool< RandomAccessRequest >::runOneTask(), and ThreadPool< RandomAccessRequest >::submitTask().

std::vector<PooledThread *> ThreadPoolBase::threads [protected, inherited]

Definition at line 52 of file ThreadPool.h.

Referenced by ThreadPoolBase::start(), and ThreadPoolBase::stop().

State ThreadPoolBase::state [protected, inherited]

Definition at line 53 of file ThreadPool.h.

Referenced by ThreadPoolBase::runPooledThread(), ThreadPoolBase::start(), ThreadPoolBase::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ThreadPoolBase::ThreadPoolBase(), and ThreadPoolBase::~ThreadPoolBase().

LocalCondition ThreadPoolBase::stoppingCondition [protected, inherited]

Definition at line 54 of file ThreadPool.h.

Referenced by ThreadPoolBase::runPooledThread(), and ThreadPoolBase::stop().

ThreadTracker* ThreadPoolBase::pThreadTracker [protected, inherited]

Definition at line 55 of file ThreadPool.h.

Referenced by ThreadPoolBase::runPooledThread(), ThreadPoolBase::setThreadTracker(), and ThreadPool< RandomAccessRequest >::ThreadPool().

StrictMutex SynchMonitoredObject::mutex [protected, inherited]

Definition at line 38 of file SynchMonitoredObject.h.

Referenced by ParallelExecStreamScheduler::abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), ParallelExecStreamScheduler::executeManager(), ParallelExecStreamScheduler::executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), ParallelExecStreamScheduler::signalSentinel(), ThreadPoolBase::start(), TimerThread::stop(), ThreadPoolBase::stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().

LocalCondition SynchMonitoredObject::condition [protected, inherited]

Definition at line 39 of file SynchMonitoredObject.h.

Referenced by ParallelExecStreamScheduler::abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), ParallelExecStreamScheduler::executeTask(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), ThreadPoolBase::stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().


The documentation for this class was generated from the following file:
ThreadPool.h
Generated on Mon Jun 22 04:00:48 2009 for Fennel by  doxygen 1.5.1