#include <DfsTreeExecStreamScheduler.h>
Inheritance diagram for DfsTreeExecStreamScheduler:
Public Member Functions | |
DfsTreeExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name) | |
Constructs a new scheduler. | |
virtual | ~DfsTreeExecStreamScheduler () |
virtual void | addGraph (SharedExecStreamGraph pGraph) |
Adds a graph to be scheduled. | |
virtual void | removeGraph (SharedExecStreamGraph pGraph) |
Removes a graph currently being scheduled. | |
virtual void | start () |
Starts this scheduler, preparing it to execute streams. | |
virtual void | setRunnable (ExecStream &stream, bool) |
Sets whether a specific stream should be considered for execution. | |
virtual void | abort (ExecStreamGraph &graph) |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn. | |
virtual void | stop () |
Shuts down this scheduler, preventing any further streams from being scheduled. | |
virtual ExecStreamBufAccessor & | readStream (ExecStream &stream) |
Reads data from a stream, first performing any scheduling necessary to make output available. | |
virtual void | traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel) |
Traces the contents of a stream buffer. | |
void | makeRunnable (ExecStream &stream) |
Requests that a specific stream be considered for execution. | |
virtual SharedExecStreamBufAccessor | newBufAccessor () |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler. | |
virtual void | createBufferProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER. | |
virtual void | createCopyProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER. | |
virtual uint | getDegreeOfParallelism () |
| |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
Protected Member Functions | |
ExecStreamResult | executeStream (ExecStream &stream, ExecStreamQuantum const &quantum) |
Executes one stream, performing tracing if enabled. | |
virtual void | tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum) |
Traces before execution of a stream. | |
virtual void | tracePostExecution (ExecStream &stream, ExecStreamResult rc) |
Traces after execution of a stream. | |
virtual void | traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel) |
Traces the states of the input and output buffers adjacent to a stream. | |
Protected Attributes | |
bool | tracingFine |
Private Member Functions | |
bool | findNextConsumer (ExecStreamGraphImpl &graphImpl, const ExecStreamGraphImpl::GraphRep &graphRep, const ExecStream &stream, ExecStreamGraphImpl::Edge &edge, ExecStreamId &current, ExecStreamBufState skipState) |
Finds the next consumer to execute for a given producer. | |
Private Attributes | |
volatile bool | aborted |
SharedExecStreamGraph | pGraph |
See SchedulerDesign for more details.
Definition at line 42 of file DfsTreeExecStreamScheduler.h.
DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [explicit] |
Constructs a new scheduler.
pTraceTarget | the TraceTarget to which messages will be sent, or NULL to disable tracing entirely | |
name | the name to use for tracing this scheduler |
Definition at line 33 of file DfsTreeExecStreamScheduler.cpp.
00036 : TraceSource(pTraceTargetInit, nameInit), 00037 ExecStreamScheduler(pTraceTargetInit, nameInit) 00038 { 00039 }
DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler | ( | ) | [virtual] |
bool DfsTreeExecStreamScheduler::findNextConsumer | ( | ExecStreamGraphImpl & | graphImpl, | |
const ExecStreamGraphImpl::GraphRep & | graphRep, | |||
const ExecStream & | stream, | |||
ExecStreamGraphImpl::Edge & | edge, | |||
ExecStreamId & | current, | |||
ExecStreamBufState | skipState | |||
) | [private] |
Finds the next consumer to execute for a given producer.
graphImpl | current stream graph | |
graphRep | graph representation of current stream graph | |
stream | current execution stream | |
edge | returns edge to consumer to execute next | |
current | returns id of consumer to execute next | |
skipState | state to skip when looking for next consumer |
Definition at line 183 of file DfsTreeExecStreamScheduler.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), and TRACE_FINE.
Referenced by readStream().
00190 { 00191 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00192 boost::out_edges(current,graphRep); 00193 00194 bool emptyFound = false; 00195 // dummy initializations to avoid compiler error 00196 ExecStreamGraphImpl::Edge emptyEdge = edge; 00197 ExecStreamId emptyStreamId = current; 00198 00199 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00200 edge = *(outEdges.first); 00201 current = boost::target(edge,graphRep); 00202 if (boost::out_degree(current,graphRep) == 0) { 00203 // we've hit the output sentinel 00204 assert(!graphImpl.getStreamFromVertex(current)); 00205 FENNEL_TRACE( 00206 TRACE_FINE, 00207 "leaving readStream " << stream.getName()); 00208 return false; 00209 } 00210 00211 ExecStreamBufAccessor &bufAccessor = 00212 graphImpl.getBufAccessorFromEdge(edge); 00213 00214 // Save the first edge with an empty state that we find, but don't 00215 // return that as the next consumer. We want to give priority to 00216 // streams that have explicity requested data. So, only return the 00217 // empty edge consumer if there are no consumers that have explicitly 00218 // requested data. 00219 if (bufAccessor.getState() == EXECBUF_EMPTY) { 00220 if (!emptyFound) { 00221 emptyFound = true; 00222 emptyEdge = edge; 00223 emptyStreamId = current; 00224 } 00225 continue; 00226 } 00227 00228 if (bufAccessor.getState() != skipState) { 00229 break; 00230 } 00231 assert(!(skipState == EXECBUF_UNDERFLOW && 00232 bufAccessor.getState() == EXECBUF_EOS)); 00233 } 00234 00235 if (outEdges.first == outEdges.second && emptyFound) { 00236 edge = emptyEdge; 00237 current = emptyStreamId; 00238 } else { 00239 assert(!(skipState == EXECBUF_UNDERFLOW && 00240 outEdges.first == outEdges.second)); 00241 } 00242 00243 return true; 00244 }
void DfsTreeExecStreamScheduler::addGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Adds a graph to be scheduled.
Some implementations may require all graphs to be added before scheduler is started; others may allow graphs to be added at any time.
pGraph | the graph to be scheduled |
Reimplemented from ExecStreamScheduler.
Definition at line 45 of file DfsTreeExecStreamScheduler.cpp.
References ExecStreamScheduler::addGraph(), and pGraph.
00046 { 00047 assert(!pGraph); 00048 00049 ExecStreamScheduler::addGraph(pGraphInit); 00050 pGraph = pGraphInit; 00051 }
void DfsTreeExecStreamScheduler::removeGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Removes a graph currently being scheduled.
Some implementations may disallow graph removal except when scheduler is stopped; others may disallow graph removal altogether.
pGraph | the graph currently being scheduled |
Reimplemented from ExecStreamScheduler.
Definition at line 53 of file DfsTreeExecStreamScheduler.cpp.
References pGraph, and ExecStreamScheduler::removeGraph().
00054 { 00055 assert(pGraph == pGraphInit); 00056 00057 pGraph.reset(); 00058 ExecStreamScheduler::removeGraph(pGraphInit); 00059 }
void DfsTreeExecStreamScheduler::start | ( | ) | [virtual] |
Starts this scheduler, preparing it to execute streams.
Implements ExecStreamScheduler.
Definition at line 61 of file DfsTreeExecStreamScheduler.cpp.
References aborted, pGraph, and TRACE_FINE.
00062 { 00063 FENNEL_TRACE(TRACE_FINE,"start"); 00064 00065 // TODO jvs 2-Jan-2006: rename this class now that it's no longer 00066 // restricted to trees; come up with something more generic in case 00067 // DFS becomes irrelevant also. 00068 00069 // note: we no longer check that graph is a tree (or forest of trees) 00070 // since it is now possible to have multiple consumers from a single 00071 // producer 00072 assert(pGraph->isAcyclic()); 00073 aborted = false; 00074 }
void DfsTreeExecStreamScheduler::setRunnable | ( | ExecStream & | stream, | |
bool | ||||
) | [virtual] |
Sets whether a specific stream should be considered for execution.
stream | the stream to make runnable |
Implements ExecStreamScheduler.
Definition at line 76 of file DfsTreeExecStreamScheduler.cpp.
void DfsTreeExecStreamScheduler::abort | ( | ExecStreamGraph & | graph | ) | [virtual] |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
Returns immediately, not waiting for abort request to be fully processed.
graph | graph to abort; must be one of the graphs associated with this scheduler |
Implements ExecStreamScheduler.
Definition at line 81 of file DfsTreeExecStreamScheduler.cpp.
References aborted, and TRACE_FINE.
00082 { 00083 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00084 00085 aborted = true; 00086 }
void DfsTreeExecStreamScheduler::checkAbort | ( | ) | const [virtual] |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
Reimplemented from ExecStreamScheduler.
Definition at line 88 of file DfsTreeExecStreamScheduler.cpp.
References aborted, and TRACE_FINE.
Referenced by readStream().
00089 { 00090 if (aborted) { 00091 FENNEL_TRACE(TRACE_FINE,"abort detected"); 00092 throw AbortExcn(); 00093 } 00094 }
void DfsTreeExecStreamScheduler::stop | ( | ) | [virtual] |
Shuts down this scheduler, preventing any further streams from being scheduled.
Implements ExecStreamScheduler.
Definition at line 96 of file DfsTreeExecStreamScheduler.cpp.
References aborted, and TRACE_FINE.
00097 { 00098 FENNEL_TRACE(TRACE_FINE,"stop"); 00099 00100 // nothing to do 00101 aborted = false; 00102 }
ExecStreamBufAccessor & DfsTreeExecStreamScheduler::readStream | ( | ExecStream & | stream | ) | [virtual] |
Reads data from a stream, first performing any scheduling necessary to make output available.
stream | the stream from which to read |
Implements ExecStreamScheduler.
Definition at line 104 of file DfsTreeExecStreamScheduler.cpp.
References checkAbort(), EXECBUF_EOS, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamScheduler::executeStream(), findNextConsumer(), ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), and TRACE_FINE.
00106 { 00107 FENNEL_TRACE( 00108 TRACE_FINE, 00109 "entering readStream " << stream.getName()); 00110 00111 ExecStreamId current = stream.getStreamId(); 00112 ExecStreamQuantum quantum; 00113 00114 ExecStreamGraphImpl &graphImpl = 00115 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00116 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00117 00118 // assert that we're reading from a designated output stream 00119 assert(boost::out_degree(current,graphRep) == 1); 00120 assert(!graphImpl.getStreamFromVertex( 00121 boost::target( 00122 *(boost::out_edges(current,graphRep).first), 00123 graphRep))); 00124 00125 // TODO: assertions about accessor state/provision 00126 00127 for (;;) { 00128 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00129 boost::in_edges(current,graphRep); 00130 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00131 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00132 ExecStreamBufAccessor &bufAccessor = 00133 graphImpl.getBufAccessorFromEdge(edge); 00134 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) { 00135 // move current upstream 00136 current = boost::source(edge,graphRep); 00137 break; 00138 } 00139 } 00140 if (inEdges.first != inEdges.second) { 00141 // hit EXECBUF_UNDERFLOW 00142 continue; 00143 } 00144 00145 SharedExecStream pStream = graphImpl.getStreamFromVertex(current); 00146 ExecStreamResult rc = executeStream(*pStream, quantum); 00147 00148 checkAbort(); 00149 00150 ExecStreamGraphImpl::Edge edge; 00151 00152 switch (rc) { 00153 case EXECRC_EOS: 00154 // find a consumer that is not in EOS state 00155 if (!findNextConsumer( 00156 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS)) 00157 { 00158 return graphImpl.getBufAccessorFromEdge(edge); 00159 } 00160 // if all were in eos, just use the last consumer 00161 break; 00162 case EXECRC_BUF_OVERFLOW: 00163 // find a consumer that is not in underflow state; i.e., not 00164 // waiting on this producer to continue execution 00165 if 
(!findNextConsumer( 00166 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW)) 00167 { 00168 return graphImpl.getBufAccessorFromEdge(edge); 00169 } 00170 break; 00171 case EXECRC_BUF_UNDERFLOW: 00172 // TODO: assert that at least one input is in state 00173 // EXECBUF_UNDERFLOW 00174 break; 00175 case EXECRC_QUANTUM_EXPIRED: 00176 break; 00177 default: 00178 permAssert(false); 00179 } 00180 } 00181 }
ExecStreamResult ExecStreamScheduler::executeStream | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [inline, protected, inherited] |
Executes one stream, performing tracing if enabled.
stream | stream to execute | |
quantum | quantum controlling stream execution |
Definition at line 243 of file ExecStreamScheduler.h.
References ExecStream::execute(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), and ExecStreamScheduler::tracingFine.
Referenced by readStream(), and ParallelExecStreamScheduler::tryExecuteTask().
00246 { 00247 if (tracingFine) { 00248 tracePreExecution(stream, quantum); 00249 ExecStreamResult rc = stream.execute(quantum); 00250 tracePostExecution(stream, rc); 00251 return rc; 00252 } else { 00253 return stream.execute(quantum); 00254 } 00255 }
void ExecStreamScheduler::tracePreExecution | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [protected, virtual, inherited] |
Traces before execution of a stream.
stream | stream about to be executed | |
quantum | quantum controlling stream execution |
Definition at line 112 of file ExecStreamScheduler.cpp.
References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().
Referenced by ExecStreamScheduler::executeStream().
00115 { 00116 FENNEL_TRACE( 00117 TRACE_FINE, 00118 "executing " << stream.getStreamId() << ' ' << stream.getName()); 00119 if (!isMAXU(quantum.nTuplesMax)) { 00120 FENNEL_TRACE( 00121 TRACE_FINE, 00122 "nTuplesMax = " << quantum.nTuplesMax); 00123 } 00124 00125 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST); 00126 }
void ExecStreamScheduler::tracePostExecution | ( | ExecStream & | stream, | |
ExecStreamResult | rc | |||
) | [protected, virtual, inherited] |
Traces after execution of a stream.
stream | stream which was just executed | |
rc | result code returned by stream |
Definition at line 128 of file ExecStreamScheduler.cpp.
References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and ExecStreamScheduler::traceStreamBuffers().
Referenced by ExecStreamScheduler::executeStream().
00131 { 00132 FENNEL_TRACE( 00133 TRACE_FINE, 00134 "executed " << stream.getStreamId() << ' ' << stream.getName() 00135 << " with result " << ExecStreamResult_names[rc]); 00136 00137 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER); 00138 }
void ExecStreamScheduler::traceStreamBuffers | ( | ExecStream & | stream, | |
TraceLevel | inputTupleTraceLevel, | |||
TraceLevel | outputTupleTraceLevel | |||
) | [protected, virtual, inherited] |
Traces the states of the input and output buffers adjacent to a stream.
stream | stream whose buffers are to be traced | |
inputTupleTraceLevel | trace level at which tuple contents of input buffers are to be traced | |
outputTupleTraceLevel | trace level at which tuple contents of output buffers are to be traced |
Definition at line 140 of file ExecStreamScheduler.cpp.
References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().
Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }
void ExecStreamScheduler::traceStreamBufferContents | ( | ExecStream & | stream, | |
ExecStreamBufAccessor & | bufAccessor, | |||
TraceLevel | traceLevel | |||
) | [virtual, inherited] |
Traces the contents of a stream buffer.
stream | stream whose buffer is being traced | |
bufAccessor | accessor for stream buffer | |
traceLevel | level at which contents should be traced |
Definition at line 193 of file ExecStreamScheduler.cpp.
References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().
Referenced by ExecStreamScheduler::traceStreamBuffers().
00197 { 00198 TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc(); 00199 TupleData tupleData(tupleDesc); 00200 TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor(); 00201 00202 for (PConstBuffer pTuple = bufAccessor.getConsumptionStart(); 00203 pTuple != bufAccessor.getConsumptionEnd(); 00204 pTuple += tupleAccessor.getCurrentByteCount()) 00205 { 00206 tupleAccessor.setCurrentTupleBuf(pTuple); 00207 // while we're here, we might as well sanity-check the content 00208 assert(pTuple + tupleAccessor.getCurrentByteCount() 00209 <= bufAccessor.getConsumptionEnd()); 00210 tupleAccessor.unmarshal(tupleData); 00211 // TODO: sanity-check individual data values? 00212 std::ostringstream oss; 00213 TuplePrinter tuplePrinter; 00214 tuplePrinter.print(oss,tupleDesc,tupleData); 00215 stream.trace(traceLevel,oss.str()); 00216 } 00217 }
void ExecStreamScheduler::makeRunnable | ( | ExecStream & | stream | ) | [inline, inherited] |
Requests that a specific stream be considered for execution.
stream | the stream to make runnable |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 257 of file ExecStreamScheduler.h.
References ExecStreamScheduler::setRunnable().
00259 { 00260 setRunnable(stream, true); 00261 }
SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor | ( | ) | [virtual, inherited] |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
Definition at line 78 of file ExecStreamScheduler.cpp.
Referenced by ExecStreamGraphImpl::prepare().
00079 { 00080 return SharedExecStreamBufAccessor(new ExecStreamBufAccessor()); 00081 }
void ExecStreamScheduler::createBufferProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual, inherited] |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
Default implementation is ScratchBufferExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 83 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00085 { 00086 ScratchBufferExecStreamParams adapterParams; 00087 embryo.init( 00088 new ScratchBufferExecStream(), 00089 adapterParams); 00090 }
void ExecStreamScheduler::createCopyProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual, inherited] |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Definition at line 92 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00094 { 00095 CopyExecStreamParams adapterParams; 00096 embryo.init( 00097 new CopyExecStream(), 00098 adapterParams); 00099 }
uint ExecStreamScheduler::getDegreeOfParallelism | ( | ) | [virtual, inherited] |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 223 of file ExecStreamScheduler.cpp.
Referenced by MergeExecStream::open().
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
volatile bool DfsTreeExecStreamScheduler::aborted [private] |
Definition at line 45 of file DfsTreeExecStreamScheduler.h.
Referenced by abort(), checkAbort(), start(), and stop().
SharedExecStreamGraph DfsTreeExecStreamScheduler::pGraph [private] |
Definition at line 47 of file DfsTreeExecStreamScheduler.h.
Referenced by addGraph(), removeGraph(), and start().
bool ExecStreamScheduler::tracingFine [protected, inherited] |
Definition at line 47 of file ExecStreamScheduler.h.
Referenced by ExecStreamScheduler::addGraph(), ExecStreamScheduler::ExecStreamScheduler(), and ExecStreamScheduler::executeStream().