#include <ThreadPool.h>
Inheritance diagram for ThreadPoolBase:
Public Member Functions | |
void | start (uint nThreads) |
Starts the given number of threads in the pool. | |
void | stop () |
Shuts down the pool, waiting for any pending tasks to complete. | |
void | setThreadTracker (ThreadTracker &threadTracker) |
Sets a tracker to use for created threads. | |
Protected Types | |
enum | State { STATE_STARTED, STATE_STOPPING, STATE_STOPPED } |
Protected Member Functions | |
ThreadPoolBase () | |
virtual | ~ThreadPoolBase () |
virtual bool | isQueueEmpty ()=0 |
virtual void | runOneTask (StrictMutexGuard &)=0 |
Protected Attributes | |
std::vector< PooledThread * > | threads |
State | state |
LocalCondition | stoppingCondition |
ThreadTracker * | pThreadTracker |
StrictMutex | mutex |
LocalCondition | condition |
Private Member Functions | |
void | runPooledThread () |
Friends | |
class | PooledThread |
Definition at line 40 of file ThreadPool.h.
enum ThreadPoolBase::State [protected] |
Definition at line 46 of file ThreadPool.h.
00046 {
00047     STATE_STARTED,
00048     STATE_STOPPING,
00049     STATE_STOPPED
00050 };
ThreadPoolBase::ThreadPoolBase | ( | ) | [explicit, protected] |
Definition at line 50 of file ThreadPool.cpp.
References state and STATE_STOPPED.
00051 {
00052     state = STATE_STOPPED;
00053 }
ThreadPoolBase::~ThreadPoolBase | ( | ) | [protected, virtual] |
Definition at line 55 of file ThreadPool.cpp.
References state and STATE_STOPPED.
00056 {
00057     assert(state == STATE_STOPPED);
00058 }
void ThreadPoolBase::runPooledThread | ( | ) | [private] |
Definition at line 101 of file ThreadPool.cpp.
References SynchMonitoredObject::condition, isQueueEmpty(), SynchMonitoredObject::mutex, ThreadTracker::onThreadEnd(), ThreadTracker::onThreadStart(), pThreadTracker, runOneTask(), state, STATE_STOPPED, and stoppingCondition.
Referenced by PooledThread::run().
00102 {
00103     // TODO jvs 28-Jul-2008: resource acquisition as initialization
00104     if (pThreadTracker) {
00105         pThreadTracker->onThreadStart();
00106     }
00107     try {
00108         StrictMutexGuard guard(mutex);
00109         for (;;) {
00110             while ((state != STATE_STOPPED) && isQueueEmpty()) {
00111                 condition.wait(guard);
00112             }
00113             if (state == STATE_STOPPED) {
00114                 break;
00115             }
00116             runOneTask(guard);
00117             stoppingCondition.notify_one();
00118         }
00119     } catch (...) {
00120         if (pThreadTracker) {
00121             pThreadTracker->onThreadEnd();
00122         }
00123         throw;
00124     }
00125     if (pThreadTracker) {
00126         pThreadTracker->onThreadEnd();
00127     }
00128 }
virtual bool ThreadPoolBase::isQueueEmpty | ( | ) | [protected, pure virtual] |
Implemented in ThreadPool< Task >, ThreadPool< ExternalSortTask >, ThreadPool< ParallelExecTask >, and ThreadPool< RandomAccessRequest >.
Referenced by runPooledThread() and stop().
virtual void ThreadPoolBase::runOneTask | ( | StrictMutexGuard & | ) | [protected, pure virtual] |
Implemented in ThreadPool< Task >, ThreadPool< ExternalSortTask >, ThreadPool< ParallelExecTask >, and ThreadPool< RandomAccessRequest >.
Referenced by runPooledThread().
void ThreadPoolBase::start | ( | uint | nThreads | ) |
Starts the given number of threads in the pool.
nThreads | number of threads to start |
Definition at line 60 of file ThreadPool.cpp.
References SynchMonitoredObject::mutex, PooledThread, state, STATE_STARTED, STATE_STOPPED, and threads.
Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::start(), and ThreadPoolScheduler::ThreadPoolScheduler().
00061 {
00062     StrictMutexGuard guard(mutex);
00063     assert(state == STATE_STOPPED);
00064     assert(nThreads > 0);
00065     state = STATE_STARTED;
00066     for (uint i = 0; i < nThreads; ++i) {
00067         PooledThread *pThread = new PooledThread(*this);
00068         pThread->start();
00069         threads.push_back(pThread);
00070     }
00071 }
void ThreadPoolBase::stop | ( | ) |
Shuts down the pool, waiting for any pending tasks to complete.
The start/stop calls should never be invoked from more than one thread simultaneously.
Definition at line 73 of file ThreadPool.cpp.
References SynchMonitoredObject::condition, deleteAndNullify(), isQueueEmpty(), SynchMonitoredObject::mutex, state, STATE_STOPPED, STATE_STOPPING, stoppingCondition, and threads.
Referenced by ExternalSortExecStreamImpl::computeFirstResultParallel(), ThreadedTestBase::runThreadedTestCase(), ParallelExecStreamScheduler::stop(), and ThreadPoolScheduler::stop().
00074 {
00075     StrictMutexGuard guard(mutex);
00076     assert(state != STATE_STOPPING);
00077     if (state == STATE_STOPPED) {
00078         return;
00079     }
00080     state = STATE_STOPPING;
00081
00082     while (!isQueueEmpty()) {
00083         stoppingCondition.wait(guard);
00084     }
00085
00086     state = STATE_STOPPED;
00087     condition.notify_all();
00088     guard.unlock();
00089
00090     for (uint i = 0; i < threads.size(); ++i) {
00091         threads[i]->join();
00092     }
00093
00094     guard.lock();
00095     for (uint i = 0; i < threads.size(); ++i) {
00096         deleteAndNullify(threads[i]);
00097     }
00098     threads.clear();
00099 }
void ThreadPoolBase::setThreadTracker | ( | ThreadTracker & | threadTracker | ) |
Sets a tracker to use for created threads.
threadTracker | tracker to use |
Definition at line 130 of file ThreadPool.cpp.
References pThreadTracker.
Referenced by ParallelExecStreamScheduler::ParallelExecStreamScheduler().
00131 {
00132     pThreadTracker = &threadTracker;
00133 }
friend class PooledThread [friend] |
std::vector<PooledThread *> ThreadPoolBase::threads [protected] |
State ThreadPoolBase::state [protected] |
Definition at line 53 of file ThreadPool.h.
Referenced by runPooledThread(), start(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), ThreadPoolBase(), and ~ThreadPoolBase().
LocalCondition ThreadPoolBase::stoppingCondition [protected] |
ThreadTracker* ThreadPoolBase::pThreadTracker [protected] |
Definition at line 55 of file ThreadPool.h.
Referenced by runPooledThread(), setThreadTracker(), and ThreadPool< RandomAccessRequest >::ThreadPool().
StrictMutex SynchMonitoredObject::mutex [protected, inherited] |
Definition at line 38 of file SynchMonitoredObject.h.
Referenced by ParallelExecStreamScheduler::abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), ParallelExecStreamScheduler::executeManager(), ParallelExecStreamScheduler::executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), ParallelExecStreamScheduler::signalSentinel(), start(), TimerThread::stop(), stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().
LocalCondition SynchMonitoredObject::condition [protected, inherited] |
Definition at line 39 of file SynchMonitoredObject.h.
Referenced by ParallelExecStreamScheduler::abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), ParallelExecStreamScheduler::executeTask(), ParallelExecStreamScheduler::readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), stop(), ParallelExecStreamScheduler::stop(), ThreadPool< RandomAccessRequest >::submitTask(), ParallelExecStreamScheduler::tryExecuteManager(), ParallelExecStreamScheduler::tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().