DfsTreeExecStreamScheduler Class Reference

DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface.

#include <DfsTreeExecStreamScheduler.h>

Base classes (from the inheritance diagram): ExecStreamScheduler, TraceSource.

Public Member Functions

 DfsTreeExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name)
 Constructs a new scheduler.
virtual ~DfsTreeExecStreamScheduler ()
virtual void addGraph (SharedExecStreamGraph pGraph)
 Adds a graph to be scheduled.
virtual void removeGraph (SharedExecStreamGraph pGraph)
 Removes a graph currently being scheduled.
virtual void start ()
 Starts this scheduler, preparing it to execute streams.
virtual void setRunnable (ExecStream &stream, bool)
 Sets whether a specific stream should be considered for execution.
virtual void abort (ExecStreamGraph &graph)
 Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
virtual void checkAbort () const
 Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
virtual void stop ()
 Shuts down this scheduler, preventing any further streams from being scheduled.
virtual ExecStreamBufAccessor & readStream (ExecStream &stream)
 Reads data from a stream, first performing any scheduling necessary to make output available.
virtual void traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel)
 Traces the contents of a stream buffer.
void makeRunnable (ExecStream &stream)
 Requests that a specific stream be considered for execution.
virtual SharedExecStreamBufAccessor newBufAccessor ()
 Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
virtual void createBufferProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
virtual void createCopyProvisionAdapter (ExecStreamEmbryo &embryo)
 Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
virtual uint getDegreeOfParallelism ()
 
Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()

Protected Member Functions

ExecStreamResult executeStream (ExecStream &stream, ExecStreamQuantum const &quantum)
 Executes one stream, performing tracing if enabled.
virtual void tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum)
 Traces before execution of a stream.
virtual void tracePostExecution (ExecStream &stream, ExecStreamResult rc)
 Traces after execution of a stream.
virtual void traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel)
 Traces the states of the input and output buffers adjacent to a stream.

Protected Attributes

bool tracingFine

Private Member Functions

bool findNextConsumer (ExecStreamGraphImpl &graphImpl, const ExecStreamGraphImpl::GraphRep &graphRep, const ExecStream &stream, ExecStreamGraphImpl::Edge &edge, ExecStreamId &current, ExecStreamBufState skipState)
 Finds the next consumer to execute for a given producer.

Private Attributes

volatile bool aborted
SharedExecStreamGraph pGraph

Detailed Description

DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface.

See SchedulerDesign for more details.

Author:
John V. Sichi
Version:
Id
//open/dev/fennel/exec/DfsTreeExecStreamScheduler.h#16

Definition at line 42 of file DfsTreeExecStreamScheduler.h.


Constructor & Destructor Documentation

DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [explicit]

Constructs a new scheduler.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent, or NULL to disable tracing entirely
name the name to use for tracing this scheduler

Definition at line 33 of file DfsTreeExecStreamScheduler.cpp.

00036     : TraceSource(pTraceTargetInit, nameInit),
00037       ExecStreamScheduler(pTraceTargetInit, nameInit)
00038 {
00039 }

DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler (  )  [virtual]

Definition at line 41 of file DfsTreeExecStreamScheduler.cpp.

00042 {
00043 }


Member Function Documentation

bool DfsTreeExecStreamScheduler::findNextConsumer ( ExecStreamGraphImpl &  graphImpl,
const ExecStreamGraphImpl::GraphRep &  graphRep,
const ExecStream &  stream,
ExecStreamGraphImpl::Edge &  edge,
ExecStreamId &  current,
ExecStreamBufState  skipState 
) [private]

Finds the next consumer to execute for a given producer.

Parameters:
graphImpl current stream graph
graphRep graph representation of current stream graph
stream current execution stream
edge returns edge to consumer to execute next
current returns id of consumer to execute next
skipState state to skip when looking for next consumer
Returns:
false if reached sink vertex, else true

Definition at line 183 of file DfsTreeExecStreamScheduler.cpp.

References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), and TRACE_FINE.

Referenced by readStream().

00190 {
00191     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00192         boost::out_edges(current,graphRep);
00193 
00194     bool emptyFound = false;
00195     // dummy initializations to avoid compiler error
00196     ExecStreamGraphImpl::Edge emptyEdge = edge;
00197     ExecStreamId emptyStreamId = current;
00198 
00199     for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00200         edge = *(outEdges.first);
00201         current = boost::target(edge,graphRep);
00202         if (boost::out_degree(current,graphRep) == 0) {
00203             // we've hit the output sentinel
00204             assert(!graphImpl.getStreamFromVertex(current));
00205             FENNEL_TRACE(
00206                 TRACE_FINE,
00207                 "leaving readStream " << stream.getName());
00208             return false;
00209         }
00210 
00211         ExecStreamBufAccessor &bufAccessor =
00212             graphImpl.getBufAccessorFromEdge(edge);
00213 
00214         // Save the first edge with an empty state that we find, but don't
00215         // return that as the next consumer.  We want to give priority to
00216         // streams that have explicitly requested data.  So, only return the
00217         // empty edge consumer if there are no consumers that have explicitly
00218         // requested data.
00219         if (bufAccessor.getState() == EXECBUF_EMPTY) {
00220             if (!emptyFound) {
00221                 emptyFound = true;
00222                 emptyEdge = edge;
00223                 emptyStreamId = current;
00224             }
00225             continue;
00226         }
00227 
00228         if (bufAccessor.getState() != skipState) {
00229             break;
00230         }
00231         assert(!(skipState == EXECBUF_UNDERFLOW &&
00232                     bufAccessor.getState() == EXECBUF_EOS));
00233     }
00234 
00235     if (outEdges.first == outEdges.second && emptyFound) {
00236         edge = emptyEdge;
00237         current = emptyStreamId;
00238     } else {
00239         assert(!(skipState == EXECBUF_UNDERFLOW &&
00240                     outEdges.first == outEdges.second));
00241     }
00242 
00243     return true;
00244 }
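
The selection rule in findNextConsumer() can be read separately from the Boost Graph plumbing. The following is a minimal sketch of that rule, assuming a plain vector of buffer states in place of the out-edges; BufState and pickNextConsumer are hypothetical names, and the output-sentinel check that makes the real function return false is not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for ExecStreamBufState; only the states tested by the
// listing above are modelled.
enum class BufState { Empty, NonEmpty, Underflow, Eos };

// Returns the index of the consumer to run next, or -1 if there are none.
// Priority: the first buffer that is neither empty nor in skipState; failing
// that, the first empty buffer; failing that, the last consumer visited.
int pickNextConsumer(const std::vector<BufState> &consumers, BufState skipState)
{
    assert(skipState != BufState::Empty);
    int firstEmpty = -1;
    int last = -1;
    for (std::size_t i = 0; i < consumers.size(); ++i) {
        last = static_cast<int>(i);
        if (consumers[i] == BufState::Empty) {
            if (firstEmpty < 0) firstEmpty = last;  // remember, keep looking
            continue;
        }
        if (consumers[i] != skipState) return last;  // explicit request wins
    }
    return firstEmpty >= 0 ? firstEmpty : last;
}
```

For example, with consumers in states {Eos, Empty, NonEmpty} and skipState Eos, the sketch picks index 2 rather than the empty buffer at index 1, which matches the priority described in the source comment.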

void DfsTreeExecStreamScheduler::addGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Adds a graph to be scheduled.

Some implementations may require all graphs to be added before scheduler is started; others may allow graphs to be added at any time.

Parameters:
pGraph the graph to be scheduled

Reimplemented from ExecStreamScheduler.

Definition at line 45 of file DfsTreeExecStreamScheduler.cpp.

References ExecStreamScheduler::addGraph(), and pGraph.

00046 {
00047     assert(!pGraph);
00048 
00049     ExecStreamScheduler::addGraph(pGraphInit);
00050     pGraph = pGraphInit;
00051 }

void DfsTreeExecStreamScheduler::removeGraph ( SharedExecStreamGraph  pGraph  )  [virtual]

Removes a graph currently being scheduled.

Some implementations may disallow graph removal except when scheduler is stopped; others may disallow graph removal altogether.

Parameters:
pGraph the graph currently being scheduled

Reimplemented from ExecStreamScheduler.

Definition at line 53 of file DfsTreeExecStreamScheduler.cpp.

References pGraph, and ExecStreamScheduler::removeGraph().

00054 {
00055     assert(pGraph == pGraphInit);
00056 
00057     pGraph.reset();
00058     ExecStreamScheduler::removeGraph(pGraphInit);
00059 }
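
The assertions in addGraph() and removeGraph() imply that this scheduler holds at most one graph at a time. A minimal sketch of that ownership rule, assuming std::shared_ptr in place of SharedExecStreamGraph; Graph and SingleGraphHolder are hypothetical names, and the calls to the base class are omitted.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct Graph {};  // hypothetical placeholder for ExecStreamGraph

class SingleGraphHolder {
public:
    // Attaching a second graph while one is held is a programming error.
    void addGraph(std::shared_ptr<Graph> g) {
        assert(!graph_);
        graph_ = std::move(g);
    }
    // Only the graph that is currently attached may be removed.
    void removeGraph(const std::shared_ptr<Graph> &g) {
        assert(graph_ == g);
        graph_.reset();
    }
    bool hasGraph() const { return static_cast<bool>(graph_); }

private:
    std::shared_ptr<Graph> graph_;
};
```

After removeGraph() the holder is empty again, so a different graph can be attached.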

void DfsTreeExecStreamScheduler::start (  )  [virtual]

Starts this scheduler, preparing it to execute streams.

Implements ExecStreamScheduler.

Definition at line 61 of file DfsTreeExecStreamScheduler.cpp.

References aborted, pGraph, and TRACE_FINE.

00062 {
00063     FENNEL_TRACE(TRACE_FINE,"start");
00064 
00065     // TODO jvs 2-Jan-2006:  rename this class now that it's no longer
00066     // restricted to trees; come up with something more generic in case
00067     // DFS becomes irrelevant also.
00068 
00069     // note: we no longer check that graph is a tree (or forest of trees)
00070     // since it is now possible to have multiple consumers from a single
00071     // producer
00072     assert(pGraph->isAcyclic());
00073     aborted = false;
00074 }

void DfsTreeExecStreamScheduler::setRunnable ( ExecStream &  stream,
bool   
) [virtual]

Sets whether a specific stream should be considered for execution.

Parameters:
stream the stream to make runnable

Implements ExecStreamScheduler.

Definition at line 76 of file DfsTreeExecStreamScheduler.cpp.

00077 {
00078     permAssert(false);
00079 }

void DfsTreeExecStreamScheduler::abort ( ExecStreamGraph &  graph  )  [virtual]

Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.

Returns immediately, not waiting for abort request to be fully processed.

Parameters:
graph graph to abort; must be one of the graphs associated with this scheduler

Implements ExecStreamScheduler.

Definition at line 81 of file DfsTreeExecStreamScheduler.cpp.

References aborted, and TRACE_FINE.

00082 {
00083     FENNEL_TRACE(TRACE_FINE,"abort requested");
00084 
00085     aborted = true;
00086 }

void DfsTreeExecStreamScheduler::checkAbort (  )  const [virtual]

Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.

Reimplemented from ExecStreamScheduler.

Definition at line 88 of file DfsTreeExecStreamScheduler.cpp.

References aborted, and TRACE_FINE.

Referenced by readStream().

00089 {
00090     if (aborted) {
00091         FENNEL_TRACE(TRACE_FINE,"abort detected");
00092         throw AbortExcn();
00093     }
00094 }
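
abort() only records the request; the executing thread notices it the next time it calls checkAbort(). A minimal sketch of that request-and-poll pattern follows. The listing uses a volatile bool; the sketch swaps in std::atomic<bool>, the standard C++11 facility for a flag written by one thread and read by another. AbortFlag, AbortError (standing in for AbortExcn), and runSteps are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

struct AbortError : std::runtime_error {
    AbortError() : std::runtime_error("abort requested") {}
};

class AbortFlag {
public:
    void start() { aborted_.store(false); }
    void abort() { aborted_.store(true); }  // returns immediately
    void checkAbort() const {
        if (aborted_.load()) throw AbortError();
    }

private:
    std::atomic<bool> aborted_{false};
};

// Performs up to maxSteps units of work and polls the flag after each one,
// the way readStream() calls checkAbort() after every executeStream().
// abortAtStep simulates an external abort request; pass -1 for none.
int runSteps(AbortFlag &flag, int maxSteps, int abortAtStep)
{
    assert(maxSteps >= 0);
    int done = 0;
    try {
        for (int i = 0; i < maxSteps; ++i) {
            ++done;
            if (i == abortAtStep) flag.abort();
            flag.checkAbort();
        }
    } catch (const AbortError &) {
        // work already done is kept; no further steps run
    }
    return done;
}
```

The step in progress when the request arrives still completes, which is why the documentation describes abort() as asynchronous.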

void DfsTreeExecStreamScheduler::stop (  )  [virtual]

Shuts down this scheduler, preventing any further streams from being scheduled.

Implements ExecStreamScheduler.

Definition at line 96 of file DfsTreeExecStreamScheduler.cpp.

References aborted, and TRACE_FINE.

00097 {
00098     FENNEL_TRACE(TRACE_FINE,"stop");
00099 
00100     // nothing to do
00101     aborted = false;
00102 }

ExecStreamBufAccessor & DfsTreeExecStreamScheduler::readStream ( ExecStream &  stream  )  [virtual]

Reads data from a stream, first performing any scheduling necessary to make output available.

Parameters:
stream the stream from which to read
Returns:
accessor for output data buffer

Implements ExecStreamScheduler.

Definition at line 104 of file DfsTreeExecStreamScheduler.cpp.

References checkAbort(), EXECBUF_EOS, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamScheduler::executeStream(), findNextConsumer(), ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), and TRACE_FINE.

00106 {
00107     FENNEL_TRACE(
00108         TRACE_FINE,
00109         "entering readStream " << stream.getName());
00110 
00111     ExecStreamId current = stream.getStreamId();
00112     ExecStreamQuantum quantum;
00113 
00114     ExecStreamGraphImpl &graphImpl =
00115         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00116     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00117 
00118     // assert that we're reading from a designated output stream
00119     assert(boost::out_degree(current,graphRep) == 1);
00120     assert(!graphImpl.getStreamFromVertex(
00121                boost::target(
00122                    *(boost::out_edges(current,graphRep).first),
00123                    graphRep)));
00124 
00125     // TODO:  assertions about accessor state/provision
00126 
00127     for (;;) {
00128         ExecStreamGraphImpl::InEdgeIterPair inEdges =
00129             boost::in_edges(current,graphRep);
00130         for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00131             ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00132             ExecStreamBufAccessor &bufAccessor =
00133                 graphImpl.getBufAccessorFromEdge(edge);
00134             if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00135                 // move current upstream
00136                 current = boost::source(edge,graphRep);
00137                 break;
00138             }
00139         }
00140         if (inEdges.first != inEdges.second) {
00141             // hit EXECBUF_UNDERFLOW
00142             continue;
00143         }
00144 
00145         SharedExecStream pStream = graphImpl.getStreamFromVertex(current);
00146         ExecStreamResult rc = executeStream(*pStream, quantum);
00147 
00148         checkAbort();
00149 
00150         ExecStreamGraphImpl::Edge edge;
00151 
00152         switch (rc) {
00153         case EXECRC_EOS:
00154             // find a consumer that is not in EOS state
00155             if (!findNextConsumer(
00156                 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS))
00157             {
00158                 return graphImpl.getBufAccessorFromEdge(edge);
00159             }
00160             // if all were in eos, just use the last consumer
00161             break;
00162         case EXECRC_BUF_OVERFLOW:
00163             // find a consumer that is not in underflow state; i.e., not
00164             // waiting on this producer to continue execution
00165             if (!findNextConsumer(
00166                 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW))
00167             {
00168                 return graphImpl.getBufAccessorFromEdge(edge);
00169             }
00170             break;
00171         case EXECRC_BUF_UNDERFLOW:
00172             // TODO:  assert that at least one input is in state
00173             // EXECBUF_UNDERFLOW
00174             break;
00175         case EXECRC_QUANTUM_EXPIRED:
00176             break;
00177         default:
00178             permAssert(false);
00179         }
00180     }
00181 }
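
The control flow of readStream() is easier to see on a linear chain. The following is a minimal sketch of the same walk under strong simplifying assumptions: each stage has at most one input and one output, each buffer holds a single integer, and quantum expiry, abort checks, and the multi-consumer choice made by findNextConsumer() are not modelled. Pipeline, Buffer, Result, and readAll are hypothetical names, not Fennel types.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class BufState { Empty, NonEmpty, Underflow, Eos };
enum class Result { Overflow, Underflow, Eos };

struct Buffer { BufState state = BufState::Empty; int value = 0; };

// Stage 0 is a source emitting 1..count; every later stage adds 1 to each
// value. buffers[i] carries the output of stage i.
struct Pipeline {
    int count;
    int emitted = 0;
    std::vector<Buffer> buffers;

    Pipeline(std::size_t nStages, int n) : count(n), buffers(nStages) {
        assert(nStages >= 1);
    }

    Result execute(std::size_t i) {
        Buffer &out = buffers[i];
        if (i == 0) {
            if (emitted == count) { out.state = BufState::Eos; return Result::Eos; }
            out.value = ++emitted;
            out.state = BufState::NonEmpty;
            return Result::Overflow;
        }
        Buffer &in = buffers[i - 1];
        if (in.state == BufState::Eos) { out.state = BufState::Eos; return Result::Eos; }
        if (in.state != BufState::NonEmpty) {
            in.state = BufState::Underflow;  // ask the producer for more
            return Result::Underflow;
        }
        out.value = in.value + 1;
        out.state = BufState::NonEmpty;
        in.state = BufState::Empty;
        return Result::Overflow;
    }

    // Walk upstream while the input is in underflow, execute the stage that
    // can make progress, then walk downstream once it has produced output.
    Buffer &readStream() {
        std::size_t current = buffers.size() - 1;
        for (;;) {
            if (current > 0 && buffers[current - 1].state == BufState::Underflow) {
                --current;
                continue;
            }
            Result rc = execute(current);
            if (rc == Result::Underflow) continue;
            if (current == buffers.size() - 1) return buffers[current];
            ++current;
        }
    }
};

// Drains the pipeline the way a caller of readStream() would.
std::vector<int> readAll(Pipeline &p) {
    std::vector<int> out;
    for (;;) {
        Buffer &b = p.readStream();
        if (b.state == BufState::Eos) return out;
        out.push_back(b.value);
        b.state = BufState::Empty;  // the reader consumes the data
    }
}
```

In this model a three-stage pipeline with a source count of 2 yields 3 and then 4: each read first walks down to the source, then carries one value back up through the two downstream stages.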

ExecStreamResult ExecStreamScheduler::executeStream ( ExecStream &  stream,
ExecStreamQuantum const &  quantum 
) [inline, protected, inherited]

Executes one stream, performing tracing if enabled.

Parameters:
stream stream to execute
quantum quantum controlling stream execution
Returns:
result of executing stream

Definition at line 243 of file ExecStreamScheduler.h.

References ExecStream::execute(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), and ExecStreamScheduler::tracingFine.

Referenced by readStream(), and ParallelExecStreamScheduler::tryExecuteTask().

00246 {
00247     if (tracingFine) {
00248         tracePreExecution(stream, quantum);
00249         ExecStreamResult rc = stream.execute(quantum);
00250         tracePostExecution(stream, rc);
00251         return rc;
00252     } else {
00253         return stream.execute(quantum);
00254     }
00255 }
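
executeStream() pays for tracing only when it is enabled: the untraced path is a single call with no hook invocations. A minimal sketch of that wrapper shape, assuming a std::function in place of ExecStream::execute() and a string vector in place of the trace target; TracingExecutor is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

struct TracingExecutor {
    bool tracingFine = false;
    std::vector<std::string> log;  // stands in for the trace target

    int execute(const std::string &name, const std::function<int()> &body) {
        assert(static_cast<bool>(body));
        if (!tracingFine) {
            return body();  // fast path: no trace calls at all
        }
        log.push_back("executing " + name);
        int rc = body();
        log.push_back("executed " + name + " with result " + std::to_string(rc));
        return rc;
    }
};
```

The result is returned unchanged on both paths, so enabling tracing does not alter scheduling decisions.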

void ExecStreamScheduler::tracePreExecution ( ExecStream &  stream,
ExecStreamQuantum const &  quantum 
) [protected, virtual, inherited]

Traces before execution of a stream.

Parameters:
stream stream about to be executed
quantum quantum controlling stream execution

Definition at line 112 of file ExecStreamScheduler.cpp.

References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().

Referenced by ExecStreamScheduler::executeStream().

00115 {
00116     FENNEL_TRACE(
00117         TRACE_FINE,
00118         "executing " << stream.getStreamId() << ' ' << stream.getName());
00119     if (!isMAXU(quantum.nTuplesMax)) {
00120         FENNEL_TRACE(
00121             TRACE_FINE,
00122             "nTuplesMax = " << quantum.nTuplesMax);
00123     }
00124 
00125     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST);
00126 }

void ExecStreamScheduler::tracePostExecution ( ExecStream &  stream,
ExecStreamResult  rc 
) [protected, virtual, inherited]

Traces after execution of a stream.

Parameters:
stream stream which was just executed
rc result code returned by stream

Definition at line 128 of file ExecStreamScheduler.cpp.

References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().

Referenced by ExecStreamScheduler::executeStream().

00131 {
00132     FENNEL_TRACE(
00133         TRACE_FINE,
00134         "executed " << stream.getStreamId() << ' ' << stream.getName()
00135         << " with result " << ExecStreamResult_names[rc]);
00136 
00137     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER);
00138 }

void ExecStreamScheduler::traceStreamBuffers ( ExecStream &  stream,
TraceLevel  inputTupleTraceLevel,
TraceLevel  outputTupleTraceLevel 
) [protected, virtual, inherited]

Traces the states of the input and output buffers adjacent to a stream.

Parameters:
stream stream whose buffers are to be traced
inputTupleTraceLevel trace level at which tuple contents of input buffers are to be traced
outputTupleTraceLevel trace level at which tuple contents of output buffers are to be traced

Definition at line 140 of file ExecStreamScheduler.cpp.

References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().

Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00144 {
00145     ExecStreamGraphImpl &graphImpl =
00146         dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph());
00147     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00148 
00149     ExecStreamGraphImpl::InEdgeIterPair inEdges =
00150         boost::in_edges(stream.getStreamId(),graphRep);
00151     for (uint i = 0; inEdges.first != inEdges.second;
00152          ++(inEdges.first),  ++i)
00153     {
00154         ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00155         ExecStreamBufAccessor &bufAccessor =
00156             graphImpl.getBufAccessorFromEdge(edge);
00157         FENNEL_TRACE(
00158             TRACE_FINER,
00159             "input buffer " << i << ":  "
00160             << ExecStreamBufState_names[bufAccessor.getState()]
00161             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00162             << ",  consumption available = "
00163             << bufAccessor.getConsumptionAvailable());
00164         if (stream.isTracingLevel(inputTupleTraceLevel)) {
00165             traceStreamBufferContents(
00166                 stream, bufAccessor, inputTupleTraceLevel);
00167         }
00168     }
00169 
00170     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00171         boost::out_edges(stream.getStreamId(),graphRep);
00172     for (uint i = 0; outEdges.first != outEdges.second;
00173          ++(outEdges.first),  ++i) {
00174         ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00175         ExecStreamBufAccessor &bufAccessor =
00176             graphImpl.getBufAccessorFromEdge(edge);
00177         FENNEL_TRACE(
00178             TRACE_FINER,
00179             "output buffer " << i << ":  "
00180             << ExecStreamBufState_names[bufAccessor.getState()]
00181             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00182             << ",  consumption available = "
00183             << bufAccessor.getConsumptionAvailable()
00184             << ",  production available = "
00185             << bufAccessor.getProductionAvailable());
00186         if (stream.isTracingLevel(outputTupleTraceLevel)) {
00187             traceStreamBufferContents(
00188                 stream, bufAccessor, outputTupleTraceLevel);
00189         }
00190     }
00191 }

void ExecStreamScheduler::traceStreamBufferContents ( ExecStream &  stream,
ExecStreamBufAccessor &  bufAccessor,
TraceLevel  traceLevel 
) [virtual, inherited]

Traces the contents of a stream buffer.

Parameters:
stream stream whose buffer is being traced
bufAccessor accessor for stream buffer
traceLevel level at which contents should be traced

Definition at line 193 of file ExecStreamScheduler.cpp.

References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().

Referenced by ExecStreamScheduler::traceStreamBuffers().

00197 {
00198     TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc();
00199     TupleData tupleData(tupleDesc);
00200     TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor();
00201 
00202     for (PConstBuffer pTuple = bufAccessor.getConsumptionStart();
00203          pTuple != bufAccessor.getConsumptionEnd();
00204          pTuple += tupleAccessor.getCurrentByteCount())
00205     {
00206         tupleAccessor.setCurrentTupleBuf(pTuple);
00207         // while we're here, we might as well sanity-check the content
00208         assert(pTuple + tupleAccessor.getCurrentByteCount()
00209             <= bufAccessor.getConsumptionEnd());
00210         tupleAccessor.unmarshal(tupleData);
00211         // TODO:  sanity-check individual data values?
00212         std::ostringstream oss;
00213         TuplePrinter tuplePrinter;
00214         tuplePrinter.print(oss,tupleDesc,tupleData);
00215         stream.trace(traceLevel,oss.str());
00216     }
00217 }
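
The loop above walks a contiguous buffer of variable-length tuples, advancing by each tuple's own byte count and checking that no tuple runs past the end. A minimal sketch of the same walk, assuming a made-up record layout of one length byte followed by that many payload bytes; this layout and the name listRecords are hypothetical and unrelated to Fennel's tuple format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Returns the payload of every record stored back to back in buf.
std::vector<std::string> listRecords(const std::vector<std::uint8_t> &buf)
{
    std::vector<std::string> out;
    std::size_t pos = 0;
    while (pos != buf.size()) {
        std::size_t byteCount = 1 + buf[pos];  // length byte plus payload
        // sanity check: the record must end inside the buffer
        assert(pos + byteCount <= buf.size());
        const char *payload =
            reinterpret_cast<const char *>(buf.data()) + pos + 1;
        out.emplace_back(payload, byteCount - 1);
        pos += byteCount;
    }
    return out;
}
```

As in the listing, the bounds assertion doubles as a cheap corruption check: a bad length byte is caught before the payload is read.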

void ExecStreamScheduler::makeRunnable ( ExecStream &  stream  )  [inline, inherited]

Requests that a specific stream be considered for execution.

Parameters:
stream the stream to make runnable
Deprecated:
use setRunnable

Reimplemented in ParallelExecStreamScheduler.

Definition at line 257 of file ExecStreamScheduler.h.

References ExecStreamScheduler::setRunnable().

00259 {
00260     setRunnable(stream, true);
00261 }

SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor (  )  [virtual, inherited]

Creates a new ExecStreamBufAccessor suitable for use with this scheduler.

Returns:
new buffer accessor

Definition at line 78 of file ExecStreamScheduler.cpp.

Referenced by ExecStreamGraphImpl::prepare().

00079 {
00080     return SharedExecStreamBufAccessor(new ExecStreamBufAccessor());
00081 }

void ExecStreamScheduler::createBufferProvisionAdapter ( ExecStreamEmbryo &  embryo  )  [virtual, inherited]

Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.

Default implementation is ScratchBufferExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Reimplemented in ParallelExecStreamScheduler.

Definition at line 83 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00085 {
00086     ScratchBufferExecStreamParams adapterParams;
00087     embryo.init(
00088         new ScratchBufferExecStream(),
00089         adapterParams);
00090 }

void ExecStreamScheduler::createCopyProvisionAdapter ( ExecStreamEmbryo &  embryo  )  [virtual, inherited]

Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.

Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:
embryo receives new adapter stream

Definition at line 92 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

00094 {
00095     CopyExecStreamParams adapterParams;
00096     embryo.init(
00097         new CopyExecStream(),
00098         adapterParams);
00099 }

uint ExecStreamScheduler::getDegreeOfParallelism (  )  [virtual, inherited]

Returns:
the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler

Reimplemented in ParallelExecStreamScheduler.

Definition at line 223 of file ExecStreamScheduler.cpp.

Referenced by MergeExecStream::open().

00224 {
00225     return 1;
00226 }

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }


Member Data Documentation

volatile bool DfsTreeExecStreamScheduler::aborted [private]

Definition at line 45 of file DfsTreeExecStreamScheduler.h.

Referenced by abort(), checkAbort(), start(), and stop().

SharedExecStreamGraph DfsTreeExecStreamScheduler::pGraph [private]

Definition at line 47 of file DfsTreeExecStreamScheduler.h.

Referenced by addGraph(), removeGraph(), and start().

bool ExecStreamScheduler::tracingFine [protected, inherited]

Definition at line 47 of file ExecStreamScheduler.h.

Referenced by ExecStreamScheduler::addGraph(), ExecStreamScheduler::ExecStreamScheduler(), and ExecStreamScheduler::executeStream().


The documentation for this class was generated from the following files:
DfsTreeExecStreamScheduler.h
DfsTreeExecStreamScheduler.cpp
Generated on Mon Jun 22 04:00:30 2009 for Fennel by  doxygen 1.5.1