00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamScheduler.h"
00026 #include "fennel/exec/ExecStreamGraphImpl.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/CopyExecStream.h"
00029 #include "fennel/exec/ScratchBufferExecStream.h"
00030 #include "fennel/exec/ExecStreamEmbryo.h"
00031 #include "fennel/tuple/TupleData.h"
00032 #include "fennel/tuple/TuplePrinter.h"
00033
00034 #include <fstream>
00035
00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $");
00037
00038 ExecStreamScheduler::ExecStreamScheduler(
00039 SharedTraceTarget pTraceTargetInit,
00040 std::string nameInit)
00041 : TraceSource(pTraceTargetInit, nameInit)
00042 {
00043 tracingFine = isTracingLevel(TRACE_FINE);
00044 }
00045
00046 ExecStreamScheduler::~ExecStreamScheduler()
00047 {
00048 }
00049
00050 void ExecStreamScheduler::addGraph(SharedExecStreamGraph pGraph)
00051 {
00052 assert(!pGraph->pScheduler);
00053 pGraph->pScheduler = this;
00054
00055 if (tracingFine) {
00056 std::string dotFileName;
00057 const char *fennelHome = getenv("FENNEL_HOME");
00058 if (fennelHome) {
00059 dotFileName += fennelHome;
00060 dotFileName += "/trace/";
00061 }
00062 dotFileName += "ExecStreamGraph.dot";
00063 std::ofstream dotStream(dotFileName.c_str());
00064 pGraph->renderGraphviz(dotStream);
00065 }
00066
00067
00068
00069 std::vector<SharedExecStream> streams = pGraph->getSortedStreams();
00070 for (uint i = 0; i < streams.size(); ++i) {
00071 if (streams[i]->isTracingLevel(TRACE_FINE)) {
00072 tracingFine = true;
00073 return;
00074 }
00075 }
00076 }
00077
00078 SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor()
00079 {
00080 return SharedExecStreamBufAccessor(new ExecStreamBufAccessor());
00081 }
00082
00083 void ExecStreamScheduler::createBufferProvisionAdapter(
00084 ExecStreamEmbryo &embryo)
00085 {
00086 ScratchBufferExecStreamParams adapterParams;
00087 embryo.init(
00088 new ScratchBufferExecStream(),
00089 adapterParams);
00090 }
00091
00092 void ExecStreamScheduler::createCopyProvisionAdapter(
00093 ExecStreamEmbryo &embryo)
00094 {
00095 CopyExecStreamParams adapterParams;
00096 embryo.init(
00097 new CopyExecStream(),
00098 adapterParams);
00099 }
00100
00101 void ExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraph)
00102 {
00103 assert(pGraph->pScheduler == this);
00104 pGraph->pScheduler = NULL;
00105 }
00106
00107
00108
00109
00110
00111
00112 void ExecStreamScheduler::tracePreExecution(
00113 ExecStream &stream,
00114 ExecStreamQuantum const &quantum)
00115 {
00116 FENNEL_TRACE(
00117 TRACE_FINE,
00118 "executing " << stream.getStreamId() << ' ' << stream.getName());
00119 if (!isMAXU(quantum.nTuplesMax)) {
00120 FENNEL_TRACE(
00121 TRACE_FINE,
00122 "nTuplesMax = " << quantum.nTuplesMax);
00123 }
00124
00125 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST);
00126 }
00127
00128 void ExecStreamScheduler::tracePostExecution(
00129 ExecStream &stream,
00130 ExecStreamResult rc)
00131 {
00132 FENNEL_TRACE(
00133 TRACE_FINE,
00134 "executed " << stream.getStreamId() << ' ' << stream.getName()
00135 << " with result " << ExecStreamResult_names[rc]);
00136
00137 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER);
00138 }
00139
00140 void ExecStreamScheduler::traceStreamBuffers(
00141 ExecStream &stream,
00142 TraceLevel inputTupleTraceLevel,
00143 TraceLevel outputTupleTraceLevel)
00144 {
00145 ExecStreamGraphImpl &graphImpl =
00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph());
00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00148
00149 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00150 boost::in_edges(stream.getStreamId(),graphRep);
00151 for (uint i = 0; inEdges.first != inEdges.second;
00152 ++(inEdges.first), ++i)
00153 {
00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00155 ExecStreamBufAccessor &bufAccessor =
00156 graphImpl.getBufAccessorFromEdge(edge);
00157 FENNEL_TRACE(
00158 TRACE_FINER,
00159 "input buffer " << i << ": "
00160 << ExecStreamBufState_names[bufAccessor.getState()]
00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00162 << ", consumption available = "
00163 << bufAccessor.getConsumptionAvailable());
00164 if (stream.isTracingLevel(inputTupleTraceLevel)) {
00165 traceStreamBufferContents(
00166 stream, bufAccessor, inputTupleTraceLevel);
00167 }
00168 }
00169
00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00171 boost::out_edges(stream.getStreamId(),graphRep);
00172 for (uint i = 0; outEdges.first != outEdges.second;
00173 ++(outEdges.first), ++i) {
00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00175 ExecStreamBufAccessor &bufAccessor =
00176 graphImpl.getBufAccessorFromEdge(edge);
00177 FENNEL_TRACE(
00178 TRACE_FINER,
00179 "output buffer " << i << ": "
00180 << ExecStreamBufState_names[bufAccessor.getState()]
00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00182 << ", consumption available = "
00183 << bufAccessor.getConsumptionAvailable()
00184 << ", production available = "
00185 << bufAccessor.getProductionAvailable());
00186 if (stream.isTracingLevel(outputTupleTraceLevel)) {
00187 traceStreamBufferContents(
00188 stream, bufAccessor, outputTupleTraceLevel);
00189 }
00190 }
00191 }
00192
00193 void ExecStreamScheduler::traceStreamBufferContents(
00194 ExecStream &stream,
00195 ExecStreamBufAccessor &bufAccessor,
00196 TraceLevel traceLevel)
00197 {
00198 TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc();
00199 TupleData tupleData(tupleDesc);
00200 TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor();
00201
00202 for (PConstBuffer pTuple = bufAccessor.getConsumptionStart();
00203 pTuple != bufAccessor.getConsumptionEnd();
00204 pTuple += tupleAccessor.getCurrentByteCount())
00205 {
00206 tupleAccessor.setCurrentTupleBuf(pTuple);
00207
00208 assert(pTuple + tupleAccessor.getCurrentByteCount()
00209 <= bufAccessor.getConsumptionEnd());
00210 tupleAccessor.unmarshal(tupleData);
00211
00212 std::ostringstream oss;
00213 TuplePrinter tuplePrinter;
00214 tuplePrinter.print(oss,tupleDesc,tupleData);
00215 stream.trace(traceLevel,oss.str());
00216 }
00217 }
00218
00219 void ExecStreamScheduler::checkAbort() const
00220 {
00221 }
00222
00223 uint ExecStreamScheduler::getDegreeOfParallelism()
00224 {
00225 return 1;
00226 }
00227
00228 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $");
00229
00230