#include <ExecStreamScheduler.h>
Inheritance diagram for ExecStreamScheduler:
Public Member Functions | |
virtual | ~ExecStreamScheduler () |
virtual void | traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel) |
Traces the contents of a stream buffer. | |
virtual void | addGraph (SharedExecStreamGraph pGraph) |
Adds a graph to be scheduled. | |
virtual void | removeGraph (SharedExecStreamGraph pGraph) |
Removes a graph currently being scheduled. | |
virtual void | start ()=0 |
Starts this scheduler, preparing it to execute streams. | |
void | makeRunnable (ExecStream &stream) |
Requests that a specific stream be considered for execution. | |
virtual void | setRunnable (ExecStream &stream, bool runnable)=0 |
Sets whether a specific stream should be considered for execution. | |
virtual void | abort (ExecStreamGraph &graph)=0 |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn. | |
virtual void | stop ()=0 |
Shuts down this scheduler, preventing any further streams from being scheduled. | |
virtual SharedExecStreamBufAccessor | newBufAccessor () |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler. | |
virtual void | createBufferProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER. | |
virtual void | createCopyProvisionAdapter (ExecStreamEmbryo &embryo) |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER. | |
virtual ExecStreamBufAccessor & | readStream (ExecStream &stream)=0 |
Reads data from a stream, first performing any scheduling necessary to make output available. | |
virtual uint | getDegreeOfParallelism () |
| |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
Protected Member Functions | |
ExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name) | |
Constructs a new ExecStreamScheduler. | |
ExecStreamResult | executeStream (ExecStream &stream, ExecStreamQuantum const &quantum) |
Executes one stream, performing tracing if enabled. | |
virtual void | tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum) |
Traces before execution of a stream. | |
virtual void | tracePostExecution (ExecStream &stream, ExecStreamResult rc) |
Traces after execution of a stream. | |
virtual void | traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel) |
Traces the states of the input and output buffers adjacent to a stream. | |
Protected Attributes | |
bool | tracingFine |
A scheduler determines which execution streams to run and in what order. For more information, see SchedulerDesign.
Definition at line 42 of file ExecStreamScheduler.h.
ExecStreamScheduler::ExecStreamScheduler | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [explicit, protected] |
Constructs a new ExecStreamScheduler.
pTraceTarget | the TraceTarget to which messages will be sent, or NULL to disable tracing entirely | |
name | the name to use for tracing this scheduler |
Definition at line 38 of file ExecStreamScheduler.cpp.
References TraceSource::isTracingLevel(), TRACE_FINE, and tracingFine.
00041 : TraceSource(pTraceTargetInit, nameInit) 00042 { 00043 tracingFine = isTracingLevel(TRACE_FINE); 00044 }
ExecStreamScheduler::~ExecStreamScheduler | ( | ) | [virtual] |
ExecStreamResult ExecStreamScheduler::executeStream | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [inline, protected] |
Executes one stream, performing tracing if enabled.
stream | stream to execute | |
quantum | quantum controlling stream execution |
Definition at line 243 of file ExecStreamScheduler.h.
References ExecStream::execute(), tracePostExecution(), tracePreExecution(), and tracingFine.
Referenced by DfsTreeExecStreamScheduler::readStream(), and ParallelExecStreamScheduler::tryExecuteTask().
00246 { 00247 if (tracingFine) { 00248 tracePreExecution(stream, quantum); 00249 ExecStreamResult rc = stream.execute(quantum); 00250 tracePostExecution(stream, rc); 00251 return rc; 00252 } else { 00253 return stream.execute(quantum); 00254 } 00255 }
void ExecStreamScheduler::tracePreExecution | ( | ExecStream & | stream, | |
ExecStreamQuantum const & | quantum | |||
) | [protected, virtual] |
Traces before execution of a stream.
stream | stream about to be executed | |
quantum | quantum controlling stream execution |
Definition at line 112 of file ExecStreamScheduler.cpp.
References ExecStream::getName(), ExecStream::getStreamId(), isMAXU(), ExecStreamQuantum::nTuplesMax, TRACE_FINE, TRACE_FINEST, and traceStreamBuffers().
Referenced by executeStream().
00115 { 00116 FENNEL_TRACE( 00117 TRACE_FINE, 00118 "executing " << stream.getStreamId() << ' ' << stream.getName()); 00119 if (!isMAXU(quantum.nTuplesMax)) { 00120 FENNEL_TRACE( 00121 TRACE_FINE, 00122 "nTuplesMax = " << quantum.nTuplesMax); 00123 } 00124 00125 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST); 00126 }
void ExecStreamScheduler::tracePostExecution | ( | ExecStream & | stream, | |
ExecStreamResult | rc | |||
) | [protected, virtual] |
Traces after execution of a stream.
stream | stream which was just executed | |
rc | result code returned by stream |
Definition at line 128 of file ExecStreamScheduler.cpp.
References ExecStreamResult_names, ExecStream::getName(), ExecStream::getStreamId(), TRACE_FINE, TRACE_FINER, TRACE_FINEST, and traceStreamBuffers().
Referenced by executeStream().
00131 { 00132 FENNEL_TRACE( 00133 TRACE_FINE, 00134 "executed " << stream.getStreamId() << ' ' << stream.getName() 00135 << " with result " << ExecStreamResult_names[rc]); 00136 00137 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER); 00138 }
void ExecStreamScheduler::traceStreamBuffers | ( | ExecStream & | stream, | |
TraceLevel | inputTupleTraceLevel, | |||
TraceLevel | outputTupleTraceLevel | |||
) | [protected, virtual] |
Traces the states of the input and output buffers adjacent to a stream.
stream | stream whose buffers are to be traced | |
inputTupleTraceLevel | trace level at which tuple contents of input buffers are to be traced | |
outputTupleTraceLevel | trace level at which tuple contents of output buffers are to be traced |
Definition at line 140 of file ExecStreamScheduler.cpp.
References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and traceStreamBufferContents().
Referenced by tracePostExecution(), and tracePreExecution().
00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }
void ExecStreamScheduler::traceStreamBufferContents | ( | ExecStream & | stream, | |
ExecStreamBufAccessor & | bufAccessor, | |||
TraceLevel | traceLevel | |||
) | [virtual] |
Traces the contents of a stream buffer.
stream | stream whose buffer is being traced | |
bufAccessor | accessor for stream buffer | |
traceLevel | level at which contents should be traced |
Definition at line 193 of file ExecStreamScheduler.cpp.
References ExecStreamBufAccessor::getConsumptionEnd(), ExecStreamBufAccessor::getConsumptionStart(), TupleAccessor::getCurrentByteCount(), ExecStreamBufAccessor::getScratchTupleAccessor(), ExecStreamBufAccessor::getTupleDesc(), TuplePrinter::print(), TupleAccessor::setCurrentTupleBuf(), TraceSource::trace(), and TupleAccessor::unmarshal().
Referenced by traceStreamBuffers().
00197 { 00198 TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc(); 00199 TupleData tupleData(tupleDesc); 00200 TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor(); 00201 00202 for (PConstBuffer pTuple = bufAccessor.getConsumptionStart(); 00203 pTuple != bufAccessor.getConsumptionEnd(); 00204 pTuple += tupleAccessor.getCurrentByteCount()) 00205 { 00206 tupleAccessor.setCurrentTupleBuf(pTuple); 00207 // while we're here, we might as well sanity-check the content 00208 assert(pTuple + tupleAccessor.getCurrentByteCount() 00209 <= bufAccessor.getConsumptionEnd()); 00210 tupleAccessor.unmarshal(tupleData); 00211 // TODO: sanity-check individual data values? 00212 std::ostringstream oss; 00213 TuplePrinter tuplePrinter; 00214 tuplePrinter.print(oss,tupleDesc,tupleData); 00215 stream.trace(traceLevel,oss.str()); 00216 } 00217 }
void ExecStreamScheduler::addGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Adds a graph to be scheduled.
Some implementations may require all graphs to be added before the scheduler is started; others may allow graphs to be added at any time.
pGraph | the graph to be scheduled |
Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
Definition at line 50 of file ExecStreamScheduler.cpp.
References TraceSource::isTracingLevel(), TRACE_FINE, and tracingFine.
Referenced by ParallelExecStreamScheduler::addGraph(), and DfsTreeExecStreamScheduler::addGraph().
00051 { 00052 assert(!pGraph->pScheduler); 00053 pGraph->pScheduler = this; 00054 00055 if (tracingFine) { 00056 std::string dotFileName; 00057 const char *fennelHome = getenv("FENNEL_HOME"); 00058 if (fennelHome) { 00059 dotFileName += fennelHome; 00060 dotFileName += "/trace/"; 00061 } 00062 dotFileName += "ExecStreamGraph.dot"; 00063 std::ofstream dotStream(dotFileName.c_str()); 00064 pGraph->renderGraphviz(dotStream); 00065 } 00066 00067 // if any of the streams in the new graph require tracing, then 00068 // disable our tracing short-circuit 00069 std::vector<SharedExecStream> streams = pGraph->getSortedStreams(); 00070 for (uint i = 0; i < streams.size(); ++i) { 00071 if (streams[i]->isTracingLevel(TRACE_FINE)) { 00072 tracingFine = true; 00073 return; 00074 } 00075 } 00076 }
void ExecStreamScheduler::removeGraph | ( | SharedExecStreamGraph | pGraph | ) | [virtual] |
Removes a graph currently being scheduled.
Some implementations may disallow graph removal except when the scheduler is stopped; others may disallow graph removal altogether.
pGraph | the graph currently being scheduled |
Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
Definition at line 101 of file ExecStreamScheduler.cpp.
Referenced by ParallelExecStreamScheduler::removeGraph(), and DfsTreeExecStreamScheduler::removeGraph().
virtual void ExecStreamScheduler::start | ( | ) | [pure virtual] |
Starts this scheduler, preparing it to execute streams.
Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
void ExecStreamScheduler::makeRunnable | ( | ExecStream & | stream | ) | [inline] |
Requests that a specific stream be considered for execution.
stream | the stream to make runnable |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 257 of file ExecStreamScheduler.h.
References setRunnable().
00259 { 00260 setRunnable(stream, true); 00261 }
virtual void ExecStreamScheduler::setRunnable | ( | ExecStream & | stream, | |
bool | runnable | |||
) | [pure virtual] |
Sets whether a specific stream should be considered for execution.
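stream | the stream whose runnable state is being set | |
runnable | true if the stream should be considered for execution, false if it should not |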
Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
Referenced by CorrelationJoinExecStream::execute(), makeRunnable(), and CorrelationJoinExecStream::open().
virtual void ExecStreamScheduler::abort | ( | ExecStreamGraph & | graph | ) | [pure virtual] |
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
Returns immediately, not waiting for abort request to be fully processed.
graph | graph to abort; must be one of the graphs associated with this scheduler |
Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
void ExecStreamScheduler::checkAbort | ( | ) | const [virtual] |
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
Reimplemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
Definition at line 219 of file ExecStreamScheduler.cpp.
Referenced by ExecStream::checkAbort().
virtual void ExecStreamScheduler::stop | ( | ) | [pure virtual] |
Shuts down this scheduler, preventing any further streams from being scheduled.
Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor | ( | ) | [virtual] |
Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
Definition at line 78 of file ExecStreamScheduler.cpp.
Referenced by ExecStreamGraphImpl::prepare().
00079 { 00080 return SharedExecStreamBufAccessor(new ExecStreamBufAccessor()); 00081 }
void ExecStreamScheduler::createBufferProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual] |
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
Default implementation is ScratchBufferExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 83 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00085 { 00086 ScratchBufferExecStreamParams adapterParams; 00087 embryo.init( 00088 new ScratchBufferExecStream(), 00089 adapterParams); 00090 }
void ExecStreamScheduler::createCopyProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual] |
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
embryo | receives new adapter stream |
Definition at line 92 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
00094 { 00095 CopyExecStreamParams adapterParams; 00096 embryo.init( 00097 new CopyExecStream(), 00098 adapterParams); 00099 }
virtual ExecStreamBufAccessor& ExecStreamScheduler::readStream | ( | ExecStream & | stream | ) | [pure virtual] |
Reads data from a stream, first performing any scheduling necessary to make output available.
stream | the stream from which to read |
Implemented in DfsTreeExecStreamScheduler, and ParallelExecStreamScheduler.
Referenced by Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch().
uint ExecStreamScheduler::getDegreeOfParallelism | ( | ) | [virtual] |
Reimplemented in ParallelExecStreamScheduler.
Definition at line 223 of file ExecStreamScheduler.cpp.
Referenced by MergeExecStream::open().
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
bool ExecStreamScheduler::tracingFine [protected] |
Definition at line 47 of file ExecStreamScheduler.h.
Referenced by addGraph(), ExecStreamScheduler(), and executeStream().