#include <ParallelExecStreamScheduler.h>
Inheritance diagram for ParallelExecStreamScheduler:
Public Member Functions | |
ParallelExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name, ThreadTracker &threadTracker, uint degreeOfParallelism) | |
Constructs a new scheduler. | |
virtual | ~ParallelExecStreamScheduler () |
virtual void | addGraph (SharedExecStreamGraph pGraph) |
Adds a graph to be scheduled. | |
virtual void | removeGraph (SharedExecStreamGraph pGraph) |
Removes a graph currently being scheduled. | |
virtual void | start () |
Starts this scheduler, preparing it to execute streams. | |
virtual void | setRunnable (ExecStream &stream, bool) |
Sets whether that a specific stream should be considered for execution. | |
virtual void | makeRunnable (ExecStream &stream) |
Requests that a specific stream be considered for execution. | |
virtual void | abort (ExecStreamGraph &graph) |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn. | |
virtual void | stop () |
Shuts down this scheduler, preventing any further streams from being scheduled. | |
virtual ExecStreamBufAccessor & | readStream (ExecStream &stream) |
Reads data from a stream, first performing any scheduling necessary to make output available. | |
virtual void | createBufferProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER. | |
virtual uint | getDegreeOfParallelism () |
| |
virtual void | traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel) |
Traces the contents of a stream buffer. | |
virtual SharedExecStreamBufAccessor | newBufAccessor () |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler. | |
virtual void | createCopyProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
Protected Member Functions | |
ExecStreamResult | executeStream (ExecStream &stream, ExecStreamQuantum const &quantum) |
Executes one stream, performing tracing if enabled. | |
virtual void | tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum) |
Traces before execution of a stream. | |
virtual void | tracePostExecution (ExecStream &stream, ExecStreamResult rc) |
Traces after execution of a stream. | |
virtual void | traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel) |
Traces the states of the input and output buffers adjacent to a stream. | |
Protected Attributes | |
bool | tracingFine |
StrictMutex | mutex |
LocalCondition | condition |
Private Types | |
enum | StreamState { SS_SLEEPING, SS_RUNNING, SS_INHIBITED } |
enum | ManagerState { MGR_RUNNING, MGR_STOPPING, MGR_STOPPED } |
typedef std::hash_map< ExecStreamId, StreamStateMapEntry > | StreamStateMap |
typedef std::deque< ExecStreamId > | InhibitedQueue |
Private Member Functions | |
void | tryExecuteManager () |
void | executeManager () |
void | tryExecuteTask (ExecStream &) |
void | executeTask (ExecStream &) |
bool | addToQueue (ExecStreamId streamId) |
void | signalSentinel (ExecStreamId sentinelId) |
void | retryInhibitedQueue () |
void | processCompletedTask (ParallelExecResult const &task) |
bool | isInhibited (ExecStreamId streamId) |
void | alterNeighborInhibition (ExecStreamId streamId, int delta) |
Private Attributes | |
SharedExecStreamGraph | pGraph |
ThreadPool< ParallelExecTask > | threadPool |
std::deque< ParallelExecResult > | completedQueue |
ThreadTracker & | threadTracker |
StreamStateMap | streamStateMap |
ManagerState | mgrState |
InhibitedQueue | inhibitedQueue |
InhibitedQueue | transitQueue |
LocalCondition | sentinelCondition |
uint | degreeOfParallelism |
boost::scoped_ptr< FennelExcn > | pPendingExcn |
Friends | |
class | ParallelExecTask |
Classes | |
struct | StreamStateMapEntry |
For more information, see the design doc in Eigenpedia.
Definition at line 98 of file ParallelExecStreamScheduler.h.
typedef std::hash_map<ExecStreamId, StreamStateMapEntry> ParallelExecStreamScheduler::StreamStateMap [private] |
Definition at line 121 of file ParallelExecStreamScheduler.h.
typedef std::deque<ExecStreamId> ParallelExecStreamScheduler::InhibitedQueue [private] |
Definition at line 122 of file ParallelExecStreamScheduler.h.
enum ParallelExecStreamScheduler::StreamState [private] |
Definition at line 101 of file ParallelExecStreamScheduler.h.
00102 { 00103 SS_SLEEPING, 00104 SS_RUNNING, 00105 SS_INHIBITED 00106 };
enum ParallelExecStreamScheduler::ManagerState [private] |
Definition at line 114 of file ParallelExecStreamScheduler.h.
00114 { 00115 MGR_RUNNING, 00116 MGR_STOPPING, 00117 MGR_STOPPED 00118 };
ParallelExecStreamScheduler::ParallelExecStreamScheduler | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name, | |||
ThreadTracker & | threadTracker, | |||
uint | degreeOfParallelism | |||
) | [explicit] |
Constructs a new scheduler.
pTraceTarget | the TraceTarget to which messages will be sent, or NULL to disable tracing entirely | |
name | the name to use for tracing this scheduler | |
threadTracker | tracker to use for threads created during parallelization | |
degreeOfParallelism | number of threads to run |
Definition at line 37 of file ParallelExecStreamScheduler.cpp.
References degreeOfParallelism, MGR_STOPPED, mgrState, ThreadPoolBase::setThreadTracker(), threadPool, and threadTracker.
00042 : TraceSource(pTraceTarget, name), 00043 ExecStreamScheduler(pTraceTarget, name), 00044 threadTracker(threadTrackerInit) 00045 { 00046 degreeOfParallelism = degreeOfParallelismInit; 00047 assert(degreeOfParallelism > 0); 00048 threadPool.setThreadTracker(threadTracker); 00049 mgrState = MGR_STOPPED; 00050 }
ParallelExecStreamScheduler::~ParallelExecStreamScheduler | ( | ) | [virtual] |
void ParallelExecStreamScheduler::tryExecuteManager | ( | ) | [private] |
Definition at line 228 of file ParallelExecStreamScheduler.cpp.
References completedQueue, SynchMonitoredObject::condition, MGR_RUNNING, mgrState, SynchMonitoredObject::mutex, pPendingExcn, processCompletedTask(), and TRACE_FINE.
Referenced by executeManager().
00229 { 00230 FENNEL_TRACE(TRACE_FINE,"manager task starting"); 00231 for (;;) { 00232 StrictMutexGuard mutexGuard(mutex); 00233 while (completedQueue.empty() && (mgrState == MGR_RUNNING) 00234 && !pPendingExcn) 00235 { 00236 condition.wait(mutexGuard); 00237 } 00238 if (pPendingExcn) { 00239 return; 00240 } 00241 if (mgrState != MGR_RUNNING) { 00242 return; 00243 } 00244 while (!completedQueue.empty()) { 00245 ParallelExecResult result = completedQueue.front(); 00246 completedQueue.pop_front(); 00247 // don't hold lock while doing expensive state maintenance 00248 mutexGuard.unlock(); 00249 processCompletedTask(result); 00250 if (pPendingExcn) { 00251 return; 00252 } 00253 mutexGuard.lock(); 00254 } 00255 } 00256 }
void ParallelExecStreamScheduler::executeManager | ( | ) | [private] |
Definition at line 212 of file ParallelExecStreamScheduler.cpp.
References MGR_STOPPED, mgrState, SynchMonitoredObject::mutex, sentinelCondition, and tryExecuteManager().
Referenced by ParallelExecTask::execute().
00213 { 00214 // TODO jvs 16-Aug-2008: RAII 00215 try { 00216 tryExecuteManager(); 00217 } catch (...) { 00218 StrictMutexGuard mutexGuard(mutex); 00219 mgrState = MGR_STOPPED; 00220 sentinelCondition.notify_all(); 00221 throw; 00222 } 00223 StrictMutexGuard mutexGuard(mutex); 00224 mgrState = MGR_STOPPED; 00225 sentinelCondition.notify_all(); 00226 }
void ParallelExecStreamScheduler::tryExecuteTask | ( | ExecStream & | ) | [private] |
Definition at line 399 of file ParallelExecStreamScheduler.cpp.
References completedQueue, SynchMonitoredObject::condition, ExecStreamScheduler::executeStream(), ExecStream::getStreamId(), and SynchMonitoredObject::mutex.
Referenced by executeTask().
00400 { 00401 ExecStreamQuantum quantum; 00402 ExecStreamResult rc = executeStream(stream, quantum); 00403 ParallelExecResult result(stream.getStreamId(), rc); 00404 00405 StrictMutexGuard mutexGuard(mutex); 00406 completedQueue.push_back(result); 00407 condition.notify_one(); 00408 }
void ParallelExecStreamScheduler::executeTask | ( | ExecStream & | ) | [private] |
Definition at line 379 of file ParallelExecStreamScheduler.cpp.
References ThreadTracker::cloneExcn(), SynchMonitoredObject::condition, SynchMonitoredObject::mutex, pPendingExcn, threadTracker, and tryExecuteTask().
Referenced by ParallelExecTask::execute().
00380 { 00381 try { 00382 tryExecuteTask(stream); 00383 } catch (std::exception &ex) { 00384 StrictMutexGuard mutexGuard(mutex); 00385 if (!pPendingExcn) { 00386 pPendingExcn.reset(threadTracker.cloneExcn(ex)); 00387 } 00388 condition.notify_one(); 00389 } catch (...) { 00390 // REVIEW jvs 22-Jul-2008: panic instead? 00391 StrictMutexGuard mutexGuard(mutex); 00392 if (!pPendingExcn) { 00393 pPendingExcn.reset(new FennelExcn("Unknown error")); 00394 } 00395 condition.notify_one(); 00396 } 00397 }
bool ParallelExecStreamScheduler::addToQueue | ( | ExecStreamId | streamId | ) | [private] |
Definition at line 424 of file ParallelExecStreamScheduler.cpp.
References alterNeighborInhibition(), ExecStreamGraphImpl::getStreamFromVertex(), inhibitedQueue, isInhibited(), pPendingExcn, SS_INHIBITED, SS_RUNNING, SS_SLEEPING, streamStateMap, ThreadPool< Task >::submitTask(), and threadPool.
Referenced by processCompletedTask(), and retryInhibitedQueue().
00425 { 00426 if (pPendingExcn) { 00427 return false; 00428 } 00429 switch (streamStateMap[streamId].state) { 00430 case SS_SLEEPING: 00431 { 00432 ExecStreamGraphImpl &graphImpl = 00433 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00434 SharedExecStream pStream = graphImpl.getStreamFromVertex(streamId); 00435 if (!pStream) { 00436 // sentinel node 00437 return true; 00438 } 00439 if (isInhibited(streamId)) { 00440 streamStateMap[streamId].state = SS_INHIBITED; 00441 inhibitedQueue.push_back(streamId); 00442 } else { 00443 streamStateMap[streamId].state = SS_RUNNING; 00444 alterNeighborInhibition(streamId, + 1); 00445 ParallelExecTask task(*this, pStream.get()); 00446 threadPool.submitTask(task); 00447 } 00448 } 00449 break; 00450 case SS_INHIBITED: 00451 case SS_RUNNING: 00452 // ignore request 00453 break; 00454 default: 00455 permAssert(false); 00456 } 00457 return false; 00458 }
void ParallelExecStreamScheduler::signalSentinel | ( | ExecStreamId | sentinelId | ) | [private] |
Definition at line 370 of file ParallelExecStreamScheduler.cpp.
References alterNeighborInhibition(), SynchMonitoredObject::mutex, sentinelCondition, SS_RUNNING, and streamStateMap.
Referenced by processCompletedTask().
00371 { 00372 alterNeighborInhibition(sentinelId, + 1); 00373 00374 StrictMutexGuard mutexGuard(mutex); 00375 streamStateMap[sentinelId].state = SS_RUNNING; 00376 sentinelCondition.notify_all(); 00377 }
void ParallelExecStreamScheduler::retryInhibitedQueue | ( | ) | [private] |
Definition at line 410 of file ParallelExecStreamScheduler.cpp.
References addToQueue(), ExecStreamId, inhibitedQueue, SS_SLEEPING, streamStateMap, and transitQueue.
Referenced by processCompletedTask().
00411 { 00412 // addToQueue may bounce some back to inhibitedQueue, 00413 // so process via transitQueue 00414 transitQueue = inhibitedQueue; 00415 inhibitedQueue.clear(); 00416 while (!transitQueue.empty()) { 00417 ExecStreamId inhibitedStreamId = transitQueue.front(); 00418 transitQueue.pop_front(); 00419 streamStateMap[inhibitedStreamId].state = SS_SLEEPING; 00420 addToQueue(inhibitedStreamId); 00421 } 00422 }
void ParallelExecStreamScheduler::processCompletedTask | ( | ParallelExecResult const & | task | ) | [private] |
Definition at line 303 of file ParallelExecStreamScheduler.cpp.
References addToQueue(), alterNeighborInhibition(), EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ParallelExecResult::getResultCode(), ExecStreamBufAccessor::getState(), ParallelExecResult::getStreamId(), retryInhibitedQueue(), signalSentinel(), SS_SLEEPING, and streamStateMap.
Referenced by tryExecuteManager().
00305 { 00306 ExecStreamId current = result.getStreamId(); 00307 ExecStreamGraphImpl &graphImpl = 00308 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00309 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00310 00311 streamStateMap[current].state = SS_SLEEPING; 00312 alterNeighborInhibition(current, -1); 00313 00314 switch (result.getResultCode()) { 00315 case EXECRC_EOS: 00316 case EXECRC_BUF_OVERFLOW: 00317 case EXECRC_BUF_UNDERFLOW: 00318 { 00319 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00320 boost::out_edges(current, graphRep); 00321 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00322 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00323 ExecStreamBufAccessor &bufAccessor = 00324 graphImpl.getBufAccessorFromEdge(edge); 00325 if (bufAccessor.getState() != EXECBUF_UNDERFLOW) { 00326 ExecStreamId consumer = boost::target(edge, graphRep); 00327 bool sentinel = addToQueue(consumer); 00328 if (sentinel) { 00329 if (bufAccessor.getState() != EXECBUF_EMPTY) { 00330 signalSentinel(consumer); 00331 } 00332 } 00333 } 00334 } 00335 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00336 boost::in_edges(current, graphRep); 00337 bool sawUnderflow = false; 00338 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00339 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00340 ExecStreamBufAccessor &bufAccessor = 00341 graphImpl.getBufAccessorFromEdge(edge); 00342 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) { 00343 ExecStreamId producer = boost::source(edge, graphRep); 00344 addToQueue(producer); 00345 sawUnderflow = true; 00346 } 00347 } 00348 if (!sawUnderflow && 00349 (result.getResultCode() == EXECRC_BUF_UNDERFLOW)) 00350 { 00351 // sometimes, a stream may return underflow, even though none 00352 // of its inputs are in underflow state; instead, some are in 00353 // EOS; we interpret this as a yield request to help keep 00354 // the stream's state machine logic simpler 00355 addToQueue(current); 00356 } 00357 } 00358 break; 00359 case EXECRC_QUANTUM_EXPIRED: 00360 addToQueue(current); 00361 break; 00362 default: 00363 permAssert(false); 00364 } 00365 00366 // REVIEW jvs 4-Jul-2008: only reschedule inhibited neighbors? 00367 retryInhibitedQueue(); 00368 }
bool ParallelExecStreamScheduler::isInhibited | ( | ExecStreamId | streamId | ) | [inline, private] |
Definition at line 80 of file ParallelExecStreamScheduler.cpp.
References streamStateMap.
Referenced by addToQueue().
00081 { 00082 return streamStateMap[streamId].inhibitionCount > 0; 00083 }
void ParallelExecStreamScheduler::alterNeighborInhibition | ( | ExecStreamId | streamId, | |
int | delta | |||
) | [inline, private] |
Definition at line 56 of file ParallelExecStreamScheduler.cpp.
References ExecStreamId, ExecStreamGraphImpl::getGraphRep(), and streamStateMap.
Referenced by addToQueue(), processCompletedTask(), signalSentinel(), and start().
00058 { 00059 ExecStreamGraphImpl &graphImpl = 00060 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00061 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00062 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00063 boost::out_edges(streamId, graphRep); 00064 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00065 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00066 ExecStreamId consumer = boost::target(edge, graphRep); 00067 streamStateMap[consumer].inhibitionCount += delta; 00068 assert(streamStateMap[consumer].inhibitionCount >= 0); 00069 } 00070 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00071 boost::in_edges(streamId, graphRep); 00072 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00073 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00074 ExecStreamId producer = boost::source(edge, graphRep); 00075 streamStateMap[producer].inhibitionCount += delta; 00076 assert(streamStateMap[producer].inhibitionCount >= 0); 00077 } 00078 }
void ParallelExecStreamScheduler::addGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Adds a graph to be scheduled.
Some implementations may require all graphs to be added before scheduler is started; others may allow graphs to be added at any time.
pGraph | the graph to be scheduled |
Reimplemented from ExecStreamScheduler.
Definition at line 85 of file ParallelExecStreamScheduler.cpp.
References ExecStreamScheduler::addGraph(), and pGraph.
00087 { 00088 assert(!pGraph); 00089 00090 ExecStreamScheduler::addGraph(pGraphInit); 00091 pGraph = pGraphInit; 00092 }
void ParallelExecStreamScheduler::removeGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Removes a graph currently being scheduled.
Some implementations may disallow graph removal except when scheduler is stopped; others may disallow graph removal altogether.
pGraph | the graph currently being scheduled |
Reimplemented from ExecStreamScheduler.
Definition at line 94 of file ParallelExecStreamScheduler.cpp.
References pGraph, and ExecStreamScheduler::removeGraph().
00096 { 00097 assert(pGraph == pGraphInit); 00098 00099 pGraph.reset(); 00100 ExecStreamScheduler::removeGraph(pGraphInit); 00101 }
void ParallelExecStreamScheduler::start | ( | ) | [virtual] |
Starts this scheduler, preparing it to execute streams.
Implements ExecStreamScheduler.
Definition at line 103 of file ParallelExecStreamScheduler.cpp.
References alterNeighborInhibition(), degreeOfParallelism, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStreamGraphImpl::getStreamFromVertex(), MGR_RUNNING, mgrState, pGraph, pPendingExcn, ExecStreamBufAccessor::requestProduction(), SS_SLEEPING, ThreadPoolBase::start(), streamStateMap, ThreadPool< Task >::submitTask(), threadPool, and TRACE_FINE.
00104 { 00105 FENNEL_TRACE(TRACE_FINE,"start"); 00106 assert(pGraph->isAcyclic()); 00107 pPendingExcn.reset(); 00108 00109 ExecStreamGraphImpl &graphImpl = 00110 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00111 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00112 ExecStreamGraphImpl::VertexIterPair vertices = boost::vertices(graphRep); 00113 while (vertices.first != vertices.second) { 00114 ExecStreamId streamId = *vertices.first; 00115 streamStateMap[streamId].state = SS_SLEEPING; 00116 streamStateMap[streamId].inhibitionCount = 0; 00117 ++vertices.first; 00118 } 00119 00120 vertices = boost::vertices(graphRep); 00121 while (vertices.first != vertices.second) { 00122 ExecStreamId streamId = *vertices.first; 00123 if (!graphImpl.getStreamFromVertex(streamId)) { 00124 // initially inhibit producers until first call to readStream 00125 alterNeighborInhibition(streamId, + 1); 00126 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00127 boost::in_edges(streamId, graphRep); 00128 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00129 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00130 ExecStreamBufAccessor &bufAccessor = 00131 graphImpl.getBufAccessorFromEdge(edge); 00132 bufAccessor.requestProduction(); 00133 } 00134 } 00135 ++vertices.first; 00136 } 00137 00138 // +1 for the manager task, which will tie up one thread 00139 threadPool.start(degreeOfParallelism + 1); 00140 00141 // kick off the manager task 00142 ParallelExecTask managerTask(*this, NULL); 00143 mgrState = MGR_RUNNING; 00144 threadPool.submitTask(managerTask); 00145 }
void ParallelExecStreamScheduler::setRunnable | ( | ExecStream & | stream, | |
bool | ||||
) | [virtual] |
Sets whether that a specific stream should be considered for execution.
stream | the stream to make runnable |
Implements ExecStreamScheduler.
Definition at line 147 of file ParallelExecStreamScheduler.cpp.
void ParallelExecStreamScheduler::makeRunnable | ( | ExecStream & | stream | ) | [virtual] |
Requests that a specific stream be considered for execution.
stream | the stream to make runnable |
Reimplemented from ExecStreamScheduler.
Definition at line 152 of file ParallelExecStreamScheduler.cpp.
void ParallelExecStreamScheduler::abort | ( | ExecStreamGraph & | graph | ) | [virtual] |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
Returns immediately, not waiting for abort request to be fully processed.
graph | graph to abort; must be one of the graphs associated with this scheduler |
Implements ExecStreamScheduler.
Definition at line 157 of file ParallelExecStreamScheduler.cpp.
References SynchMonitoredObject::condition, SynchMonitoredObject::mutex, pPendingExcn, and TRACE_FINE.
00158 { 00159 StrictMutexGuard mutexGuard(mutex); 00160 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00161 00162 if (!pPendingExcn) { 00163 pPendingExcn.reset(new AbortExcn()); 00164 } 00165 condition.notify_one(); 00166 }
void ParallelExecStreamScheduler::checkAbort | ( | ) | const [virtual] |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
Reimplemented from ExecStreamScheduler.
Definition at line 168 of file ParallelExecStreamScheduler.cpp.
References pPendingExcn.
00169 { 00170 if (pPendingExcn) { 00171 throw AbortExcn(); 00172 } 00173 }
void ParallelExecStreamScheduler::stop | ( | ) | [virtual] |
Shuts down this scheduler, preventing any further streams from being scheduled.
Implements ExecStreamScheduler.
Definition at line 175 of file ParallelExecStreamScheduler.cpp.
References completedQueue, SynchMonitoredObject::condition, inhibitedQueue, MGR_STOPPED, MGR_STOPPING, mgrState, SynchMonitoredObject::mutex, pPendingExcn, sentinelCondition, ThreadPoolBase::stop(), threadPool, and TRACE_FINE.
00176 { 00177 FENNEL_TRACE(TRACE_FINE,"stop"); 00178 00179 StrictMutexGuard mutexGuard(mutex); 00180 if (mgrState != MGR_STOPPED) { 00181 mgrState = MGR_STOPPING; 00182 condition.notify_one(); 00183 while (mgrState != MGR_STOPPED) { 00184 sentinelCondition.wait(mutexGuard); 00185 } 00186 } 00187 mutexGuard.unlock(); 00188 00189 threadPool.stop(); 00190 00191 // NOTE jvs 10-Aug-2008: This is how we keep the cloned excn 00192 // from becoming a memory leak. It assumes that the caller 00193 // doesn't invoke pScheduler->stop() until *after* the exception 00194 // has been completely handled and is no longer referenced. 00195 pPendingExcn.reset(); 00196 00197 completedQueue.clear(); 00198 inhibitedQueue.clear(); 00199 }
ExecStreamBufAccessor & ParallelExecStreamScheduler::readStream | ( | ExecStream & | stream | ) | [virtual] |
Reads data from a stream, first performing any scheduling necessary to make output available.
stream | the stream from which to read |
Implements ExecStreamScheduler.
Definition at line 258 of file ParallelExecStreamScheduler.cpp.
References completedQueue, SynchMonitoredObject::condition, EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), SynchMonitoredObject::mutex, pPendingExcn, ExecStreamBufAccessor::requestProduction(), sentinelCondition, SS_SLEEPING, streamStateMap, and TRACE_FINE.
00260 { 00261 FENNEL_TRACE( 00262 TRACE_FINE, 00263 "entering readStream " << stream.getName()); 00264 00265 ExecStreamId current = stream.getStreamId(); 00266 ExecStreamGraphImpl &graphImpl = 00267 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00268 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00269 00270 // assert that we're reading from a designated output stream 00271 assert(boost::out_degree(current,graphRep) == 1); 00272 ExecStreamGraphImpl::Edge edge = 00273 *(boost::out_edges(current,graphRep).first); 00274 ExecStreamBufAccessor &bufAccessor = graphImpl.getBufAccessorFromEdge(edge); 00275 current = boost::target(edge, graphRep); 00276 assert(!graphImpl.getStreamFromVertex(current)); 00277 00278 if (bufAccessor.getState() == EXECBUF_EMPTY) { 00279 bufAccessor.requestProduction(); 00280 } else if (bufAccessor.getState() != EXECBUF_UNDERFLOW) { 00281 // data or EOS already available 00282 return bufAccessor; 00283 } 00284 00285 // please sir, I'd like some more 00286 ParallelExecResult result(current, EXECRC_BUF_UNDERFLOW); 00287 StrictMutexGuard mutexGuard(mutex); 00288 streamStateMap[current].state = SS_SLEEPING; 00289 completedQueue.push_back(result); 00290 condition.notify_one(); 00291 00292 while ((streamStateMap[current].state == SS_SLEEPING) && !pPendingExcn) { 00293 sentinelCondition.wait(mutexGuard); 00294 } 00295 00296 if (pPendingExcn) { 00297 pPendingExcn->throwSelf(); 00298 } 00299 00300 return bufAccessor; 00301 }
void ParallelExecStreamScheduler::createBufferProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual] |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
Default implementation is ScratchBufferExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Reimplemented from ExecStreamScheduler.
Definition at line 201 of file ParallelExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00203 { 00204 // use double buffering so that producers and consumers can run 00205 // in parallel 00206 DoubleBufferExecStreamParams adapterParams; 00207 embryo.init( 00208 new DoubleBufferExecStream(), 00209 adapterParams); 00210 }
uint ParallelExecStreamScheduler::getDegreeOfParallelism | ( | ) | [virtual] |
Reimplemented from ExecStreamScheduler.
Definition at line 485 of file ParallelExecStreamScheduler.cpp.
References degreeOfParallelism.
00486 { 00487 return degreeOfParallelism; 00488 }
ExecStreamResult ExecStreamScheduler::executeStream | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [inline, protected, inherited] |
Executes one stream, performing tracing if enabled.
stream | stream to execute | |
quantum | quantum controlling stream execution |
Definition at line 243 of file ExecStreamScheduler.h.
References ExecStream::execute(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), and ExecStreamScheduler::tracingFine.
Referenced by DfsTreeExecStreamScheduler::readStream(), and tryExecuteTask().
00246 { 00247 if (tracingFine) { 00248 tracePreExecution(stream, quantum); 00249 ExecStreamResult rc = stream.execute(quantum); 00250 tracePostExecution(stream, rc); 00251 return rc; 00252 } else { 00253 return stream.execute(quantum); 00254 } 00255 }
void ExecStreamScheduler::tracePreExecution | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [protected, virtual, inherited] |
Traces before execution of a stream.
stream | stream about to be executed | |
quantum | quantum controlling stream execution |
Definition at line 112 of file ExecStreamScheduler.cpp.
References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().
Referenced by ExecStreamScheduler::executeStream().
00115 { 00116 FENNEL_TRACE( 00117 TRACE_FINE, 00118 "executing " << stream.getStreamId() << ' ' << stream.getName()); 00119 if (!isMAXU(quantum.nTuplesMax)) { 00120 FENNEL_TRACE( 00121 TRACE_FINE, 00122 "nTuplesMax = " << quantum.nTuplesMax); 00123 } 00124 00125 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST); 00126 }
void ExecStreamScheduler::tracePostExecution | ( | ExecStream & | stream, | |
ExecStreamResult | rc | |||
) | [protected, virtual, inherited] |
Traces after execution of a stream.
stream | stream which was just executed | |
rc | result code returned by stream |
Definition at line 128 of file ExecStreamScheduler.cpp.
References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().
Referenced by ExecStreamScheduler::executeStream().
00131 { 00132 FENNEL_TRACE( 00133 TRACE_FINE, 00134 "executed " << stream.getStreamId() << ' ' << stream.getName() 00135 << " with result " << ExecStreamResult_names[rc]); 00136 00137 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER); 00138 }
void ExecStreamScheduler::traceStreamBuffers | ( | ExecStream & | stream, | |
TraceLevel | inputTupleTraceLevel, | |||
TraceLevel | outputTupleTraceLevel | |||
) | [protected, virtual, inherited] |
Traces the states of the input and output buffers adjacent to a stream.
stream | stream whose buffers are to be traced | |
inputTupleTraceLevel | trace level at which tuple contents of input buffers are to be traced | |
outputTupleTraceLevel | trace level at which tuple contents of output buffers are to be traced |
Definition at line 140 of file ExecStreamScheduler.cpp.
References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().
Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }
void ExecStreamScheduler::traceStreamBufferContents | ( | ExecStream & | stream, | |
ExecStreamBufAccessor & | bufAccessor, | |||
TraceLevel | traceLevel | |||
) | [virtual, inherited] |
Traces the contents of a stream buffer.
stream | stream whose buffer is being traced | |
bufAccessor | accessor for stream buffer | |
traceLevel | level at which contents should be traced |
Definition at line 193 of file ExecStreamScheduler.cpp.
References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().
Referenced by ExecStreamScheduler::traceStreamBuffers().
00197 { 00198 TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc(); 00199 TupleData tupleData(tupleDesc); 00200 TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor(); 00201 00202 for (PConstBuffer pTuple = bufAccessor.getConsumptionStart(); 00203 pTuple != bufAccessor.getConsumptionEnd(); 00204 pTuple += tupleAccessor.getCurrentByteCount()) 00205 { 00206 tupleAccessor.setCurrentTupleBuf(pTuple); 00207 // while we're here, we might as well sanity-check the content 00208 assert(pTuple + tupleAccessor.getCurrentByteCount() 00209 <= bufAccessor.getConsumptionEnd()); 00210 tupleAccessor.unmarshal(tupleData); 00211 // TODO: sanity-check individual data values? 00212 std::ostringstream oss; 00213 TuplePrinter tuplePrinter; 00214 tuplePrinter.print(oss,tupleDesc,tupleData); 00215 stream.trace(traceLevel,oss.str()); 00216 } 00217 }
SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor | ( | ) | [virtual, inherited] |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
Definition at line 78 of file ExecStreamScheduler.cpp.
Referenced by ExecStreamGraphImpl::prepare().
00079 { 00080 return SharedExecStreamBufAccessor(new ExecStreamBufAccessor()); 00081 }
void ExecStreamScheduler::createCopyProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual, inherited] |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Definition at line 92 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00094 { 00095 CopyExecStreamParams adapterParams; 00096 embryo.init( 00097 new CopyExecStream(), 00098 adapterParams); 00099 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being trace | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
friend class ParallelExecTask [friend] |
Definition at line 124 of file ParallelExecStreamScheduler.h.
Definition at line 126 of file ParallelExecStreamScheduler.h.
Referenced by addGraph(), removeGraph(), and start().
Definition at line 128 of file ParallelExecStreamScheduler.h.
Referenced by addToQueue(), ParallelExecStreamScheduler(), start(), and stop().
std::deque<ParallelExecResult> ParallelExecStreamScheduler::completedQueue [private] |
Definition at line 129 of file ParallelExecStreamScheduler.h.
Referenced by readStream(), stop(), tryExecuteManager(), and tryExecuteTask().
Definition at line 131 of file ParallelExecStreamScheduler.h.
Referenced by executeTask(), and ParallelExecStreamScheduler().
Definition at line 133 of file ParallelExecStreamScheduler.h.
Referenced by addToQueue(), alterNeighborInhibition(), isInhibited(), processCompletedTask(), readStream(), retryInhibitedQueue(), signalSentinel(), and start().
Definition at line 134 of file ParallelExecStreamScheduler.h.
Referenced by executeManager(), ParallelExecStreamScheduler(), start(), stop(), and tryExecuteManager().
Definition at line 136 of file ParallelExecStreamScheduler.h.
Referenced by addToQueue(), retryInhibitedQueue(), and stop().
Definition at line 138 of file ParallelExecStreamScheduler.h.
Referenced by executeManager(), readStream(), signalSentinel(), and stop().
Definition at line 140 of file ParallelExecStreamScheduler.h.
Referenced by getDegreeOfParallelism(), ParallelExecStreamScheduler(), and start().
boost::scoped_ptr<FennelExcn> ParallelExecStreamScheduler::pPendingExcn [private] |
Definition at line 142 of file ParallelExecStreamScheduler.h.
Referenced by abort(), addToQueue(), checkAbort(), executeTask(), readStream(), start(), stop(), and tryExecuteManager().
bool ExecStreamScheduler::tracingFine [protected, inherited] |
Definition at line 47 of file ExecStreamScheduler.h.
Referenced by ExecStreamScheduler::addGraph(), ExecStreamScheduler::ExecStreamScheduler(), and ExecStreamScheduler::executeStream().
StrictMutex SynchMonitoredObject::mutex [protected, inherited] |
Definition at line 38 of file SynchMonitoredObject.h.
Referenced by abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), executeManager(), executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), signalSentinel(), ThreadPoolBase::start(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().
LocalCondition SynchMonitoredObject::condition [protected, inherited] |
Definition at line 39 of file SynchMonitoredObject.h.
Referenced by abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), executeTask(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().