ParallelExecStreamScheduler Class Reference

ParallelExecStreamScheduler is a parallel implementation of the ExecStreamScheduler interface. More...

#include <ParallelExecStreamScheduler.h>

Inheritance diagram for ParallelExecStreamScheduler:

ExecStreamScheduler SynchMonitoredObject TraceSource List of all members.

Public Member Functions

 ParallelExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name, ThreadTracker &threadTracker, uint degreeOfParallelism)
 Constructs a new scheduler.
virtual ~ParallelExecStreamScheduler ()
virtual void addGraph (SharedExecStreamGraph pGraph)
 Adds a graph to be scheduled.
virtual void removeGraph (SharedExecStreamGraph pGraph)
 Removes a graph currently being scheduled.
virtual void start ()
 Starts this scheduler, preparing it to execute streams.
virtual void setRunnable (ExecStream &stream, bool)
 Sets whether that a specific stream should be considered for execution.
virtual void makeRunnable (ExecStream &stream)
 Requests that a specific stream be considered for execution.
virtual void abort (ExecStreamGraph &graph)
 Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
virtual void checkAbort () const
 Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
virtual void stop ()
 Shuts down this scheduler, preventing any further streams from being scheduled.
virtual ExecStreamBufAccessorreadStream (ExecStream &stream)
 Reads data from a stream, first performing any scheduling necessary to make output available.
virtual void createBufferProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
virtual uint getDegreeOfParallelism ()
 
Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

virtual void traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel)
 Traces the contents of a stream buffer.
virtual SharedExecStreamBufAccessor newBufAccessor ()
 Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
virtual void createCopyProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTargetgetTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()

Protected Member Functions

ExecStreamResult executeStream (ExecStream &stream, ExecStreamQuantum const &quantum)
 Executes one stream, performing tracing if enabled.
virtual void tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum)
 Traces before execution of a stream.
virtual void tracePostExecution (ExecStream &stream, ExecStreamResult rc)
 Traces after execution of a stream.
virtual void traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel)
 Traces the states of the input and output buffers adjacent to a stream.

Protected Attributes

bool tracingFine
StrictMutex mutex
LocalCondition condition

Private Types

enum  StreamState { SS_SLEEPING, SS_RUNNING, SS_INHIBITED }
enum  ManagerState { MGR_RUNNING, MGR_STOPPING, MGR_STOPPED }
typedef std::hash_map< ExecStreamId,
StreamStateMapEntry
StreamStateMap
typedef std::deque< ExecStreamIdInhibitedQueue

Private Member Functions

void tryExecuteManager ()
void executeManager ()
void tryExecuteTask (ExecStream &)
void executeTask (ExecStream &)
bool addToQueue (ExecStreamId streamId)
void signalSentinel (ExecStreamId sentinelId)
void retryInhibitedQueue ()
void processCompletedTask (ParallelExecResult const &task)
bool isInhibited (ExecStreamId streamId)
void alterNeighborInhibition (ExecStreamId streamId, int delta)

Private Attributes

SharedExecStreamGraph pGraph
ThreadPool< ParallelExecTaskthreadPool
std::deque< ParallelExecResultcompletedQueue
ThreadTrackerthreadTracker
StreamStateMap streamStateMap
ManagerState mgrState
InhibitedQueue inhibitedQueue
InhibitedQueue transitQueue
LocalCondition sentinelCondition
uint degreeOfParallelism
boost::scoped_ptr< FennelExcnpPendingExcn

Friends

class ParallelExecTask

Classes

struct  StreamStateMapEntry

Detailed Description

ParallelExecStreamScheduler is a parallel implementation of the ExecStreamScheduler interface.

For more information, see the design doc in Eigenpedia.

Author:
John Sichi
Version:
Id
//open/dev/fennel/exec/ParallelExecStreamScheduler.h#12

Definition at line 98 of file ParallelExecStreamScheduler.h.


Member Typedef Documentation

typedef std::hash_map<ExecStreamId, StreamStateMapEntry> ParallelExecStreamScheduler::StreamStateMap [private]

Definition at line 121 of file ParallelExecStreamScheduler.h.

typedef std::deque<ExecStreamId> ParallelExecStreamScheduler::InhibitedQueue [private]

Definition at line 122 of file ParallelExecStreamScheduler.h.


Member Enumeration Documentation

enum ParallelExecStreamScheduler::StreamState [private]

Enumerator:
SS_SLEEPING 
SS_RUNNING 
SS_INHIBITED 

Definition at line 101 of file ParallelExecStreamScheduler.h.

00102     {
00103         SS_SLEEPING,
00104         SS_RUNNING,
00105         SS_INHIBITED
00106     };

enum ParallelExecStreamScheduler::ManagerState [private]

Enumerator:
MGR_RUNNING 
MGR_STOPPING 
MGR_STOPPED 

Definition at line 114 of file ParallelExecStreamScheduler.h.

00114                       {
00115         MGR_RUNNING,
00116         MGR_STOPPING,
00117         MGR_STOPPED
00118     };


Constructor & Destructor Documentation

ParallelExecStreamScheduler::ParallelExecStreamScheduler ( SharedTraceTarget  pTraceTarget,
std::string  name,
ThreadTracker threadTracker,
uint  degreeOfParallelism 
) [explicit]

Constructs a new scheduler.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent, or NULL to disable tracing entirely
name the name to use for tracing this scheduler
threadTracker tracker to use for threads created during parallelization
degreeOfParallelism number of threads to run

Definition at line 37 of file ParallelExecStreamScheduler.cpp.

References degreeOfParallelism, MGR_STOPPED, mgrState, ThreadPoolBase::setThreadTracker(), threadPool, and threadTracker.

00042     : TraceSource(pTraceTarget, name),
00043       ExecStreamScheduler(pTraceTarget, name),
00044       threadTracker(threadTrackerInit)
00045 {
00046     degreeOfParallelism = degreeOfParallelismInit;
00047     assert(degreeOfParallelism > 0);
00048     threadPool.setThreadTracker(threadTracker);
00049     mgrState = MGR_STOPPED;
00050 }

ParallelExecStreamScheduler::~ParallelExecStreamScheduler (  )  [virtual]

Definition at line 52 of file ParallelExecStreamScheduler.cpp.

00053 {
00054 }


Member Function Documentation

void ParallelExecStreamScheduler::tryExecuteManager (  )  [private]

Definition at line 228 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, MGR_RUNNING, mgrState, SynchMonitoredObject::mutex, pPendingExcn, processCompletedTask(), and TRACE_FINE.

Referenced by executeManager().

00229 {
00230     FENNEL_TRACE(TRACE_FINE,"manager task starting");
00231     for (;;) {
00232         StrictMutexGuard mutexGuard(mutex);
00233         while (completedQueue.empty() && (mgrState == MGR_RUNNING)
00234             && !pPendingExcn)
00235         {
00236             condition.wait(mutexGuard);
00237         }
00238         if (pPendingExcn) {
00239             return;
00240         }
00241         if (mgrState != MGR_RUNNING) {
00242             return;
00243         }
00244         while (!completedQueue.empty()) {
00245             ParallelExecResult result = completedQueue.front();
00246             completedQueue.pop_front();
00247             // don't hold lock while doing expensive state maintenance
00248             mutexGuard.unlock();
00249             processCompletedTask(result);
00250             if (pPendingExcn) {
00251                 return;
00252             }
00253             mutexGuard.lock();
00254         }
00255     }
00256 }

void ParallelExecStreamScheduler::executeManager (  )  [private]

Definition at line 212 of file ParallelExecStreamScheduler.cpp.

References MGR_STOPPED, mgrState, SynchMonitoredObject::mutex, sentinelCondition, and tryExecuteManager().

Referenced by ParallelExecTask::execute().

00213 {
00214     // TODO jvs 16-Aug-2008:  RAII
00215     try {
00216         tryExecuteManager();
00217     } catch (...) {
00218         StrictMutexGuard mutexGuard(mutex);
00219         mgrState = MGR_STOPPED;
00220         sentinelCondition.notify_all();
00221         throw;
00222     }
00223     StrictMutexGuard mutexGuard(mutex);
00224     mgrState = MGR_STOPPED;
00225     sentinelCondition.notify_all();
00226 }

void ParallelExecStreamScheduler::tryExecuteTask ( ExecStream  )  [private]

Definition at line 399 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, ExecStreamScheduler::executeStream(), ExecStream::getStreamId(), and SynchMonitoredObject::mutex.

Referenced by executeTask().

00400 {
00401     ExecStreamQuantum quantum;
00402     ExecStreamResult rc = executeStream(stream, quantum);
00403     ParallelExecResult result(stream.getStreamId(), rc);
00404 
00405     StrictMutexGuard mutexGuard(mutex);
00406     completedQueue.push_back(result);
00407     condition.notify_one();
00408 }

void ParallelExecStreamScheduler::executeTask ( ExecStream  )  [private]

Definition at line 379 of file ParallelExecStreamScheduler.cpp.

References ThreadTracker::cloneExcn(), SynchMonitoredObject::condition, SynchMonitoredObject::mutex, pPendingExcn, threadTracker, and tryExecuteTask().

Referenced by ParallelExecTask::execute().

00380 {
00381     try {
00382         tryExecuteTask(stream);
00383     } catch (std::exception &ex) {
00384         StrictMutexGuard mutexGuard(mutex);
00385         if (!pPendingExcn) {
00386             pPendingExcn.reset(threadTracker.cloneExcn(ex));
00387         }
00388         condition.notify_one();
00389     } catch (...) {
00390         // REVIEW jvs 22-Jul-2008:  panic instead?
00391         StrictMutexGuard mutexGuard(mutex);
00392         if (!pPendingExcn) {
00393             pPendingExcn.reset(new FennelExcn("Unknown error"));
00394         }
00395         condition.notify_one();
00396     }
00397 }

bool ParallelExecStreamScheduler::addToQueue ( ExecStreamId  streamId  )  [private]

Definition at line 424 of file ParallelExecStreamScheduler.cpp.

References alterNeighborInhibition(), ExecStreamGraphImpl::getStreamFromVertex(), inhibitedQueue, isInhibited(), pPendingExcn, SS_INHIBITED, SS_RUNNING, SS_SLEEPING, streamStateMap, ThreadPool< Task >::submitTask(), and threadPool.

Referenced by processCompletedTask(), and retryInhibitedQueue().

00425 {
00426     if (pPendingExcn) {
00427         return false;
00428     }
00429     switch (streamStateMap[streamId].state) {
00430     case SS_SLEEPING:
00431         {
00432             ExecStreamGraphImpl &graphImpl =
00433                 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00434             SharedExecStream pStream = graphImpl.getStreamFromVertex(streamId);
00435             if (!pStream) {
00436                 // sentinel node
00437                 return true;
00438             }
00439             if (isInhibited(streamId)) {
00440                 streamStateMap[streamId].state = SS_INHIBITED;
00441                 inhibitedQueue.push_back(streamId);
00442             } else {
00443                 streamStateMap[streamId].state = SS_RUNNING;
00444                 alterNeighborInhibition(streamId, + 1);
00445                 ParallelExecTask task(*this, pStream.get());
00446                 threadPool.submitTask(task);
00447             }
00448         }
00449         break;
00450     case SS_INHIBITED:
00451     case SS_RUNNING:
00452         // ignore request
00453         break;
00454     default:
00455         permAssert(false);
00456     }
00457     return false;
00458 }

void ParallelExecStreamScheduler::signalSentinel ( ExecStreamId  sentinelId  )  [private]

Definition at line 370 of file ParallelExecStreamScheduler.cpp.

References alterNeighborInhibition(), SynchMonitoredObject::mutex, sentinelCondition, SS_RUNNING, and streamStateMap.

Referenced by processCompletedTask().

00371 {
00372     alterNeighborInhibition(sentinelId, + 1);
00373 
00374     StrictMutexGuard mutexGuard(mutex);
00375     streamStateMap[sentinelId].state = SS_RUNNING;
00376     sentinelCondition.notify_all();
00377 }

void ParallelExecStreamScheduler::retryInhibitedQueue (  )  [private]

Definition at line 410 of file ParallelExecStreamScheduler.cpp.

References addToQueue(), ExecStreamId, inhibitedQueue, SS_SLEEPING, streamStateMap, and transitQueue.

Referenced by processCompletedTask().

00411 {
00412     // addToQueue may bounce some back to inhibitedQueue,
00413     // so process via transitQueue
00414     transitQueue = inhibitedQueue;
00415     inhibitedQueue.clear();
00416     while (!transitQueue.empty()) {
00417         ExecStreamId inhibitedStreamId = transitQueue.front();
00418         transitQueue.pop_front();
00419         streamStateMap[inhibitedStreamId].state = SS_SLEEPING;
00420         addToQueue(inhibitedStreamId);
00421     }
00422 }

void ParallelExecStreamScheduler::processCompletedTask ( ParallelExecResult const &  task  )  [private]

Definition at line 303 of file ParallelExecStreamScheduler.cpp.

References addToQueue(), alterNeighborInhibition(), EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ParallelExecResult::getResultCode(), ExecStreamBufAccessor::getState(), ParallelExecResult::getStreamId(), retryInhibitedQueue(), signalSentinel(), SS_SLEEPING, and streamStateMap.

Referenced by tryExecuteManager().

00305 {
00306     ExecStreamId current = result.getStreamId();
00307     ExecStreamGraphImpl &graphImpl =
00308         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00309     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00310 
00311     streamStateMap[current].state = SS_SLEEPING;
00312     alterNeighborInhibition(current, -1);
00313 
00314     switch (result.getResultCode()) {
00315     case EXECRC_EOS:
00316     case EXECRC_BUF_OVERFLOW:
00317     case EXECRC_BUF_UNDERFLOW:
00318         {
00319             ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00320                 boost::out_edges(current, graphRep);
00321             for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00322                 ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00323                 ExecStreamBufAccessor &bufAccessor =
00324                     graphImpl.getBufAccessorFromEdge(edge);
00325                 if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
00326                     ExecStreamId consumer = boost::target(edge, graphRep);
00327                     bool sentinel = addToQueue(consumer);
00328                     if (sentinel) {
00329                         if (bufAccessor.getState() != EXECBUF_EMPTY) {
00330                             signalSentinel(consumer);
00331                         }
00332                     }
00333                 }
00334             }
00335             ExecStreamGraphImpl::InEdgeIterPair inEdges =
00336                 boost::in_edges(current, graphRep);
00337             bool sawUnderflow = false;
00338             for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00339                 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00340                 ExecStreamBufAccessor &bufAccessor =
00341                     graphImpl.getBufAccessorFromEdge(edge);
00342                 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00343                     ExecStreamId producer = boost::source(edge, graphRep);
00344                     addToQueue(producer);
00345                     sawUnderflow = true;
00346                 }
00347             }
00348             if (!sawUnderflow &&
00349                 (result.getResultCode() == EXECRC_BUF_UNDERFLOW))
00350             {
00351                 // sometimes, a stream may return underflow, even though none
00352                 // of its inputs are in underflow state; instead, some are in
00353                 // EOS; we interpret this as a yield request to help keep
00354                 // the stream's state machine logic simpler
00355                 addToQueue(current);
00356             }
00357         }
00358         break;
00359     case EXECRC_QUANTUM_EXPIRED:
00360         addToQueue(current);
00361         break;
00362     default:
00363         permAssert(false);
00364     }
00365 
00366     // REVIEW jvs 4-Jul-2008:  only reschedule inhibited neighbors?
00367     retryInhibitedQueue();
00368 }

bool ParallelExecStreamScheduler::isInhibited ( ExecStreamId  streamId  )  [inline, private]

Definition at line 80 of file ParallelExecStreamScheduler.cpp.

References streamStateMap.

Referenced by addToQueue().

00081 {
00082     return streamStateMap[streamId].inhibitionCount > 0;
00083 }

void ParallelExecStreamScheduler::alterNeighborInhibition ( ExecStreamId  streamId,
int  delta 
) [inline, private]

Definition at line 56 of file ParallelExecStreamScheduler.cpp.

References ExecStreamId, ExecStreamGraphImpl::getGraphRep(), and streamStateMap.

Referenced by addToQueue(), processCompletedTask(), signalSentinel(), and start().

00058 {
00059     ExecStreamGraphImpl &graphImpl =
00060         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00061     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00062     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00063         boost::out_edges(streamId, graphRep);
00064     for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00065         ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00066         ExecStreamId consumer = boost::target(edge, graphRep);
00067         streamStateMap[consumer].inhibitionCount += delta;
00068         assert(streamStateMap[consumer].inhibitionCount >= 0);
00069     }
00070     ExecStreamGraphImpl::InEdgeIterPair inEdges =
00071         boost::in_edges(streamId, graphRep);
00072     for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00073         ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00074         ExecStreamId producer = boost::source(edge, graphRep);
00075         streamStateMap[producer].inhibitionCount += delta;
00076         assert(streamStateMap[producer].inhibitionCount >= 0);
00077     }
00078 }

void ParallelExecStreamScheduler::addGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Adds a graph to be scheduled.

Some implementations may require all graphs to be added before scheduler is started; others may allow graphs to be added at any time.

Parameters:
pGraph the graph to be scheduled

Reimplemented from ExecStreamScheduler.

Definition at line 85 of file ParallelExecStreamScheduler.cpp.

References ExecStreamScheduler::addGraph(), and pGraph.

00087 {
00088     assert(!pGraph);
00089 
00090     ExecStreamScheduler::addGraph(pGraphInit);
00091     pGraph = pGraphInit;
00092 }

void ParallelExecStreamScheduler::removeGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Removes a graph currently being scheduled.

Some implementations may disallow graph removal except when scheduler is stopped; others may disallow graph removal altogether.

Parameters:
pGraph the graph currently being scheduled

Reimplemented from ExecStreamScheduler.

Definition at line 94 of file ParallelExecStreamScheduler.cpp.

References pGraph, and ExecStreamScheduler::removeGraph().

00096 {
00097     assert(pGraph == pGraphInit);
00098 
00099     pGraph.reset();
00100     ExecStreamScheduler::removeGraph(pGraphInit);
00101 }

void ParallelExecStreamScheduler::start (  )  [virtual]

Starts this scheduler, preparing it to execute streams.

Implements ExecStreamScheduler.

Definition at line 103 of file ParallelExecStreamScheduler.cpp.

References alterNeighborInhibition(), degreeOfParallelism, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStreamGraphImpl::getStreamFromVertex(), MGR_RUNNING, mgrState, pGraph, pPendingExcn, ExecStreamBufAccessor::requestProduction(), SS_SLEEPING, ThreadPoolBase::start(), streamStateMap, ThreadPool< Task >::submitTask(), threadPool, and TRACE_FINE.

00104 {
00105     FENNEL_TRACE(TRACE_FINE,"start");
00106     assert(pGraph->isAcyclic());
00107     pPendingExcn.reset();
00108 
00109     ExecStreamGraphImpl &graphImpl =
00110         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00111     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00112     ExecStreamGraphImpl::VertexIterPair vertices = boost::vertices(graphRep);
00113     while (vertices.first != vertices.second) {
00114         ExecStreamId streamId = *vertices.first;
00115         streamStateMap[streamId].state = SS_SLEEPING;
00116         streamStateMap[streamId].inhibitionCount = 0;
00117         ++vertices.first;
00118     }
00119 
00120     vertices = boost::vertices(graphRep);
00121     while (vertices.first != vertices.second) {
00122         ExecStreamId streamId = *vertices.first;
00123         if (!graphImpl.getStreamFromVertex(streamId)) {
00124             // initially inhibit producers until first call to readStream
00125             alterNeighborInhibition(streamId, + 1);
00126             ExecStreamGraphImpl::InEdgeIterPair inEdges =
00127                 boost::in_edges(streamId, graphRep);
00128             for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00129                 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00130                 ExecStreamBufAccessor &bufAccessor =
00131                     graphImpl.getBufAccessorFromEdge(edge);
00132                 bufAccessor.requestProduction();
00133             }
00134         }
00135         ++vertices.first;
00136     }
00137 
00138     // +1 for the manager task, which will tie up one thread
00139     threadPool.start(degreeOfParallelism + 1);
00140 
00141     // kick off the manager task
00142     ParallelExecTask managerTask(*this, NULL);
00143     mgrState = MGR_RUNNING;
00144     threadPool.submitTask(managerTask);
00145 }

void ParallelExecStreamScheduler::setRunnable ( ExecStream stream,
bool   
) [virtual]

Sets whether that a specific stream should be considered for execution.

Parameters:
stream the stream to make runnable

Implements ExecStreamScheduler.

Definition at line 147 of file ParallelExecStreamScheduler.cpp.

00148 {
00149     permAssert(false);
00150 }

void ParallelExecStreamScheduler::makeRunnable ( ExecStream stream  )  [virtual]

Requests that a specific stream be considered for execution.

Parameters:
stream the stream to make runnable
Deprecated:
use setRunnable

Reimplemented from ExecStreamScheduler.

Definition at line 152 of file ParallelExecStreamScheduler.cpp.

00153 {
00154     permAssert(false);
00155 }

void ParallelExecStreamScheduler::abort ( ExecStreamGraph graph  )  [virtual]

Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.

Returns immediately, not waiting for abort request to be fully processed.

Parameters:
graph graph to abort; must be one of the graphs associated with this scheduler

Implements ExecStreamScheduler.

Definition at line 157 of file ParallelExecStreamScheduler.cpp.

References SynchMonitoredObject::condition, SynchMonitoredObject::mutex, pPendingExcn, and TRACE_FINE.

00158 {
00159     StrictMutexGuard mutexGuard(mutex);
00160     FENNEL_TRACE(TRACE_FINE,"abort requested");
00161 
00162     if (!pPendingExcn) {
00163         pPendingExcn.reset(new AbortExcn());
00164     }
00165     condition.notify_one();
00166 }

void ParallelExecStreamScheduler::checkAbort (  )  const [virtual]

Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.

Reimplemented from ExecStreamScheduler.

Definition at line 168 of file ParallelExecStreamScheduler.cpp.

References pPendingExcn.

00169 {
00170     if (pPendingExcn) {
00171         throw AbortExcn();
00172     }
00173 }

void ParallelExecStreamScheduler::stop (  )  [virtual]

Shuts down this scheduler, preventing any further streams from being scheduled.

Implements ExecStreamScheduler.

Definition at line 175 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, inhibitedQueue, MGR_STOPPED, MGR_STOPPING, mgrState, SynchMonitoredObject::mutex, pPendingExcn, sentinelCondition, ThreadPoolBase::stop(), threadPool, and TRACE_FINE.

00176 {
00177     FENNEL_TRACE(TRACE_FINE,"stop");
00178 
00179     StrictMutexGuard mutexGuard(mutex);
00180     if (mgrState != MGR_STOPPED) {
00181         mgrState = MGR_STOPPING;
00182         condition.notify_one();
00183         while (mgrState != MGR_STOPPED) {
00184             sentinelCondition.wait(mutexGuard);
00185         }
00186     }
00187     mutexGuard.unlock();
00188 
00189     threadPool.stop();
00190 
00191     // NOTE jvs 10-Aug-2008:  This is how we keep the cloned excn
00192     // from becoming a memory leak.  It assumes that the caller
00193     // doesn't invoke pScheduler->stop() until *after* the exception
00194     // has been completely handled and is no longer referenced.
00195     pPendingExcn.reset();
00196 
00197     completedQueue.clear();
00198     inhibitedQueue.clear();
00199 }

ExecStreamBufAccessor & ParallelExecStreamScheduler::readStream ( ExecStream stream  )  [virtual]

Reads data from a stream, first performing any scheduling necessary to make output available.

Parameters:
stream the stream from which to read
Returns:
accessor for output data buffer

Implements ExecStreamScheduler.

Definition at line 258 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), SynchMonitoredObject::mutex, pPendingExcn, ExecStreamBufAccessor::requestProduction(), sentinelCondition, SS_SLEEPING, streamStateMap, and TRACE_FINE.

00260 {
00261     FENNEL_TRACE(
00262         TRACE_FINE,
00263         "entering readStream " << stream.getName());
00264 
00265     ExecStreamId current = stream.getStreamId();
00266     ExecStreamGraphImpl &graphImpl =
00267         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00268     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00269 
00270     // assert that we're reading from a designated output stream
00271     assert(boost::out_degree(current,graphRep) == 1);
00272     ExecStreamGraphImpl::Edge edge =
00273         *(boost::out_edges(current,graphRep).first);
00274     ExecStreamBufAccessor &bufAccessor = graphImpl.getBufAccessorFromEdge(edge);
00275     current = boost::target(edge, graphRep);
00276     assert(!graphImpl.getStreamFromVertex(current));
00277 
00278     if (bufAccessor.getState() == EXECBUF_EMPTY) {
00279         bufAccessor.requestProduction();
00280     } else if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
00281         // data or EOS already available
00282         return bufAccessor;
00283     }
00284 
00285     // please sir, I'd like some more
00286     ParallelExecResult result(current, EXECRC_BUF_UNDERFLOW);
00287     StrictMutexGuard mutexGuard(mutex);
00288     streamStateMap[current].state = SS_SLEEPING;
00289     completedQueue.push_back(result);
00290     condition.notify_one();
00291 
00292     while ((streamStateMap[current].state == SS_SLEEPING) && !pPendingExcn) {
00293         sentinelCondition.wait(mutexGuard);
00294     }
00295 
00296     if (pPendingExcn) {
00297         pPendingExcn->throwSelf();
00298     }
00299 
00300     return bufAccessor;
00301 }

void ParallelExecStreamScheduler::createBufferProvisionAdapter ( ExecStreamEmbryo embryo  )  [virtual]

Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.

Default implementation is ScratchBufferExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Reimplemented from ExecStreamScheduler.

Definition at line 201 of file ParallelExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00203 {
00204     // use double buffering so that producers and consumers can run
00205     // in parallel
00206     DoubleBufferExecStreamParams adapterParams;
00207     embryo.init(
00208         new DoubleBufferExecStream(),
00209         adapterParams);
00210 }

uint ParallelExecStreamScheduler::getDegreeOfParallelism (  )  [virtual]

Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

Reimplemented from ExecStreamScheduler.

Definition at line 485 of file ParallelExecStreamScheduler.cpp.

References degreeOfParallelism.

00486 {
00487     return degreeOfParallelism;
00488 }

ExecStreamResult ExecStreamScheduler::executeStream ( ExecStream stream,
ExecStreamQuantum const &  quantum 
) [inline, protected, inherited]

Executes one stream, performing tracing if enabled.

Parameters:
stream stream to execute
quantum quantum controlling stream execution
Returns:
result of executing stream

Definition at line 243 of file ExecStreamScheduler.h.

References ExecStream::execute(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), and ExecStreamScheduler::tracingFine.

Referenced by DfsTreeExecStreamScheduler::readStream(), and tryExecuteTask().

00246 {
00247     if (tracingFine) {
00248         tracePreExecution(stream, quantum);
00249         ExecStreamResult rc = stream.execute(quantum);
00250         tracePostExecution(stream, rc);
00251         return rc;
00252     } else {
00253         return stream.execute(quantum);
00254     }
00255 }

void ExecStreamScheduler::tracePreExecution ( ExecStream stream,
ExecStreamQuantum const &  quantum 
) [protected, virtual, inherited]

Traces before execution of a stream.

Parameters:
stream stream about to be executed
quantum quantum controlling stream execution

Definition at line 112 of file ExecStreamScheduler.cpp.

References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().

Referenced by ExecStreamScheduler::executeStream().

00115 {
00116     FENNEL_TRACE(
00117         TRACE_FINE,
00118         "executing " << stream.getStreamId() << ' ' << stream.getName());
00119     if (!isMAXU(quantum.nTuplesMax)) {
00120         FENNEL_TRACE(
00121             TRACE_FINE,
00122             "nTuplesMax = " << quantum.nTuplesMax);
00123     }
00124 
00125     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST);
00126 }

void ExecStreamScheduler::tracePostExecution ( ExecStream stream,
ExecStreamResult  rc 
) [protected, virtual, inherited]

Traces after execution of a stream.

Parameters:
stream stream which was just executed
rc result code returned by stream

Definition at line 128 of file ExecStreamScheduler.cpp.

References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().

Referenced by ExecStreamScheduler::executeStream().

00131 {
00132     FENNEL_TRACE(
00133         TRACE_FINE,
00134         "executed " << stream.getStreamId() << ' ' << stream.getName()
00135         << " with result " << ExecStreamResult_names[rc]);
00136 
00137     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER);
00138 }

void ExecStreamScheduler::traceStreamBuffers ( ExecStream stream,
TraceLevel  inputTupleTraceLevel,
TraceLevel  outputTupleTraceLevel 
) [protected, virtual, inherited]

Traces the states of the input and output buffers adjacent to a stream.

Parameters:
stream stream whose buffers are to be traced
inputTupleTraceLevel trace level at which tuple contents of input buffers are to be traced
outputTupleTraceLevel trace level at which tuple contents of output buffers are to be traced

Definition at line 140 of file ExecStreamScheduler.cpp.

References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().

Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00144 {
00145     ExecStreamGraphImpl &graphImpl =
00146         dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph());
00147     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00148 
00149     ExecStreamGraphImpl::InEdgeIterPair inEdges =
00150         boost::in_edges(stream.getStreamId(),graphRep);
00151     for (uint i = 0; inEdges.first != inEdges.second;
00152          ++(inEdges.first),  ++i)
00153     {
00154         ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00155         ExecStreamBufAccessor &bufAccessor =
00156             graphImpl.getBufAccessorFromEdge(edge);
00157         FENNEL_TRACE(
00158             TRACE_FINER,
00159             "input buffer " << i << ":  "
00160             << ExecStreamBufState_names[bufAccessor.getState()]
00161             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00162             << ",  consumption available = "
00163             << bufAccessor.getConsumptionAvailable());
00164         if (stream.isTracingLevel(inputTupleTraceLevel)) {
00165             traceStreamBufferContents(
00166                 stream, bufAccessor, inputTupleTraceLevel);
00167         }
00168     }
00169 
00170     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00171         boost::out_edges(stream.getStreamId(),graphRep);
00172     for (uint i = 0; outEdges.first != outEdges.second;
00173          ++(outEdges.first),  ++i) {
00174         ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00175         ExecStreamBufAccessor &bufAccessor =
00176             graphImpl.getBufAccessorFromEdge(edge);
00177         FENNEL_TRACE(
00178             TRACE_FINER,
00179             "output buffer " << i << ":  "
00180             << ExecStreamBufState_names[bufAccessor.getState()]
00181             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00182             << ",  consumption available = "
00183             << bufAccessor.getConsumptionAvailable()
00184             << ",  production available = "
00185             << bufAccessor.getProductionAvailable());
00186         if (stream.isTracingLevel(outputTupleTraceLevel)) {
00187             traceStreamBufferContents(
00188                 stream, bufAccessor, outputTupleTraceLevel);
00189         }
00190     }
00191 }

void ExecStreamScheduler::traceStreamBufferContents ( ExecStream stream,
ExecStreamBufAccessor bufAccessor,
TraceLevel  traceLevel 
) [virtual, inherited]

Traces the contents of a stream buffer.

Parameters:
stream stream whose buffer is being traced
bufAccessor accessor for stream buffer
traceLevel level at which contents should be traced

Definition at line 193 of file ExecStreamScheduler.cpp.

References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().

Referenced by ExecStreamScheduler::traceStreamBuffers().

00197 {
00198     TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc();
00199     TupleData tupleData(tupleDesc);
00200     TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor();
00201 
00202     for (PConstBuffer pTuple = bufAccessor.getConsumptionStart();
00203          pTuple != bufAccessor.getConsumptionEnd();
00204          pTuple += tupleAccessor.getCurrentByteCount())
00205     {
00206         tupleAccessor.setCurrentTupleBuf(pTuple);
00207         // while we're here, we might as well sanity-check the content
00208         assert(pTuple + tupleAccessor.getCurrentByteCount()
00209             <= bufAccessor.getConsumptionEnd());
00210         tupleAccessor.unmarshal(tupleData);
00211         // TODO:  sanity-check individual data values?
00212         std::ostringstream oss;
00213         TuplePrinter tuplePrinter;
00214         tuplePrinter.print(oss,tupleDesc,tupleData);
00215         stream.trace(traceLevel,oss.str());
00216     }
00217 }

SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor (  )  [virtual, inherited]

Creates a new ExecStreamBufAccessor suitable for use with this scheduler.

Returns:
new buffer accessor

Definition at line 78 of file ExecStreamScheduler.cpp.

Referenced by ExecStreamGraphImpl::prepare().

00079 {
00080     return SharedExecStreamBufAccessor(new ExecStreamBufAccessor());
00081 }

void ExecStreamScheduler::createCopyProvisionAdapter ( ExecStreamEmbryo embryo  )  [virtual, inherited]

Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.

Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Definition at line 92 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00094 {
00095     CopyExecStreamParams adapterParams;
00096     embryo.init(
00097         new CopyExecStream(),
00098         adapterParams);
00099 }

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being trace
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }


Friends And Related Function Documentation

friend class ParallelExecTask [friend]

Definition at line 124 of file ParallelExecStreamScheduler.h.


Member Data Documentation

SharedExecStreamGraph ParallelExecStreamScheduler::pGraph [private]

Definition at line 126 of file ParallelExecStreamScheduler.h.

Referenced by addGraph(), removeGraph(), and start().

ThreadPool<ParallelExecTask> ParallelExecStreamScheduler::threadPool [private]

Definition at line 128 of file ParallelExecStreamScheduler.h.

Referenced by addToQueue(), ParallelExecStreamScheduler(), start(), and stop().

std::deque<ParallelExecResult> ParallelExecStreamScheduler::completedQueue [private]

Definition at line 129 of file ParallelExecStreamScheduler.h.

Referenced by readStream(), stop(), tryExecuteManager(), and tryExecuteTask().

ThreadTracker& ParallelExecStreamScheduler::threadTracker [private]

Definition at line 131 of file ParallelExecStreamScheduler.h.

Referenced by executeTask(), and ParallelExecStreamScheduler().

StreamStateMap ParallelExecStreamScheduler::streamStateMap [private]

Definition at line 133 of file ParallelExecStreamScheduler.h.

Referenced by addToQueue(), alterNeighborInhibition(), isInhibited(), processCompletedTask(), readStream(), retryInhibitedQueue(), signalSentinel(), and start().

ManagerState ParallelExecStreamScheduler::mgrState [private]

Definition at line 134 of file ParallelExecStreamScheduler.h.

Referenced by executeManager(), ParallelExecStreamScheduler(), start(), stop(), and tryExecuteManager().

InhibitedQueue ParallelExecStreamScheduler::inhibitedQueue [private]

Definition at line 136 of file ParallelExecStreamScheduler.h.

Referenced by addToQueue(), retryInhibitedQueue(), and stop().

InhibitedQueue ParallelExecStreamScheduler::transitQueue [private]

Definition at line 137 of file ParallelExecStreamScheduler.h.

Referenced by retryInhibitedQueue().

LocalCondition ParallelExecStreamScheduler::sentinelCondition [private]

Definition at line 138 of file ParallelExecStreamScheduler.h.

Referenced by executeManager(), readStream(), signalSentinel(), and stop().

uint ParallelExecStreamScheduler::degreeOfParallelism [private]

Definition at line 140 of file ParallelExecStreamScheduler.h.

Referenced by getDegreeOfParallelism(), ParallelExecStreamScheduler(), and start().

boost::scoped_ptr<FennelExcn> ParallelExecStreamScheduler::pPendingExcn [private]

Definition at line 142 of file ParallelExecStreamScheduler.h.

Referenced by abort(), addToQueue(), checkAbort(), executeTask(), readStream(), start(), stop(), and tryExecuteManager().

bool ExecStreamScheduler::tracingFine [protected, inherited]

Definition at line 47 of file ExecStreamScheduler.h.

Referenced by ExecStreamScheduler::addGraph(), ExecStreamScheduler::ExecStreamScheduler(), and ExecStreamScheduler::executeStream().

StrictMutex SynchMonitoredObject::mutex [protected, inherited]

Definition at line 38 of file SynchMonitoredObject.h.

Referenced by abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), executeManager(), executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), signalSentinel(), ThreadPoolBase::start(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().

LocalCondition SynchMonitoredObject::condition [protected, inherited]

Definition at line 39 of file SynchMonitoredObject.h.

Referenced by abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), executeTask(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:40 2009 for Fennel by  doxygen 1.5.1