00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/DfsTreeExecStreamScheduler.h"
00026 #include "fennel/exec/ExecStreamGraphImpl.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamBufAccessor.h"
00029 #include "fennel/common/AbortExcn.h"
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $");
00032
00033 DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler(
00034 SharedTraceTarget pTraceTargetInit,
00035 std::string nameInit)
00036 : TraceSource(pTraceTargetInit, nameInit),
00037 ExecStreamScheduler(pTraceTargetInit, nameInit)
00038 {
00039 }
00040
00041 DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler()
00042 {
00043 }
00044
00045 void DfsTreeExecStreamScheduler::addGraph(SharedExecStreamGraph pGraphInit)
00046 {
00047 assert(!pGraph);
00048
00049 ExecStreamScheduler::addGraph(pGraphInit);
00050 pGraph = pGraphInit;
00051 }
00052
00053 void DfsTreeExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraphInit)
00054 {
00055 assert(pGraph == pGraphInit);
00056
00057 pGraph.reset();
00058 ExecStreamScheduler::removeGraph(pGraphInit);
00059 }
00060
00061 void DfsTreeExecStreamScheduler::start()
00062 {
00063 FENNEL_TRACE(TRACE_FINE,"start");
00064
00065
00066
00067
00068
00069
00070
00071
00072 assert(pGraph->isAcyclic());
00073 aborted = false;
00074 }
00075
00076 void DfsTreeExecStreamScheduler::setRunnable(ExecStream &, bool)
00077 {
00078 permAssert(false);
00079 }
00080
00081 void DfsTreeExecStreamScheduler::abort(ExecStreamGraph &)
00082 {
00083 FENNEL_TRACE(TRACE_FINE,"abort requested");
00084
00085 aborted = true;
00086 }
00087
00088 void DfsTreeExecStreamScheduler::checkAbort() const
00089 {
00090 if (aborted) {
00091 FENNEL_TRACE(TRACE_FINE,"abort detected");
00092 throw AbortExcn();
00093 }
00094 }
00095
00096 void DfsTreeExecStreamScheduler::stop()
00097 {
00098 FENNEL_TRACE(TRACE_FINE,"stop");
00099
00100
00101 aborted = false;
00102 }
00103
00104 ExecStreamBufAccessor &DfsTreeExecStreamScheduler::readStream(
00105 ExecStream &stream)
00106 {
00107 FENNEL_TRACE(
00108 TRACE_FINE,
00109 "entering readStream " << stream.getName());
00110
00111 ExecStreamId current = stream.getStreamId();
00112 ExecStreamQuantum quantum;
00113
00114 ExecStreamGraphImpl &graphImpl =
00115 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00116 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00117
00118
00119 assert(boost::out_degree(current,graphRep) == 1);
00120 assert(!graphImpl.getStreamFromVertex(
00121 boost::target(
00122 *(boost::out_edges(current,graphRep).first),
00123 graphRep)));
00124
00125
00126
00127 for (;;) {
00128 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00129 boost::in_edges(current,graphRep);
00130 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00131 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00132 ExecStreamBufAccessor &bufAccessor =
00133 graphImpl.getBufAccessorFromEdge(edge);
00134 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00135
00136 current = boost::source(edge,graphRep);
00137 break;
00138 }
00139 }
00140 if (inEdges.first != inEdges.second) {
00141
00142 continue;
00143 }
00144
00145 SharedExecStream pStream = graphImpl.getStreamFromVertex(current);
00146 ExecStreamResult rc = executeStream(*pStream, quantum);
00147
00148 checkAbort();
00149
00150 ExecStreamGraphImpl::Edge edge;
00151
00152 switch (rc) {
00153 case EXECRC_EOS:
00154
00155 if (!findNextConsumer(
00156 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS))
00157 {
00158 return graphImpl.getBufAccessorFromEdge(edge);
00159 }
00160
00161 break;
00162 case EXECRC_BUF_OVERFLOW:
00163
00164
00165 if (!findNextConsumer(
00166 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW))
00167 {
00168 return graphImpl.getBufAccessorFromEdge(edge);
00169 }
00170 break;
00171 case EXECRC_BUF_UNDERFLOW:
00172
00173
00174 break;
00175 case EXECRC_QUANTUM_EXPIRED:
00176 break;
00177 default:
00178 permAssert(false);
00179 }
00180 }
00181 }
00182
00183 bool DfsTreeExecStreamScheduler::findNextConsumer(
00184 ExecStreamGraphImpl &graphImpl,
00185 const ExecStreamGraphImpl::GraphRep &graphRep,
00186 const ExecStream &stream,
00187 ExecStreamGraphImpl::Edge &edge,
00188 ExecStreamId ¤t,
00189 ExecStreamBufState skipState)
00190 {
00191 ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00192 boost::out_edges(current,graphRep);
00193
00194 bool emptyFound = false;
00195
00196 ExecStreamGraphImpl::Edge emptyEdge = edge;
00197 ExecStreamId emptyStreamId = current;
00198
00199 for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00200 edge = *(outEdges.first);
00201 current = boost::target(edge,graphRep);
00202 if (boost::out_degree(current,graphRep) == 0) {
00203
00204 assert(!graphImpl.getStreamFromVertex(current));
00205 FENNEL_TRACE(
00206 TRACE_FINE,
00207 "leaving readStream " << stream.getName());
00208 return false;
00209 }
00210
00211 ExecStreamBufAccessor &bufAccessor =
00212 graphImpl.getBufAccessorFromEdge(edge);
00213
00214
00215
00216
00217
00218
00219 if (bufAccessor.getState() == EXECBUF_EMPTY) {
00220 if (!emptyFound) {
00221 emptyFound = true;
00222 emptyEdge = edge;
00223 emptyStreamId = current;
00224 }
00225 continue;
00226 }
00227
00228 if (bufAccessor.getState() != skipState) {
00229 break;
00230 }
00231 assert(!(skipState == EXECBUF_UNDERFLOW &&
00232 bufAccessor.getState() == EXECBUF_EOS));
00233 }
00234
00235 if (outEdges.first == outEdges.second && emptyFound) {
00236 edge = emptyEdge;
00237 current = emptyStreamId;
00238 } else {
00239 assert(!(skipState == EXECBUF_UNDERFLOW &&
00240 outEdges.first == outEdges.second));
00241 }
00242
00243 return true;
00244 }
00245
00246 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $");
00247
00248