#include <ThreadPool.h>
Inheritance diagram for ThreadPool< Task >:
Public Member Functions | |
ThreadPool () | |
Constructor. | |
virtual | ~ThreadPool () |
Destructor: stop must already have been called. | |
void | submitTask (Task &task) |
Submits a task to the pool. | |
void | start (uint nThreads) |
Starts the given number of threads in the pool. | |
void | stop () |
Shuts down the pool, waiting for any pending tasks to complete. | |
void | setThreadTracker (ThreadTracker &threadTracker) |
Sets a tracker to use for created threads. | |
Protected Types | |
enum | State { STATE_STARTED, STATE_STOPPING, STATE_STOPPED } |
Protected Attributes | |
std::vector< PooledThread * > | threads |
State | state |
LocalCondition | stoppingCondition |
ThreadTracker * | pThreadTracker |
StrictMutex | mutex |
LocalCondition | condition |
Private Member Functions | |
virtual bool | isQueueEmpty () |
virtual void | runOneTask (StrictMutexGuard &guard) |
Private Attributes | |
std::deque< Task > | queue |
It's a template to avoid requiring task queue entries to be dynamically allocated.
The Task template parameter must behave as a concrete data type, and must have a method execute().
Definition at line 95 of file ThreadPool.h.
enum ThreadPoolBase::State [protected, inherited] |
Definition at line 46 of file ThreadPool.h.
00046 { 00047 STATE_STARTED, 00048 STATE_STOPPING, 00049 STATE_STOPPED 00050 };
ThreadPool< Task >::ThreadPool | ( | ) | [inline, explicit] |
Constructor.
Definition at line 117 of file ThreadPool.h.
00118 { 00119 pThreadTracker = NULL; 00120 }
virtual ThreadPool< Task >::~ThreadPool | ( | ) | [inline, virtual] |
virtual bool ThreadPool< Task >::isQueueEmpty | ( | ) | [inline, private, virtual] |
Implements ThreadPoolBase.
Definition at line 99 of file ThreadPool.h.
00100 { 00101 return queue.empty(); 00102 }
virtual void ThreadPool< Task >::runOneTask | ( | StrictMutexGuard & | guard | ) | [inline, private, virtual] |
Implements ThreadPoolBase.
Definition at line 104 of file ThreadPool.h.
00105 { 00106 Task task = queue.front(); 00107 queue.pop_front(); 00108 guard.unlock(); 00109 task.execute(); 00110 guard.lock(); 00111 }
void ThreadPool< Task >::submitTask | ( | Task & | task | ) | [inline] |
Submits a task to the pool.
It will be executed as soon as a thread is available.
task | the task to execute, expressed as a function object |
Definition at line 135 of file ThreadPool.h.
Referenced by ParallelExecStreamScheduler::addToQueue(), ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ThreadPoolScheduler::schedule(), and ParallelExecStreamScheduler::start().
00136 { 00137 StrictMutexGuard guard(mutex); 00138 assert(state == STATE_STARTED); 00139 queue.push_back(task); 00140 condition.notify_one(); 00141 }
void ThreadPoolBase::start | ( | uint | nThreads | ) | [inherited] |
Starts the given number of threads in the pool.
nThreads | number of threads to start |
Definition at line 60 of file ThreadPool.cpp.
References SynchMonitoredObject::mutex, ThreadPoolBase::PooledThread, ThreadPoolBase::state, ThreadPoolBase::STATE_STARTED, ThreadPoolBase::STATE_STOPPED, and ThreadPoolBase::threads.
Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::start(), and ThreadPoolScheduler::ThreadPoolScheduler().
00061 { 00062 StrictMutexGuard guard(mutex); 00063 assert(state == STATE_STOPPED); 00064 assert(nThreads > 0); 00065 state = STATE_STARTED; 00066 for (uint i = 0; i < nThreads; ++i) { 00067 PooledThread *pThread = new PooledThread(*this); 00068 pThread->start(); 00069 threads.push_back(pThread); 00070 } 00071 }
void ThreadPoolBase::stop | ( | ) | [inherited] |
Shuts down the pool, waiting for any pending tasks to complete.
The start/stop calls should never be invoked from more than one thread simultaneously.
Definition at line 73 of file ThreadPool.cpp.
References SynchMonitoredObject::condition, deleteAndNullify(), ThreadPoolBase::isQueueEmpty(), SynchMonitoredObject::mutex, ThreadPoolBase::state, ThreadPoolBase::STATE_STOPPED, ThreadPoolBase::STATE_STOPPING, ThreadPoolBase::stoppingCondition, and ThreadPoolBase::threads.
Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::stop(), and ThreadPoolScheduler::stop().
00074 { 00075 StrictMutexGuard guard(mutex); 00076 assert(state != STATE_STOPPING); 00077 if (state == STATE_STOPPED) { 00078 return; 00079 } 00080 state = STATE_STOPPING; 00081 00082 while (!isQueueEmpty()) { 00083 stoppingCondition.wait(guard); 00084 } 00085 00086 state = STATE_STOPPED; 00087 condition.notify_all(); 00088 guard.unlock(); 00089 00090 for (uint i = 0; i < threads.size(); ++i) { 00091 threads[i]->join(); 00092 } 00093 00094 guard.lock(); 00095 for (uint i = 0; i < threads.size(); ++i) { 00096 deleteAndNullify(threads[i]); 00097 } 00098 threads.clear(); 00099 }
void ThreadPoolBase::setThreadTracker | ( | ThreadTracker & | threadTracker | ) | [inherited] |
Sets a tracker to use for created threads.
threadTracker | tracker to use |
Definition at line 130 of file ThreadPool.cpp.
References ThreadPoolBase::pThreadTracker.
Referenced by ParallelExecStreamScheduler::ParallelExecStreamScheduler().
00131 { 00132 pThreadTracker = &threadTracker; 00133 }
std::deque<Task> ThreadPool< Task >::queue [private] |
Definition at line 97 of file ThreadPool.h.
Referenced by ThreadPool< RandomAccessRequest >::isQueueEmpty(), ThreadPool< RandomAccessRequest >::runOneTask(), and ThreadPool< RandomAccessRequest >::submitTask().
std::vector<PooledThread *> ThreadPoolBase::threads [protected, inherited] |
Definition at line 52 of file ThreadPool.h.
Referenced by ThreadPoolBase::start(), and ThreadPoolBase::stop().
State ThreadPoolBase::state [protected, inherited] |
Definition at line 53 of file ThreadPool.h.
Referenced by ThreadPoolBase::runPooledThread(), ThreadPoolBase::start(), ThreadPoolBase::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ThreadPoolBase::ThreadPoolBase(), and ThreadPoolBase::~ThreadPoolBase().
LocalCondition ThreadPoolBase::stoppingCondition [protected, inherited] |
Definition at line 54 of file ThreadPool.h.
Referenced by ThreadPoolBase::runPooledThread(), and ThreadPoolBase::stop().
ThreadTracker* ThreadPoolBase::pThreadTracker [protected, inherited] |
Definition at line 55 of file ThreadPool.h.
Referenced by ThreadPoolBase::runPooledThread(), ThreadPoolBase::setThreadTracker(), and ThreadPool< RandomAccessRequest >::ThreadPool().
StrictMutex SynchMonitoredObject::mutex [protected, inherited] |
Definition at line 38 of file SynchMonitoredObject.h.
Referenced by ParallelExecStreamScheduler::abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), ParallelExecStreamScheduler::executeManager(), ParallelExecStreamScheduler::executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), ParallelExecStreamScheduler::signalSentinel(), ThreadPoolBase::start(), TimerThread::stop(), ThreadPoolBase::stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().
LocalCondition SynchMonitoredObject::condition [protected, inherited] |
Definition at line 39 of file SynchMonitoredObject.h.
Referenced by ParallelExecStreamScheduler::abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), ParallelExecStreamScheduler::executeTask(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), ThreadPoolBase::stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().