00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamGraphImpl.h"
00026 #include "fennel/exec/ExecStream.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/ExecStreamScheduler.h"
00029 #include "fennel/exec/DynamicParam.h"
00030 #include "fennel/exec/ExecStreamGovernor.h"
00031 #include "fennel/segment/Segment.h"
00032 #include "fennel/exec/ScratchBufferExecStream.h"
00033 #include "fennel/common/Backtrace.h"
00034 #include "fennel/txn/LogicalTxn.h"
00035
00036 #include <boost/bind.hpp>
00037 #include <boost/graph/strong_components.hpp>
00038 #include <boost/graph/topological_sort.hpp>
00039 #include <boost/graph/graphviz.hpp>
00040
00041 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraph.cpp#34 $");
00042
00043 SharedExecStreamGraph ExecStreamGraph::newExecStreamGraph()
00044 {
00045 return SharedExecStreamGraph(
00046 new ExecStreamGraphImpl(),
00047 ClosableObjectDestructor());
00048 }
00049
00050 ExecStreamGraph::ExecStreamGraph()
00051 : pScheduler(NULL),
00052 pDynamicParamManager(new DynamicParamManager())
00053 {
00054 }
00055
00056 ExecStreamGraph::~ExecStreamGraph()
00057 {
00058 }
00059
00060 ExecStreamGraphImpl::ExecStreamGraphImpl()
00061 : filteredGraph(
00062 graphRep,
00063 boost::get(boost::edge_weight, graphRep))
00064 {
00065 isPrepared = false;
00066 isOpen = false;
00067 doDataflowClose = false;
00068 allowDummyTxnId = false;
00069 }
00070
00071 void ExecStreamGraphImpl::setTxn(SharedLogicalTxn pTxnInit)
00072 {
00073 pTxn = pTxnInit;
00074 }
00075
00076 void ExecStreamGraphImpl::setErrorTarget(SharedErrorTarget pErrorTargetInit)
00077 {
00078 pErrorTarget = pErrorTargetInit;
00079 }
00080
00081 void ExecStreamGraphImpl::setScratchSegment(
00082 SharedSegment pScratchSegmentInit)
00083 {
00084 pScratchSegment = pScratchSegmentInit;
00085 }
00086
00087 void ExecStreamGraphImpl::setResourceGovernor(
00088 SharedExecStreamGovernor pResourceGovernorInit)
00089 {
00090 pResourceGovernor = pResourceGovernorInit;
00091 }
00092
00093 SharedLogicalTxn ExecStreamGraphImpl::getTxn()
00094 {
00095 return pTxn;
00096 }
00097
00098 TxnId ExecStreamGraphImpl::getTxnId()
00099 {
00100 if (pTxn) {
00101 return pTxn->getTxnId();
00102 }
00103 assert(allowDummyTxnId);
00104 return FIRST_TXN_ID;
00105 }
00106
00107 void ExecStreamGraphImpl::enableDummyTxnId(bool enabled)
00108 {
00109 allowDummyTxnId = enabled;
00110 }
00111
00112 SharedExecStreamGovernor ExecStreamGraphImpl::getResourceGovernor()
00113 {
00114 return pResourceGovernor;
00115 }
00116
00117 ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::newVertex()
00118 {
00119 if (freeVertices.size() > 0) {
00120 Vertex ret = freeVertices.back();
00121 freeVertices.pop_back();
00122 return ret;
00123 }
00124 return boost::add_vertex(graphRep);
00125 }
00126
00127 void ExecStreamGraphImpl::freeVertex(Vertex v)
00128 {
00129 boost::clear_vertex(v, graphRep);
00130 boost::get(boost::vertex_data, graphRep)[v].reset();
00131 freeVertices.push_back(v);
00132 }
00133
00134 int ExecStreamGraphImpl::getStreamCount()
00135 {
00136 return boost::num_vertices(graphRep) - freeVertices.size();
00137 }
00138
00139 ExecStreamGraphImpl::Vertex
00140 ExecStreamGraphImpl::addVertex(SharedExecStream pStream)
00141 {
00142 Vertex v = newVertex();
00143 boost::put(boost::vertex_data, graphRep, v, pStream);
00144 if (pStream) {
00145
00146
00147 const std::string& name = pStream->getName();
00148 if (name.length() == 0) {
00149 permFail("cannot add nameless stream to graph " << this);
00150 }
00151 if (findStream(name)) {
00152 permFail("cannot add stream " << name << " to graph " << this);
00153 }
00154 pStream->id = v;
00155 pStream->pGraph = this;
00156 streamMap[name] = pStream->getStreamId();
00157 }
00158 return v;
00159 }
00160
00161 void ExecStreamGraphImpl::addStream(
00162 SharedExecStream pStream)
00163 {
00164 (void) addVertex(pStream);
00165 }
00166
00167 void ExecStreamGraphImpl::removeStream(ExecStreamId id)
00168 {
00169 Vertex v = boost::vertices(graphRep).first[id];
00170 SharedExecStream pStream = getStreamFromVertex(v);
00171 permAssert(pStream->pGraph == this);
00172 permAssert(pStream->id == id);
00173
00174 streamMap.erase(pStream->getName());
00175 removeFromStreamOutMap(pStream);
00176 sortedStreams.clear();
00177 freeVertex(v);
00178
00179 pStream->pGraph = 0;
00180 pStream->id = 0;
00181 }
00182
00183 void ExecStreamGraphImpl::removeFromStreamOutMap(SharedExecStream p)
00184 {
00185 int outCt = getOutputCount(p->getStreamId());
00186 if (outCt > 0) {
00187 std::string name = p->getName();
00188
00189
00190 EdgeMap::iterator startNameRange =
00191 streamOutMap.find(std::make_pair(name, 0));
00192 EdgeMap::iterator endNameRange =
00193 streamOutMap.find(std::make_pair(name, outCt - 1));
00194 streamOutMap.erase(startNameRange, endNameRange);
00195 }
00196 }
00197
00198
00199
00200
00201 void ExecStreamGraphImpl::clear()
00202 {
00203 FgVertexIterPair verts = boost::vertices(graphRep);
00204 while (verts.first != verts.second) {
00205 Vertex v = *verts.first;
00206 freeVertex(v);
00207 ++verts.first;
00208 }
00209
00210 streamMap.clear();
00211 streamOutMap.clear();
00212 sortedStreams.clear();
00213 needsClose = isOpen = isPrepared = false;
00214 }
00215
00216 void ExecStreamGraphImpl::addDataflow(
00217 ExecStreamId producerId,
00218 ExecStreamId consumerId,
00219 bool isImplicit)
00220 {
00221 Edge newEdge =
00222 boost::add_edge(producerId, consumerId, graphRep).first;
00223 boost::put(
00224 boost::edge_weight,
00225 graphRep,
00226 newEdge,
00227 isImplicit ? 0 : 1);
00228 }
00229
00230 void ExecStreamGraphImpl::addOutputDataflow(
00231 ExecStreamId producerId)
00232 {
00233 Vertex consumerId = newVertex();
00234 Edge newEdge =
00235 boost::add_edge(producerId, consumerId, graphRep).first;
00236 boost::put(
00237 boost::edge_weight,
00238 graphRep,
00239 newEdge,
00240 1);
00241 }
00242
00243 void ExecStreamGraphImpl::addInputDataflow(
00244 ExecStreamId consumerId)
00245 {
00246 Vertex producerId = newVertex();
00247 Edge newEdge =
00248 boost::add_edge(producerId, consumerId, graphRep).first;
00249 boost::put(
00250 boost::edge_weight,
00251 graphRep,
00252 newEdge,
00253 1);
00254 }
00255
00256
00257 int ExecStreamGraphImpl::getDataflowCount()
00258 {
00259 return boost::num_edges(graphRep);
00260 }
00261
00262 void ExecStreamGraphImpl::mergeFrom(ExecStreamGraph& src)
00263 {
00264 if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00265 mergeFrom(*p);
00266 return;
00267 }
00268 permFail("unknown subtype of ExecStreamGraph");
00269 }
00270
00271 void ExecStreamGraphImpl::mergeFrom(
00272 ExecStreamGraph& src,
00273 std::vector<ExecStreamId> const& nodes)
00274 {
00275 if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00276 mergeFrom(*p, nodes);
00277 return;
00278 }
00279 permFail("unknown subtype of ExecStreamGraph");
00280 }
00281
00282 void ExecStreamGraphImpl::mergeFrom(ExecStreamGraphImpl& src)
00283 {
00284
00285
00286 permAssert(isPrepared && src.isPrepared);
00287 permAssert(isOpen == src.isOpen);
00288
00289
00290 std::map<Vertex, Vertex> vmap;
00291
00292
00293 FgVertexIterPair verts = boost::vertices(src.graphRep);
00294 for (; verts.first != verts.second; ++verts.first) {
00295 Vertex vsrc = *verts.first;
00296 SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00297 Vertex vnew = addVertex(pStream);
00298 vmap[vsrc] = vnew;
00299 }
00300
00301
00302
00303 FgEdgeIterPair edges = boost::edges(src.graphRep);
00304 for (; edges.first != edges.second; ++edges.first) {
00305 Edge esrc = *edges.first;
00306 SharedExecStreamBufAccessor pBuf =
00307 src.getSharedBufAccessorFromEdge(esrc);
00308 std::pair<Edge, bool> x = boost::add_edge(
00309 vmap[boost::source(esrc, src.graphRep)],
00310 vmap[boost::target(esrc, src.graphRep)],
00311 pBuf,
00312 graphRep);
00313 boost::put(
00314 boost::edge_weight,
00315 graphRep,
00316 x.first,
00317 boost::get(boost::edge_weight, src.graphRep, esrc));
00318 assert(x.second);
00319 }
00320 src.clear();
00321 sortedStreams.clear();
00322 }
00323
00324
00325 void ExecStreamGraphImpl::mergeFrom(
00326 ExecStreamGraphImpl& src,
00327 std::vector<ExecStreamId> const& nodes)
00328 {
00329
00330 permAssert(isPrepared && src.isPrepared);
00331 permAssert(isOpen == src.isOpen);
00332
00333
00334 std::map<Vertex, Vertex> vmap;
00335
00336
00337 int nnodes = nodes.size();
00338 for (int i = 0; i < nnodes; i++) {
00339 Vertex vsrc = boost::vertices(src.graphRep).first[nodes[i]];
00340 SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00341 Vertex vnew = addVertex(pStream);
00342 vmap[vsrc] = vnew;
00343 }
00344
00345
00346
00347
00348 if (nnodes > 1) {
00349 for (int i = 0; i < nnodes; i++) {
00350
00351 Vertex u = boost::vertices(src.graphRep).first[nodes[i]];
00352 for (FgOutEdgeIterPair edges = boost::out_edges(u, src.graphRep);
00353 edges.first != edges.second;
00354 ++edges.first)
00355 {
00356
00357 Edge e = *edges.first;
00358 assert(u == boost::source(e, src.graphRep));
00359 Vertex v = boost::target(e, src.graphRep);
00360
00361 if (vmap.find(v) != vmap.end()) {
00362 SharedExecStreamBufAccessor pBuf =
00363 src.getSharedBufAccessorFromEdge(e);
00364 std::pair<Edge, bool> x =
00365 boost::add_edge(
00366 vmap[u],
00367 vmap[v],
00368 pBuf,
00369 graphRep);
00370 assert(x.second);
00371 boost::put(
00372 boost::edge_weight,
00373 graphRep,
00374 x.first,
00375 boost::get(boost::edge_weight, src.graphRep, e));
00376 }
00377 }
00378 }
00379 }
00380
00381
00382 for (int i = 0; i < nnodes; i++) {
00383 Vertex v = boost::vertices(src.graphRep).first[nodes[i]];
00384 SharedExecStream pStream = src.getStreamFromVertex(v);
00385 src.streamMap.erase(pStream->getName());
00386 src.removeFromStreamOutMap(pStream);
00387 src.freeVertex(v);
00388 }
00389 src.sortedStreams.clear();
00390 sortedStreams.clear();
00391 }
00392
00393 SharedExecStream ExecStreamGraphImpl::findStream(
00394 std::string name)
00395 {
00396 StreamMapConstIter pPair = streamMap.find(name);
00397 if (pPair == streamMap.end()) {
00398 SharedExecStream nullStream;
00399 return nullStream;
00400 } else {
00401 return getStreamFromVertex(pPair->second);
00402 }
00403 }
00404
00405 SharedExecStream ExecStreamGraphImpl::findLastStream(
00406 std::string name,
00407 uint iOutput)
00408 {
00409 EdgeMap::const_iterator pPair =
00410 streamOutMap.find(std::make_pair(name, iOutput));
00411 if (pPair == streamOutMap.end()) {
00412 return findStream(name);
00413 } else {
00414 return getStreamFromVertex(pPair->second);
00415 }
00416 }
00417
00418 void ExecStreamGraphImpl::interposeStream(
00419 std::string name,
00420 uint iOutput,
00421 ExecStreamId interposedId)
00422 {
00423 SharedExecStream pLastStream = findLastStream(name, iOutput);
00424 permAssert(pLastStream.get());
00425 streamOutMap[std::make_pair(name, iOutput)] = interposedId;
00426 addDataflow(
00427 pLastStream->getStreamId(),
00428 interposedId,
00429 false);
00430 }
00431
00432 void ExecStreamGraphImpl::sortStreams()
00433 {
00434 std::vector<Vertex> sortedVertices;
00435 boost::topological_sort(
00436 graphRep,std::back_inserter(sortedVertices));
00437 sortedStreams.resize(sortedVertices.size());
00438
00439
00440
00441
00442 std::transform(
00443 sortedVertices.begin(),
00444 sortedVertices.end(),
00445 sortedStreams.rbegin(),
00446 boost::bind(&ExecStreamGraphImpl::getStreamFromVertex,this,_1));
00447
00448
00449 sortedStreams.erase(
00450 std::remove(
00451 sortedStreams.begin(),sortedStreams.end(),SharedExecStream()),
00452 sortedStreams.end());
00453 }
00454
00455 void ExecStreamGraphImpl::prepare(ExecStreamScheduler &scheduler)
00456 {
00457 isPrepared = true;
00458 sortStreams();
00459
00460
00461 EdgeIterPair edges = boost::edges(filteredGraph);
00462 for (; edges.first != edges.second; edges.first++) {
00463 SharedExecStreamBufAccessor pBufAccessor = scheduler.newBufAccessor();
00464 boost::put(boost::edge_data,graphRep,*(edges.first),pBufAccessor);
00465 }
00466
00467
00468 std::for_each(
00469 sortedStreams.begin(),
00470 sortedStreams.end(),
00471 boost::bind(
00472 &ExecStreamGraphImpl::bindStreamBufAccessors,this,_1));
00473 }
00474
00475 void ExecStreamGraphImpl::bindStreamBufAccessors(SharedExecStream pStream)
00476 {
00477 std::vector<SharedExecStreamBufAccessor> bufAccessors;
00478
00479
00480 InEdgeIterPair inEdges = boost::in_edges(
00481 pStream->getStreamId(),filteredGraph);
00482 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00483 SharedExecStreamBufAccessor pBufAccessor =
00484 getSharedBufAccessorFromEdge(*(inEdges.first));
00485 bufAccessors.push_back(pBufAccessor);
00486 }
00487 pStream->setInputBufAccessors(bufAccessors);
00488 bufAccessors.clear();
00489
00490
00491 OutEdgeIterPair outEdges = boost::out_edges(
00492 pStream->getStreamId(),filteredGraph);
00493 for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00494 SharedExecStreamBufAccessor pBufAccessor =
00495 getSharedBufAccessorFromEdge(*(outEdges.first));
00496 bufAccessors.push_back(pBufAccessor);
00497 pBufAccessor->setProvision(pStream->getOutputBufProvision());
00498 }
00499 pStream->setOutputBufAccessors(bufAccessors);
00500 }
00501
00502 void ExecStreamGraphImpl::open()
00503 {
00504 permAssert(!isOpen);
00505 isOpen = true;
00506 needsClose = true;
00507
00508
00509 EdgeIterPair edges = boost::edges(filteredGraph);
00510 for (; edges.first != edges.second; edges.first++) {
00511 ExecStreamBufAccessor &bufAccessor =
00512 getBufAccessorFromEdge(*(edges.first));
00513 bufAccessor.clear();
00514 }
00515
00516
00517 if (sortedStreams.empty()) {
00518
00519 sortStreams();
00520 }
00521 std::for_each(
00522 sortedStreams.begin(),
00523 sortedStreams.end(),
00524 boost::bind(&ExecStreamGraphImpl::openStream,this,_1));
00525 }
00526
00527 void ExecStreamGraphImpl::openStream(SharedExecStream pStream)
00528 {
00529 if (pErrorTarget) {
00530 pStream->initErrorSource(pErrorTarget, pStream->getName());
00531 }
00532 pStream->open(false);
00533 }
00534
00535 void ExecStreamGraphImpl::closeImpl()
00536 {
00537 isOpen = false;
00538 if (sortedStreams.empty()) {
00539
00540 sortStreams();
00541 }
00542 if (doDataflowClose) {
00543 std::for_each(
00544 sortedStreams.begin(),
00545 sortedStreams.end(),
00546 boost::bind(&ClosableObject::close,_1));
00547 } else {
00548 std::for_each(
00549 sortedStreams.rbegin(),
00550 sortedStreams.rend(),
00551 boost::bind(&ClosableObject::close,_1));
00552 }
00553 pDynamicParamManager->deleteAllParams();
00554 SharedExecStreamGovernor pGov = getResourceGovernor();
00555 if (pGov) {
00556 pGov->returnResources(*this);
00557 }
00558 pTxn.reset();
00559
00560
00561 if (pScratchSegment) {
00562 pScratchSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID);
00563 }
00564 }
00565
00566 SharedExecStream ExecStreamGraphImpl::getStream(ExecStreamId id)
00567 {
00568 Vertex v = boost::vertices(graphRep).first[id];
00569 return getStreamFromVertex(v);
00570 }
00571
00572 uint ExecStreamGraphImpl::getInputCount(
00573 ExecStreamId streamId)
00574 {
00575 Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00576 return boost::in_degree(streamVertex,filteredGraph);
00577 }
00578
00579 uint ExecStreamGraphImpl::getOutputCount(
00580 ExecStreamId streamId)
00581 {
00582 Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00583 return boost::out_degree(streamVertex,filteredGraph);
00584 }
00585
00586 ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getInputEdge(
00587 ExecStreamId streamId,
00588 uint iInput)
00589 {
00590 Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00591 InEdgeIter pEdge = boost::in_edges(streamVertex,filteredGraph).first;
00592 for (int i = 0; i < iInput; ++i) {
00593 ++pEdge;
00594 }
00595 return *pEdge;
00596 }
00597
00598 SharedExecStream ExecStreamGraphImpl::getStreamInput(
00599 ExecStreamId streamId,
00600 uint iInput)
00601 {
00602 Edge inputEdge = getInputEdge(streamId, iInput);
00603 Vertex inputVertex = boost::source(inputEdge,graphRep);
00604 return getStreamFromVertex(inputVertex);
00605 }
00606
00607 SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamInputAccessor(
00608 ExecStreamId streamId,
00609 uint iInput)
00610 {
00611 Edge inputEdge = getInputEdge(streamId, iInput);
00612 return getSharedBufAccessorFromEdge(inputEdge);
00613 }
00614
00615 ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getOutputEdge(
00616 ExecStreamId streamId,
00617 uint iOutput)
00618 {
00619 Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00620 OutEdgeIter pEdge = boost::out_edges(streamVertex,filteredGraph).first;
00621 for (int i = 0; i < iOutput; ++i) {
00622 ++pEdge;
00623 }
00624 return *pEdge;
00625 }
00626
00627 SharedExecStream ExecStreamGraphImpl::getStreamOutput(
00628 ExecStreamId streamId,
00629 uint iOutput)
00630 {
00631 Edge outputEdge = getOutputEdge(streamId, iOutput);
00632 Vertex outputVertex = boost::target(outputEdge,graphRep);
00633 return getStreamFromVertex(outputVertex);
00634 }
00635
00636 SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamOutputAccessor(
00637 ExecStreamId streamId,
00638 uint iOutput)
00639 {
00640 Edge outputEdge = getOutputEdge(streamId, iOutput);
00641 return getSharedBufAccessorFromEdge(outputEdge);
00642 }
00643
00644 std::vector<SharedExecStream> ExecStreamGraphImpl::getSortedStreams()
00645 {
00646 permAssert(isPrepared);
00647 if (sortedStreams.empty()) {
00648 sortStreams();
00649 }
00650 return sortedStreams;
00651 }
00652
00653 bool ExecStreamGraphImpl::isAcyclic()
00654 {
00655 int numVertices = boost::num_vertices(graphRep);
00656
00657
00658
00659 std::vector<int> component(numVertices);
00660 int nStrongComps = boost::strong_components(graphRep, &component[0]);
00661 return (nStrongComps >= numVertices);
00662 }
00663
00664 class ExecStreamGraphImpl::DotGraphRenderer
00665 {
00666 public:
00667 void operator()(std::ostream &out) const
00668 {
00669 out << "graph [bgcolor=gray, rankdir=BT]" << std::endl;
00670 out << "node [shape=record, style=filled, "
00671 << "fillcolor=white, fontsize=10.0]" << std::endl;
00672 out << "edge [fontsize=10.0]" << std::endl;
00673 }
00674 };
00675
00676 class ExecStreamGraphImpl::DotEdgeRenderer
00677 {
00678 ExecStreamGraphImpl &graph;
00679 public:
00680 DotEdgeRenderer(ExecStreamGraphImpl &graphInit)
00681 : graph(graphInit)
00682 {
00683 }
00684
00685 void operator()(
00686 std::ostream &out, ExecStreamGraphImpl::Edge const &edge) const
00687 {
00688 SharedExecStreamBufAccessor pAccessor =
00689 graph.getSharedBufAccessorFromEdge(edge);
00690 int weight = boost::get(
00691 boost::edge_weight, graph.getFullGraphRep(), edge);
00692 out << "[label=\"";
00693 if (pAccessor) {
00694 out << ExecStreamBufState_names[pAccessor->getState()];
00695 }
00696 out << "\"";
00697 if (!weight) {
00698 out << "style=\"dotted\"";
00699 }
00700 out << "]";
00701 }
00702 };
00703
00704 class ExecStreamGraphImpl::DotVertexRenderer
00705 {
00706 ExecStreamGraphImpl &graph;
00707 public:
00708 DotVertexRenderer(ExecStreamGraphImpl &graphInit)
00709 : graph(graphInit)
00710 {
00711 }
00712
00713 void operator()(std::ostream &out, ExecStreamId const &streamId) const
00714 {
00715 SharedExecStream pStream = graph.getStream(streamId);
00716 out << "[label=\"{";
00717 if (pStream) {
00718 out << streamId;
00719 out << "|";
00720 if (dynamic_cast<ScratchBufferExecStream *>(pStream.get())) {
00721 out << "MEMBUF";
00722 } else {
00723 Backtrace::writeDemangled(out, typeid(*pStream).name());
00724 out << "|";
00725 out << pStream->getName();
00726 }
00727 } else {
00728 out << "SINK";
00729 }
00730 out << "}\"]";
00731 }
00732 };
00733
00734 void ExecStreamGraphImpl::renderGraphviz(std::ostream &dotStream)
00735 {
00736 boost::write_graphviz(
00737 dotStream,
00738 graphRep,
00739 DotVertexRenderer(*this),
00740 DotEdgeRenderer(*this),
00741 DotGraphRenderer());
00742 }
00743
00744 void ExecStreamGraphImpl::closeProducers(ExecStreamId streamId)
00745 {
00746 FgInEdgeIterPair inEdges =
00747 boost::in_edges(streamId, graphRep);
00748 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00749 Edge edge = *(inEdges.first);
00750
00751 streamId = boost::source(edge,graphRep);
00752
00753
00754 SharedExecStream pStream = getStreamFromVertex(streamId);
00755 if (!pStream->canEarlyClose()) {
00756 continue;
00757 }
00758 closeProducers(streamId);
00759 pStream->close();
00760 }
00761 }
00762
00763 void ExecStreamGraphImpl::declareDynamicParamWriter(
00764 ExecStreamId streamId,
00765 DynamicParamId dynamicParamId)
00766 {
00767 DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00768 info.writerStreamIds.push_back(streamId);
00769 }
00770
00771 void ExecStreamGraphImpl::declareDynamicParamReader(
00772 ExecStreamId streamId,
00773 DynamicParamId dynamicParamId)
00774 {
00775 DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00776 info.readerStreamIds.push_back(streamId);
00777 }
00778
00779 const std::vector<ExecStreamId> &ExecStreamGraphImpl::getDynamicParamWriters(
00780 DynamicParamId dynamicParamId)
00781 {
00782 DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00783 return info.writerStreamIds;
00784 }
00785
00786 const std::vector<ExecStreamId> &ExecStreamGraphImpl::getDynamicParamReaders(
00787 DynamicParamId dynamicParamId)
00788 {
00789 DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00790 return info.readerStreamIds;
00791 }
00792
00793 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraph.cpp#34 $");
00794
00795