ExecStreamScheduler Class Reference

ExecStreamScheduler defines an abstract base for controlling the scheduling of execution streams.

#include <ExecStreamScheduler.h>

Inherits TraceSource.

Inherited by DfsTreeExecStreamScheduler and ParallelExecStreamScheduler.

Public Member Functions

virtual ~ExecStreamScheduler ()
virtual void traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel)
 Traces the contents of a stream buffer.
virtual void addGraph (SharedExecStreamGraph pGraph)
 Adds a graph to be scheduled.
virtual void removeGraph (SharedExecStreamGraph pGraph)
 Removes a graph currently being scheduled.
virtual void start ()=0
 Starts this scheduler, preparing it to execute streams.
void makeRunnable (ExecStream &stream)
 Requests that a specific stream be considered for execution.
virtual void setRunnable (ExecStream &stream, bool runnable)=0
 Sets whether a specific stream should be considered for execution.
virtual void abort (ExecStreamGraph &graph)=0
 Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
virtual void checkAbort () const
 Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
virtual void stop ()=0
 Shuts down this scheduler, preventing any further streams from being scheduled.
virtual SharedExecStreamBufAccessor newBufAccessor ()
 Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
virtual void createBufferProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
virtual void createCopyProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
virtual ExecStreamBufAccessor & readStream (ExecStream &stream)=0
 Reads data from a stream, first performing any scheduling necessary to make output available.
virtual uint getDegreeOfParallelism ()
 
Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()

Protected Member Functions

 ExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name)
 Constructs a new ExecStreamScheduler.
ExecStreamResult executeStream (ExecStream &stream, ExecStreamQuantum const &quantum)
 Executes one stream, performing tracing if enabled.
virtual void tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum)
 Traces before execution of a stream.
virtual void tracePostExecution (ExecStream &stream, ExecStreamResult rc)
 Traces after execution of a stream.
virtual void traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel)
 Traces the states of the input and output buffers adjacent to a stream.

Protected Attributes

bool tracingFine

Detailed Description

ExecStreamScheduler defines an abstract base for controlling the scheduling of execution streams.

A scheduler determines which execution streams to run and in what order. For more information, see SchedulerDesign.
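
As a rough illustration of what "which streams to run and in what order" can mean, the sketch below models a toy scheduler that queues streams as they become runnable and runs them in that order. MiniStream, MiniScheduler, addStream, and runAll are hypothetical names invented for this example; they are not Fennel classes, and the derived schedulers listed above apply their own policies.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for an execution stream; not a Fennel class.
struct MiniStream {
    std::string name;
    bool runnable;
};

// Toy scheduler: runs streams in the order they were made runnable.
class MiniScheduler {
public:
    // Registers a stream and returns its id (its index in the table).
    std::size_t addStream(const std::string &name) {
        streams.push_back(MiniStream{name, false});
        return streams.size() - 1;
    }

    // Marks a stream runnable or not; a newly runnable stream is queued.
    void setRunnable(std::size_t id, bool runnable) {
        if (runnable && !streams[id].runnable) {
            queue.push_back(id);
        }
        streams[id].runnable = runnable;
    }

    // Runs every queued stream that is still runnable and returns the
    // stream names in execution order.
    std::vector<std::string> runAll() {
        std::vector<std::string> order;
        while (!queue.empty()) {
            std::size_t id = queue.front();
            queue.pop_front();
            if (streams[id].runnable) {
                streams[id].runnable = false;
                order.push_back(streams[id].name);
            }
        }
        return order;
    }

private:
    std::vector<MiniStream> streams;
    std::deque<std::size_t> queue;
};
```

In this sketch, making "filter" runnable before "scan" causes runAll to report "filter" first; a real scheduler would also have to account for data dependencies between streams.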

Author:
John V. Sichi
Version:
Id: //open/dev/fennel/exec/ExecStreamScheduler.h#17

Definition at line 42 of file ExecStreamScheduler.h.


Constructor & Destructor Documentation

ExecStreamScheduler::ExecStreamScheduler ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [explicit, protected]

Constructs a new ExecStreamScheduler.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent, or NULL to disable tracing entirely
name the name to use for tracing this scheduler

Definition at line 38 of file ExecStreamScheduler.cpp.

References TraceSource::isTracingLevel(), TRACE_FINE, and tracingFine.

00041     : TraceSource(pTraceTargetInit, nameInit)
00042 {
00043     tracingFine = isTracingLevel(TRACE_FINE);
00044 }

ExecStreamScheduler::~ExecStreamScheduler (  )  [virtual]

Definition at line 46 of file ExecStreamScheduler.cpp.

00047 {
00048 }


Member Function Documentation

ExecStreamResult ExecStreamScheduler::executeStream ( ExecStream & stream,
ExecStreamQuantum const &  quantum 
) [inline, protected]

Executes one stream, performing tracing if enabled.

Parameters:
stream stream to execute
quantum quantum controlling stream execution
Returns:
result of executing stream

Definition at line 243 of file ExecStreamScheduler.h.

References ExecStream::execute(), tracePostExecution(), tracePreExecution(), and tracingFine.

Referenced by DfsTreeExecStreamScheduler::readStream(), and ParallelExecStreamScheduler::tryExecuteTask().

00246 {
00247     if (tracingFine) {
00248         tracePreExecution(stream, quantum);
00249         ExecStreamResult rc = stream.execute(quantum);
00250         tracePostExecution(stream, rc);
00251         return rc;
00252     } else {
00253         return stream.execute(quantum);
00254     }
00255 }

void ExecStreamScheduler::tracePreExecution ( ExecStream & stream,
ExecStreamQuantum const &  quantum 
) [protected, virtual]

Traces before execution of a stream.

Parameters:
stream stream about to be executed
quantum quantum controlling stream execution

Definition at line 112 of file ExecStreamScheduler.cpp.

References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and traceStreamBuffers().

Referenced by executeStream().

00115 {
00116     FENNEL_TRACE(
00117         TRACE_FINE,
00118         "executing " << stream.getStreamId() << ' ' << stream.getName());
00119     if (!isMAXU(quantum.nTuplesMax)) {
00120         FENNEL_TRACE(
00121             TRACE_FINE,
00122             "nTuplesMax = " << quantum.nTuplesMax);
00123     }
00124 
00125     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST);
00126 }

void ExecStreamScheduler::tracePostExecution ( ExecStream & stream,
ExecStreamResult  rc 
) [protected, virtual]

Traces after execution of a stream.

Parameters:
stream stream which was just executed
rc result code returned by stream

Definition at line 128 of file ExecStreamScheduler.cpp.

References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and traceStreamBuffers().

Referenced by executeStream().

00131 {
00132     FENNEL_TRACE(
00133         TRACE_FINE,
00134         "executed " << stream.getStreamId() << ' ' << stream.getName()
00135         << " with result " << ExecStreamResult_names[rc]);
00136 
00137     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER);
00138 }

void ExecStreamScheduler::traceStreamBuffers ( ExecStream & stream,
TraceLevel  inputTupleTraceLevel,
TraceLevel  outputTupleTraceLevel 
) [protected, virtual]

Traces the states of the input and output buffers adjacent to a stream.

Parameters:
stream stream whose buffers are to be traced
inputTupleTraceLevel trace level at which tuple contents of input buffers are to be traced
outputTupleTraceLevel trace level at which tuple contents of output buffers are to be traced

Definition at line 140 of file ExecStreamScheduler.cpp.

References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and traceStreamBufferContents().

Referenced by tracePostExecution(), and tracePreExecution().

00144 {
00145     ExecStreamGraphImpl &graphImpl =
00146         dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph());
00147     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00148 
00149     ExecStreamGraphImpl::InEdgeIterPair inEdges =
00150         boost::in_edges(stream.getStreamId(),graphRep);
00151     for (uint i = 0; inEdges.first != inEdges.second;
00152          ++(inEdges.first),  ++i)
00153     {
00154         ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00155         ExecStreamBufAccessor &bufAccessor =
00156             graphImpl.getBufAccessorFromEdge(edge);
00157         FENNEL_TRACE(
00158             TRACE_FINER,
00159             "input buffer " << i << ":  "
00160             << ExecStreamBufState_names[bufAccessor.getState()]
00161             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00162             << ",  consumption available = "
00163             << bufAccessor.getConsumptionAvailable());
00164         if (stream.isTracingLevel(inputTupleTraceLevel)) {
00165             traceStreamBufferContents(
00166                 stream, bufAccessor, inputTupleTraceLevel);
00167         }
00168     }
00169 
00170     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00171         boost::out_edges(stream.getStreamId(),graphRep);
00172     for (uint i = 0; outEdges.first != outEdges.second;
00173          ++(outEdges.first),  ++i) {
00174         ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00175         ExecStreamBufAccessor &bufAccessor =
00176             graphImpl.getBufAccessorFromEdge(edge);
00177         FENNEL_TRACE(
00178             TRACE_FINER,
00179             "output buffer " << i << ":  "
00180             << ExecStreamBufState_names[bufAccessor.getState()]
00181             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00182             << ",  consumption available = "
00183             << bufAccessor.getConsumptionAvailable()
00184             << ",  production available = "
00185             << bufAccessor.getProductionAvailable());
00186         if (stream.isTracingLevel(outputTupleTraceLevel)) {
00187             traceStreamBufferContents(
00188                 stream, bufAccessor, outputTupleTraceLevel);
00189         }
00190     }
00191 }

void ExecStreamScheduler::traceStreamBufferContents ( ExecStream & stream,
ExecStreamBufAccessor & bufAccessor,
TraceLevel  traceLevel 
) [virtual]

Traces the contents of a stream buffer.

Parameters:
stream stream whose buffer is being traced
bufAccessor accessor for stream buffer
traceLevel level at which contents should be traced

Definition at line 193 of file ExecStreamScheduler.cpp.

References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().

Referenced by traceStreamBuffers().

00197 {
00198     TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc();
00199     TupleData tupleData(tupleDesc);
00200     TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor();
00201 
00202     for (PConstBuffer pTuple = bufAccessor.getConsumptionStart();
00203          pTuple != bufAccessor.getConsumptionEnd();
00204          pTuple += tupleAccessor.getCurrentByteCount())
00205     {
00206         tupleAccessor.setCurrentTupleBuf(pTuple);
00207         // while we're here, we might as well sanity-check the content
00208         assert(pTuple + tupleAccessor.getCurrentByteCount()
00209             <= bufAccessor.getConsumptionEnd());
00210         tupleAccessor.unmarshal(tupleData);
00211         // TODO:  sanity-check individual data values?
00212         std::ostringstream oss;
00213         TuplePrinter tuplePrinter;
00214         tuplePrinter.print(oss,tupleDesc,tupleData);
00215         stream.trace(traceLevel,oss.str());
00216     }
00217 }
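
The loop above advances through the consumable region one tuple at a time, using the byte count of the current tuple as the stride. A minimal standalone sketch of that pattern follows, assuming a made-up record layout (a one-byte length prefix followed by that many payload bytes). Fennel's actual tuple format and the TupleAccessor API are not modeled here, and walkRecords is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Walks a buffer of length-prefixed records and returns each payload as text.
// Assumed layout per record (illustrative only):
//   [len: 1 byte][payload: len bytes]
std::vector<std::string> walkRecords(const std::uint8_t *begin,
                                     const std::uint8_t *end) {
    std::vector<std::string> out;
    const std::uint8_t *p = begin;
    while (p != end) {
        std::size_t byteCount = 1 + static_cast<std::size_t>(*p);
        // Sanity check in the spirit of the assert in the listing above:
        // the record must not run past the end of the consumable region.
        assert(byteCount <= static_cast<std::size_t>(end - p));
        out.emplace_back(reinterpret_cast<const char *>(p + 1), byteCount - 1);
        p += byteCount;  // stride by the size of the record just read
    }
    return out;
}
```

As in the listing, the stride is only known after the current record has been inspected, so a corrupt length would derail every later record; the assertion catches the case where it would also overrun the buffer.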

void ExecStreamScheduler::addGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Adds a graph to be scheduled.

Some implementations may require all graphs to be added before the scheduler is started; others may allow graphs to be added at any time.

Parameters:
pGraph the graph to be scheduled

Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

Definition at line 50 of file ExecStreamScheduler.cpp.

References TraceSource::isTracingLevel(), TRACE_FINE, and tracingFine.

Referenced by ParallelExecStreamScheduler::addGraph(), and DfsTreeExecStreamScheduler::addGraph().

00051 {
00052     assert(!pGraph->pScheduler);
00053     pGraph->pScheduler = this;
00054 
00055     if (tracingFine) {
00056         std::string dotFileName;
00057         const char *fennelHome = getenv("FENNEL_HOME");
00058         if (fennelHome) {
00059             dotFileName += fennelHome;
00060             dotFileName += "/trace/";
00061         }
00062         dotFileName += "ExecStreamGraph.dot";
00063         std::ofstream dotStream(dotFileName.c_str());
00064         pGraph->renderGraphviz(dotStream);
00065     }
00066 
00067     // if any of the streams in the new graph require tracing, then
00068     // disable our tracing short-circuit
00069     std::vector<SharedExecStream> streams = pGraph->getSortedStreams();
00070     for (uint i = 0; i < streams.size(); ++i) {
00071         if (streams[i]->isTracingLevel(TRACE_FINE)) {
00072             tracingFine = true;
00073             return;
00074         }
00075     }
00076 }
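
The tail of this listing maintains a cached flag so that executeStream can skip tracing work with a single boolean test. The sketch below isolates that idea. TracingCache and onGraphAdded are hypothetical names, and each stream is reduced to a boolean standing in for the result of isTracingLevel(TRACE_FINE).

```cpp
#include <vector>

// Caches whether fine-grained tracing is needed anywhere, so the execution
// path can test one bool instead of querying every stream.
struct TracingCache {
    bool tracingFine;

    explicit TracingCache(bool schedulerTracesFine)
        : tracingFine(schedulerTracesFine) {}

    // Called when a graph is added. Each element says whether the
    // corresponding stream traces at the fine level (an assumption that
    // stands in for the per-stream isTracingLevel query).
    void onGraphAdded(const std::vector<bool> &streamTracesFine) {
        for (bool traces : streamTracesFine) {
            if (traces) {
                tracingFine = true;  // disable the short-circuit
                return;
            }
        }
    }
};
```

As in the listing, the flag is only ever raised here; nothing in this sketch lowers it again once a tracing stream has been seen.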

void ExecStreamScheduler::removeGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Removes a graph currently being scheduled.

Some implementations may disallow graph removal except when the scheduler is stopped; others may disallow graph removal altogether.

Parameters:
pGraph the graph currently being scheduled

Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

Definition at line 101 of file ExecStreamScheduler.cpp.

Referenced by ParallelExecStreamScheduler::removeGraph(), and DfsTreeExecStreamScheduler::removeGraph().

00102 {
00103     assert(pGraph->pScheduler == this);
00104     pGraph->pScheduler = NULL;
00105 }

virtual void ExecStreamScheduler::start (  )  [pure virtual]

Starts this scheduler, preparing it to execute streams.

Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

void ExecStreamScheduler::makeRunnable ( ExecStream & stream  )  [inline]

Requests that a specific stream be considered for execution.

Parameters:
stream the stream to make runnable
Deprecated:
use setRunnable

Reimplemented in ParallelExecStreamScheduler.

Definition at line 257 of file ExecStreamScheduler.h.

References setRunnable().

00259 {
00260     setRunnable(stream, true);
00261 }

virtual void ExecStreamScheduler::setRunnable ( ExecStream & stream,
bool  runnable 
) [pure virtual]

Sets whether a specific stream should be considered for execution.

Parameters:
stream the stream whose runnable status is being set
runnable true if the stream should be considered for execution, false otherwise

Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

Referenced by CorrelationJoinExecStream::execute(), makeRunnable(), and CorrelationJoinExecStream::open().

virtual void ExecStreamScheduler::abort ( ExecStreamGraph & graph  )  [pure virtual]

Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.

Returns immediately, without waiting for the abort request to be fully processed.

Parameters:
graph graph to abort; must be one of the graphs associated with this scheduler

Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

void ExecStreamScheduler::checkAbort (  )  const [virtual]

Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.

Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

Definition at line 219 of file ExecStreamScheduler.cpp.

Referenced by ExecStream::checkAbort().

00220 {
00221 }
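
The base implementation shown above is empty; derived schedulers supply the behavior. One common way to pair an asynchronous abort request with a polling check is an atomic flag: abort sets it and returns at once, and checkAbort throws if it is set. The sketch below shows that pattern only. AbortableScheduler and the AbortExcn defined here are hypothetical stand-ins, and the source does not say that Fennel's derived schedulers are implemented this way.

```cpp
#include <atomic>
#include <stdexcept>

// Hypothetical exception type standing in for Fennel's AbortExcn.
struct AbortExcn : std::runtime_error {
    AbortExcn() : std::runtime_error("execution aborted") {}
};

class AbortableScheduler {
public:
    AbortableScheduler() : abortRequested(false) {}

    // Records the request and returns immediately; it does not wait for
    // running work to notice.
    void abort() { abortRequested.store(true); }

    // Polled by running work at safe points; throws once an abort is pending.
    void checkAbort() const {
        if (abortRequested.load()) {
            throw AbortExcn();
        }
    }

private:
    std::atomic<bool> abortRequested;
};
```

The design choice illustrated is cooperative cancellation: the requester never interrupts a stream directly, so a stream only stops at a point where it has chosen to call checkAbort.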

virtual void ExecStreamScheduler::stop (  )  [pure virtual]

Shuts down this scheduler, preventing any further streams from being scheduled.

Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor (  )  [virtual]

Creates a new ExecStreamBufAccessor suitable for use with this scheduler.

Returns:
new buffer accessor

Definition at line 78 of file ExecStreamScheduler.cpp.

Referenced by ExecStreamGraphImpl::prepare().

00079 {
00080     return SharedExecStreamBufAccessor(new ExecStreamBufAccessor());
00081 }

void ExecStreamScheduler::createBufferProvisionAdapter ( ExecStreamEmbryo & embryo  )  [virtual]

Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.

The default implementation creates a ScratchBufferExecStream. The caller is responsible for filling in the generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Reimplemented in ParallelExecStreamScheduler.

Definition at line 83 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00085 {
00086     ScratchBufferExecStreamParams adapterParams;
00087     embryo.init(
00088         new ScratchBufferExecStream(),
00089         adapterParams);
00090 }

void ExecStreamScheduler::createCopyProvisionAdapter ( ExecStreamEmbryo & embryo  )  [virtual]

Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.

The default implementation creates a CopyExecStream. The caller is responsible for filling in the generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Definition at line 92 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00094 {
00095     CopyExecStreamParams adapterParams;
00096     embryo.init(
00097         new CopyExecStream(),
00098         adapterParams);
00099 }

virtual ExecStreamBufAccessor& ExecStreamScheduler::readStream ( ExecStream & stream  )  [pure virtual]

Reads data from a stream, first performing any scheduling necessary to make output available.

Parameters:
stream the stream from which to read
Returns:
accessor for output data buffer

Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.

Referenced by Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch().

uint ExecStreamScheduler::getDegreeOfParallelism (  )  [virtual]

Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

Reimplemented in ParallelExecStreamScheduler.

Definition at line 223 of file ExecStreamScheduler.cpp.

Referenced by MergeExecStream::open().

00224 {
00225     return 1;
00226 }
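
A caller can use this value to size work per worker. The sketch below shows a base class returning 1 and a derived class overriding it, plus one possible consumer. BaseScheduler, PooledScheduler, and inputsPerWorker are hypothetical names; how Fennel's own callers use the value is not shown in this page.

```cpp
// Hypothetical base mirroring the default shown above.
class BaseScheduler {
public:
    virtual ~BaseScheduler() {}
    virtual unsigned getDegreeOfParallelism() { return 1; }
};

// Hypothetical parallel scheduler reporting its worker-thread count.
class PooledScheduler : public BaseScheduler {
public:
    // A count of zero is clamped to 1 so callers can safely divide by it.
    explicit PooledScheduler(unsigned n) : nThreads(n == 0 ? 1 : n) {}
    unsigned getDegreeOfParallelism() override { return nThreads; }

private:
    unsigned nThreads;
};

// Example consumer: splits nInputs across the available parallelism,
// rounding up so that no input is left unassigned.
unsigned inputsPerWorker(BaseScheduler &scheduler, unsigned nInputs) {
    unsigned dop = scheduler.getDegreeOfParallelism();
    return (nInputs + dop - 1) / dop;
}
```

With the non-parallel default, inputsPerWorker hands all inputs to a single worker, which matches the documented meaning of a return value of 1.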

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
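
The comparison level >= minimumLevel means that a source accepts a message when the message level is at least as severe as the source's threshold. A small sketch of that rule follows, assuming numeric level values patterned on java.util.logging. The Level enumerators, their values, and the Source struct are assumptions made for illustration and are not copied from Fennel's headers; the "off" value is placed above every real level so that nothing passes.

```cpp
// Assumed level values for illustration; a higher value is more severe.
enum Level {
    LEVEL_FINEST = 300,
    LEVEL_FINER = 400,
    LEVEL_FINE = 500,
    LEVEL_SEVERE = 1000,
    LEVEL_OFF = 10001
};

// Hypothetical trace source holding only its threshold.
struct Source {
    Level minimumLevel;

    // Same comparison as in the listing above.
    bool isTracingLevel(Level level) const { return level >= minimumLevel; }
};
```

Under these assumed values, a source whose threshold is LEVEL_FINE accepts LEVEL_FINE and LEVEL_SEVERE messages but rejects LEVEL_FINER ones, and a source set to LEVEL_OFF rejects everything.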

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }


Member Data Documentation

bool ExecStreamScheduler::tracingFine [protected]

Definition at line 47 of file ExecStreamScheduler.h.

Referenced by addGraph(), ExecStreamScheduler(), and executeStream().


The documentation for this class was generated from the following files:
ExecStreamScheduler.h
ExecStreamScheduler.cpp

Generated on Mon Jun 22 04:00:30 2009 for Fennel by doxygen 1.5.1