00001 /*
00002 // $Id: //open/dev/fennel/exec/ExecStreamGraph.cpp#34 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamGraphImpl.h"
00026 #include "fennel/exec/ExecStream.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/ExecStreamScheduler.h"
00029 #include "fennel/exec/DynamicParam.h"
00030 #include "fennel/exec/ExecStreamGovernor.h"
00031 #include "fennel/segment/Segment.h"
00032 #include "fennel/exec/ScratchBufferExecStream.h"
00033 #include "fennel/common/Backtrace.h"
00034 #include "fennel/txn/LogicalTxn.h"
00036 #include <boost/bind.hpp>
00037 #include <boost/graph/strong_components.hpp>
00038 #include <boost/graph/topological_sort.hpp>
00039 #include <boost/graph/graphviz.hpp>
00041 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraph.cpp#34 $");
00043 SharedExecStreamGraph ExecStreamGraph::newExecStreamGraph()
00044 {
00045     return SharedExecStreamGraph(
00046         new ExecStreamGraphImpl(),
00047         ClosableObjectDestructor());
00048 }
00050 ExecStreamGraph::ExecStreamGraph()
00051     : pScheduler(NULL),
00052       pDynamicParamManager(new DynamicParamManager())
00053 {
00054 }
00056 ExecStreamGraph::~ExecStreamGraph()
00057 {
00058 }
00060 ExecStreamGraphImpl::ExecStreamGraphImpl()
00061     : filteredGraph(
00062         graphRep,
00063         boost::get(boost::edge_weight, graphRep))
00064 {
00065     isPrepared = false;
00066     isOpen = false;
00067     doDataflowClose = false;
00068     allowDummyTxnId = false;
00069 }
00071 void ExecStreamGraphImpl::setTxn(SharedLogicalTxn pTxnInit)
00072 {
00073     pTxn = pTxnInit;
00074 }
00076 void ExecStreamGraphImpl::setErrorTarget(SharedErrorTarget pErrorTargetInit)
00077 {
00078     pErrorTarget = pErrorTargetInit;
00079 }
00081 void ExecStreamGraphImpl::setScratchSegment(
00082     SharedSegment pScratchSegmentInit)
00083 {
00084     pScratchSegment = pScratchSegmentInit;
00085 }
00087 void ExecStreamGraphImpl::setResourceGovernor(
00088     SharedExecStreamGovernor pResourceGovernorInit)
00089 {
00090     pResourceGovernor = pResourceGovernorInit;
00091 }
00093 SharedLogicalTxn ExecStreamGraphImpl::getTxn()
00094 {
00095     return pTxn;
00096 }
00098 TxnId ExecStreamGraphImpl::getTxnId()
00099 {
00100     if (pTxn) {
00101         return pTxn->getTxnId();
00102     }
00103     assert(allowDummyTxnId);
00104     return FIRST_TXN_ID;
00105 }
00107 void ExecStreamGraphImpl::enableDummyTxnId(bool enabled)
00108 {
00109     allowDummyTxnId = enabled;
00110 }
00112 SharedExecStreamGovernor ExecStreamGraphImpl::getResourceGovernor()
00113 {
00114     return pResourceGovernor;
00115 }
00117 ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::newVertex()
00118 {
00119     if (freeVertices.size() > 0) {
00120         Vertex ret = freeVertices.back();
00121         freeVertices.pop_back();
00122         return ret;
00123     }
00124     return boost::add_vertex(graphRep);
00125 }
00127 void ExecStreamGraphImpl::freeVertex(Vertex v)
00128 {
00129     boost::clear_vertex(v, graphRep);
00130     boost::get(boost::vertex_data, graphRep)[v].reset();
00131     freeVertices.push_back(v);
00132 }
00134 int ExecStreamGraphImpl::getStreamCount()
00135 {
00136     return boost::num_vertices(graphRep) - freeVertices.size();
00137 }
00139 ExecStreamGraphImpl::Vertex
00140 ExecStreamGraphImpl::addVertex(SharedExecStream pStream)
00141 {
00142     Vertex v = newVertex();
00143     boost::put(boost::vertex_data, graphRep, v, pStream);
00144     if (pStream) {
00145         // Note that pStream can be null for an exterior node in a farrago
00146         // graph.  Guard against duplicating a stream name.
00147         const std::string& name = pStream->getName();
00148         if (name.length() == 0) {
00149             permFail("cannot add nameless stream to graph " << this);
00150         }
00151         if (findStream(name)) {
00152             permFail("cannot add stream " << name << " to graph " << this);
00153         }
00154         pStream->id = v;
00155         pStream->pGraph = this;
00156         streamMap[name] = pStream->getStreamId();
00157     }
00158     return v;
00159 }
00161 void ExecStreamGraphImpl::addStream(
00162     SharedExecStream pStream)
00163 {
00164     (void) addVertex(pStream);
00165 }
00167 void ExecStreamGraphImpl::removeStream(ExecStreamId id)
00168 {
00169     Vertex v = boost::vertices(graphRep).first[id];
00170     SharedExecStream pStream = getStreamFromVertex(v);
00171     permAssert(pStream->pGraph == this);
00172     permAssert(pStream->id == id);
00174     streamMap.erase(pStream->getName());
00175     removeFromStreamOutMap(pStream);
00176     sortedStreams.clear();              // invalidate list: recreated on demand
00177     freeVertex(v);
00178     // stream is now detached from any graph, and not usable.
00179     pStream->pGraph = 0;
00180     pStream->id = 0;
00181 }
00183 void ExecStreamGraphImpl::removeFromStreamOutMap(SharedExecStream p)
00184 {
00185     int outCt = getOutputCount(p->getStreamId());
00186     if (outCt > 0) {
00187         std::string name = p->getName();
00188         // assumes map key pairs <name, index> sort lexicographically, so
00189         // <name, *> is contiguous.
00190         EdgeMap::iterator startNameRange =
00191             streamOutMap.find(std::make_pair(name, 0));
00192         EdgeMap::iterator endNameRange =
00193             streamOutMap.find(std::make_pair(name, outCt - 1));
00194         streamOutMap.erase(startNameRange, endNameRange);
00195     }
00196 }
00198 // Deletes all edges and puts all vertices on the free list;
00199 // almost like removeStream() on all vertices,
00200 // but doesn't affect the ExecStream which no longer belongs to this graph.
00201 void ExecStreamGraphImpl::clear()
00202 {
00203     FgVertexIterPair verts = boost::vertices(graphRep);
00204     while (verts.first != verts.second) {
00205         Vertex v = *verts.first;
00206         freeVertex(v);
00207         ++verts.first;
00208     }
00210     streamMap.clear();
00211     streamOutMap.clear();
00212     sortedStreams.clear();
00213     needsClose = isOpen = isPrepared = false;
00214 }
00216 void ExecStreamGraphImpl::addDataflow(
00217     ExecStreamId producerId,
00218     ExecStreamId consumerId,
00219     bool isImplicit)
00220 {
00221     Edge newEdge =
00222         boost::add_edge(producerId, consumerId, graphRep).first;
00223     boost::put(
00224         boost::edge_weight,
00225         graphRep,
00226         newEdge,
00227         isImplicit ? 0 : 1);
00228 }
00230 void ExecStreamGraphImpl::addOutputDataflow(
00231     ExecStreamId producerId)
00232 {
00233     Vertex consumerId = newVertex();
00234     Edge newEdge =
00235         boost::add_edge(producerId, consumerId, graphRep).first;
00236     boost::put(
00237         boost::edge_weight,
00238         graphRep,
00239         newEdge,
00240         1);
00241 }
00243 void ExecStreamGraphImpl::addInputDataflow(
00244     ExecStreamId consumerId)
00245 {
00246     Vertex producerId = newVertex();
00247     Edge newEdge =
00248         boost::add_edge(producerId, consumerId, graphRep).first;
00249     boost::put(
00250         boost::edge_weight,
00251         graphRep,
00252         newEdge,
00253         1);
00254 }
00257 int ExecStreamGraphImpl::getDataflowCount()
00258 {
00259     return boost::num_edges(graphRep);
00260 }
00262 void ExecStreamGraphImpl::mergeFrom(ExecStreamGraph& src)
00263 {
00264     if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00265         mergeFrom(*p);
00266         return;
00267     }
00268     permFail("unknown subtype of ExecStreamGraph");
00269 }
00271 void ExecStreamGraphImpl::mergeFrom(
00272     ExecStreamGraph& src,
00273     std::vector<ExecStreamId> const& nodes)
00274 {
00275     if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00276         mergeFrom(*p, nodes);
00277         return;
00278     }
00279     permFail("unknown subtype of ExecStreamGraph");
00280 }
00282 void ExecStreamGraphImpl::mergeFrom(ExecStreamGraphImpl& src)
00283 {
00284     // Since the identity of the added graph SRC will be lost, at this time both
00285     // graphs must be prepared, and must both be open or both be closed.
00286     permAssert(isPrepared && src.isPrepared);
00287     permAssert(isOpen == src.isOpen);
00289     // map a source vertex ID to the ID of the copied target vertex
00290     std::map<Vertex, Vertex> vmap;
00292     // copy the nodes (with attached streams)
00293     FgVertexIterPair verts = boost::vertices(src.graphRep);
00294     for (; verts.first != verts.second; ++verts.first) {
00295         Vertex vsrc = *verts.first;
00296         SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00297         Vertex vnew = addVertex(pStream);
00298         vmap[vsrc] = vnew;
00299     }
00301     // copy the edges (with attached buffers, which stay bound to the adjacent
00302     // streams)
00303     FgEdgeIterPair edges = boost::edges(src.graphRep);
00304     for (; edges.first != edges.second; ++edges.first) {
00305         Edge esrc = *edges.first;
00306         SharedExecStreamBufAccessor pBuf =
00307             src.getSharedBufAccessorFromEdge(esrc);
00308         std::pair<Edge, bool> x = boost::add_edge(
00309             vmap[boost::source(esrc, src.graphRep)], // image of source node
00310             vmap[boost::target(esrc, src.graphRep)], // image of target node
00311             pBuf,
00312             graphRep);
00313         boost::put(
00314             boost::edge_weight,
00315             graphRep,
00316             x.first,
00317             boost::get(boost::edge_weight, src.graphRep, esrc));
00318         assert(x.second);
00319     }
00320     src.clear();                        // source is empty
00321     sortedStreams.clear();              // invalid now
00322 }
00324 // merges a subgraph, viz the induced subgraph of a set of NODES of SRC
00325 void ExecStreamGraphImpl::mergeFrom(
00326     ExecStreamGraphImpl& src,
00327     std::vector<ExecStreamId> const& nodes)
00328 {
00329     // both graphs must be prepared, and must both be open or both be closed.
00330     permAssert(isPrepared && src.isPrepared);
00331     permAssert(isOpen == src.isOpen);
00333     // map a source vertex ID to the ID of the copied target vertex
00334     std::map<Vertex, Vertex> vmap;
00336     // Copy the nodes (with attached streams)
00337     int nnodes = nodes.size();
00338     for (int i = 0; i < nnodes; i++) {
00339         Vertex vsrc = boost::vertices(src.graphRep).first[nodes[i]];
00340         SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00341         Vertex vnew = addVertex(pStream);
00342         vmap[vsrc] = vnew;
00343     }
00345     // Copy the internal edges (with attached buffers, which stay bound to the
00346     // adjacent streams).  It suffices to scan the outbound edges. The external
00347     // edges are abandoned.
00348     if (nnodes > 1) {                   // (when only 1 node, no internal edges)
00349         for (int i = 0; i < nnodes; i++) {
00350             // Find all outbound edges E (U,V) in the source subgraph
00351             Vertex u = boost::vertices(src.graphRep).first[nodes[i]];
00352             for (FgOutEdgeIterPair edges = boost::out_edges(u, src.graphRep);
00353                  edges.first != edges.second;
00354                  ++edges.first)
00355             {
00356                 // an edge e (u, v) in the source graph
00357                 Edge e = *edges.first;
00358                 assert(u == boost::source(e, src.graphRep));
00359                 Vertex v = boost::target(e, src.graphRep);
00360                 // V is in the subgraph iff v is a key in the map vmap[]
00361                 if (vmap.find(v) != vmap.end()) {
00362                     SharedExecStreamBufAccessor pBuf =
00363                         src.getSharedBufAccessorFromEdge(e);
00364                     std::pair<Edge, bool> x =
00365                         boost::add_edge(
00366                             vmap[u],
00367                             vmap[v],
00368                             pBuf,
00369                             graphRep);
00370                     assert(x.second);
00371                     boost::put(
00372                         boost::edge_weight,
00373                         graphRep,
00374                         x.first,
00375                         boost::get(boost::edge_weight, src.graphRep, e));
00376                 }
00377             }
00378         }
00379     }
00381     // delete the copied subgraph from SRC
00382     for (int i = 0; i < nnodes; i++) {
00383         Vertex v = boost::vertices(src.graphRep).first[nodes[i]];
00384         SharedExecStream pStream = src.getStreamFromVertex(v);
00385         src.streamMap.erase(pStream->getName());
00386         src.removeFromStreamOutMap(pStream);
00387         src.freeVertex(v);
00388     }
00389     src.sortedStreams.clear();          // invalidate
00390     sortedStreams.clear();              // invalidate
00391 }
00393 SharedExecStream ExecStreamGraphImpl::findStream(
00394     std::string name)
00395 {
00396     StreamMapConstIter pPair = streamMap.find(name);
00397     if (pPair == streamMap.end()) {
00398         SharedExecStream nullStream;
00399         return nullStream;
00400     } else {
00401         return getStreamFromVertex(pPair->second);
00402     }
00403 }
00405 SharedExecStream ExecStreamGraphImpl::findLastStream(
00406     std::string name,
00407     uint iOutput)
00408 {
00409     EdgeMap::const_iterator pPair =
00410         streamOutMap.find(std::make_pair(name, iOutput));
00411     if (pPair == streamOutMap.end()) {
00412         return findStream(name);
00413     } else {
00414         return getStreamFromVertex(pPair->second);
00415     }
00416 }
00418 void ExecStreamGraphImpl::interposeStream(
00419     std::string name,
00420     uint iOutput,
00421     ExecStreamId interposedId)
00422 {
00423     SharedExecStream pLastStream = findLastStream(name, iOutput);
00424     permAssert(pLastStream.get());
00425     streamOutMap[std::make_pair(name, iOutput)] = interposedId;
00426     addDataflow(
00427         pLastStream->getStreamId(),
00428         interposedId,
00429         false);
00430 }
00432 void ExecStreamGraphImpl::sortStreams()
00433 {
00434     std::vector<Vertex> sortedVertices;
00435     boost::topological_sort(
00436         graphRep,std::back_inserter(sortedVertices));
00437     sortedStreams.resize(sortedVertices.size());
00439     // boost::topological_sort produces an ordering from consumers to
00440     // producers, but we want the oppposite ordering, hence
00441     // sortedStreams.rbegin() below
00442     std::transform(
00443         sortedVertices.begin(),
00444         sortedVertices.end(),
00445         sortedStreams.rbegin(),
00446         boost::bind(&ExecStreamGraphImpl::getStreamFromVertex,this,_1));
00448     // now filter out the null vertices representing inputs and outputs
00449     sortedStreams.erase(
00450         std::remove(
00451             sortedStreams.begin(),sortedStreams.end(),SharedExecStream()),
00452         sortedStreams.end());
00453 }
00455 void ExecStreamGraphImpl::prepare(ExecStreamScheduler &scheduler)
00456 {
00457     isPrepared = true;
00458     sortStreams();
00460     // create buffer accessors for all explicit dataflow edges
00461     EdgeIterPair edges = boost::edges(filteredGraph);
00462     for (; edges.first != edges.second; edges.first++) {
00463         SharedExecStreamBufAccessor pBufAccessor = scheduler.newBufAccessor();
00464         boost::put(boost::edge_data,graphRep,*(edges.first),pBufAccessor);
00465     }
00467     // bind buffer accessors to streams
00468     std::for_each(
00469         sortedStreams.begin(),
00470         sortedStreams.end(),
00471         boost::bind(
00472             &ExecStreamGraphImpl::bindStreamBufAccessors,this,_1));
00473 }
00475 void ExecStreamGraphImpl::bindStreamBufAccessors(SharedExecStream pStream)
00476 {
00477     std::vector<SharedExecStreamBufAccessor> bufAccessors;
00479     // bind the input buffers (explicit dataflow only)
00480     InEdgeIterPair inEdges = boost::in_edges(
00481         pStream->getStreamId(),filteredGraph);
00482     for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00483         SharedExecStreamBufAccessor pBufAccessor =
00484             getSharedBufAccessorFromEdge(*(inEdges.first));
00485         bufAccessors.push_back(pBufAccessor);
00486     }
00487     pStream->setInputBufAccessors(bufAccessors);
00488     bufAccessors.clear();
00490     // bind the output buffers (explicit dataflow only)
00491     OutEdgeIterPair outEdges = boost::out_edges(
00492         pStream->getStreamId(),filteredGraph);
00493     for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00494         SharedExecStreamBufAccessor pBufAccessor =
00495             getSharedBufAccessorFromEdge(*(outEdges.first));
00496         bufAccessors.push_back(pBufAccessor);
00497         pBufAccessor->setProvision(pStream->getOutputBufProvision());
00498     }
00499     pStream->setOutputBufAccessors(bufAccessors);
00500 }
00502 void ExecStreamGraphImpl::open()
00503 {
00504     permAssert(!isOpen);
00505     isOpen = true;
00506     needsClose = true;
00508     // clear all buffer accessors
00509     EdgeIterPair edges = boost::edges(filteredGraph);
00510     for (; edges.first != edges.second; edges.first++) {
00511         ExecStreamBufAccessor &bufAccessor =
00512             getBufAccessorFromEdge(*(edges.first));
00513         bufAccessor.clear();
00514     }
00516     // open streams in dataflow order (from producers to consumers)
00517     if (sortedStreams.empty()) {
00518         // in case removeStream() was called after prepare
00519         sortStreams();
00520     }
00521     std::for_each(
00522         sortedStreams.begin(),
00523         sortedStreams.end(),
00524         boost::bind(&ExecStreamGraphImpl::openStream,this,_1));
00525 }
00527 void ExecStreamGraphImpl::openStream(SharedExecStream pStream)
00528 {
00529     if (pErrorTarget) {
00530         pStream->initErrorSource(pErrorTarget, pStream->getName());
00531     }
00532     pStream->open(false);
00533 }
00535 void ExecStreamGraphImpl::closeImpl()
00536 {
00537     isOpen = false;
00538     if (sortedStreams.empty()) {
00539         // in case prepare was never called
00540         sortStreams();
00541     }
00542     if (doDataflowClose) {
00543         std::for_each(
00544             sortedStreams.begin(),
00545             sortedStreams.end(),
00546             boost::bind(&ClosableObject::close,_1));
00547     } else {
00548         std::for_each(
00549             sortedStreams.rbegin(),
00550             sortedStreams.rend(),
00551             boost::bind(&ClosableObject::close,_1));
00552     }
00553     pDynamicParamManager->deleteAllParams();
00554     SharedExecStreamGovernor pGov = getResourceGovernor();
00555     if (pGov) {
00556         pGov->returnResources(*this);
00557     }
00558     pTxn.reset();
00560     // release any scratch memory
00561     if (pScratchSegment) {
00562         pScratchSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID);
00563     }
00564 }
00566 SharedExecStream ExecStreamGraphImpl::getStream(ExecStreamId id)
00567 {
00568     Vertex v = boost::vertices(graphRep).first[id];
00569     return getStreamFromVertex(v);
00570 }
00572 uint ExecStreamGraphImpl::getInputCount(
00573     ExecStreamId streamId)
00574 {
00575     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00576     return boost::in_degree(streamVertex,filteredGraph);
00577 }
00579 uint ExecStreamGraphImpl::getOutputCount(
00580     ExecStreamId streamId)
00581 {
00582     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00583     return boost::out_degree(streamVertex,filteredGraph);
00584 }
00586 ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getInputEdge(
00587     ExecStreamId streamId,
00588     uint iInput)
00589 {
00590     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00591     InEdgeIter pEdge = boost::in_edges(streamVertex,filteredGraph).first;
00592     for (int i = 0; i < iInput; ++i) {
00593         ++pEdge;
00594     }
00595     return *pEdge;
00596 }
00598 SharedExecStream ExecStreamGraphImpl::getStreamInput(
00599     ExecStreamId streamId,
00600     uint iInput)
00601 {
00602     Edge inputEdge = getInputEdge(streamId, iInput);
00603     Vertex inputVertex = boost::source(inputEdge,graphRep);
00604     return getStreamFromVertex(inputVertex);
00605 }
00607 SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamInputAccessor(
00608     ExecStreamId streamId,
00609     uint iInput)
00610 {
00611     Edge inputEdge = getInputEdge(streamId, iInput);
00612     return getSharedBufAccessorFromEdge(inputEdge);
00613 }
00615 ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getOutputEdge(
00616     ExecStreamId streamId,
00617     uint iOutput)
00618 {
00619     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00620     OutEdgeIter pEdge = boost::out_edges(streamVertex,filteredGraph).first;
00621     for (int i = 0; i < iOutput; ++i) {
00622         ++pEdge;
00623     }
00624     return *pEdge;
00625 }
00627 SharedExecStream ExecStreamGraphImpl::getStreamOutput(
00628     ExecStreamId streamId,
00629     uint iOutput)
00630 {
00631     Edge outputEdge = getOutputEdge(streamId, iOutput);
00632     Vertex outputVertex = boost::target(outputEdge,graphRep);
00633     return getStreamFromVertex(outputVertex);
00634 }
00636 SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamOutputAccessor(
00637     ExecStreamId streamId,
00638     uint iOutput)
00639 {
00640     Edge outputEdge = getOutputEdge(streamId, iOutput);
00641     return getSharedBufAccessorFromEdge(outputEdge);
00642 }
00644 std::vector<SharedExecStream> ExecStreamGraphImpl::getSortedStreams()
00645 {
00646     permAssert(isPrepared);
00647     if (sortedStreams.empty()) {
00648         sortStreams();
00649     }
00650     return sortedStreams;
00651 }
00653 bool ExecStreamGraphImpl::isAcyclic()
00654 {
00655     int numVertices = boost::num_vertices(graphRep);
00657     // if # strong components is < # vertices, then there must be at least
00658     // one cycle
00659     std::vector<int> component(numVertices);
00660     int nStrongComps = boost::strong_components(graphRep, &component[0]);
00661     return (nStrongComps >= numVertices);
00662 }
00664 class ExecStreamGraphImpl::DotGraphRenderer
00665 {
00666 public:
00667     void operator()(std::ostream &out) const
00668     {
00669         out << "graph [bgcolor=gray, rankdir=BT]" << std::endl;
00670         out << "node [shape=record, style=filled, "
00671             << "fillcolor=white, fontsize=10.0]" << std::endl;
00672         out << "edge [fontsize=10.0]" << std::endl;
00673     }
00674 };
00676 class ExecStreamGraphImpl::DotEdgeRenderer
00677 {
00678     ExecStreamGraphImpl &graph;
00679 public:
00680     DotEdgeRenderer(ExecStreamGraphImpl &graphInit)
00681         : graph(graphInit)
00682     {
00683     }
00685     void operator()(
00686         std::ostream &out, ExecStreamGraphImpl::Edge const &edge) const
00687     {
00688         SharedExecStreamBufAccessor pAccessor =
00689             graph.getSharedBufAccessorFromEdge(edge);
00690         int weight = boost::get(
00691             boost::edge_weight, graph.getFullGraphRep(), edge);
00692         out << "[label=\"";
00693         if (pAccessor) {
00694             out << ExecStreamBufState_names[pAccessor->getState()];
00695         }
00696         out << "\"";
00697         if (!weight) {
00698             out << "style=\"dotted\"";
00699         }
00700         out << "]";
00701     }
00702 };
00704 class ExecStreamGraphImpl::DotVertexRenderer
00705 {
00706     ExecStreamGraphImpl &graph;
00707 public:
00708     DotVertexRenderer(ExecStreamGraphImpl &graphInit)
00709         : graph(graphInit)
00710     {
00711     }
00713     void operator()(std::ostream &out, ExecStreamId const &streamId) const
00714     {
00715         SharedExecStream pStream = graph.getStream(streamId);
00716         out << "[label=\"{";
00717         if (pStream) {
00718             out << streamId;
00719             out << "|";
00720             if (dynamic_cast<ScratchBufferExecStream *>(pStream.get())) {
00721                 out << "MEMBUF";
00722             } else {
00723                 Backtrace::writeDemangled(out, typeid(*pStream).name());
00724                 out << "|";
00725                 out << pStream->getName();
00726             }
00727         } else {
00728             out << "SINK";
00729         }
00730         out << "}\"]";
00731     }
00732 };
00734 void ExecStreamGraphImpl::renderGraphviz(std::ostream &dotStream)
00735 {
00736     boost::write_graphviz(
00737         dotStream,
00738         graphRep,
00739         DotVertexRenderer(*this),
00740         DotEdgeRenderer(*this),
00741         DotGraphRenderer());
00742 }
00744 void ExecStreamGraphImpl::closeProducers(ExecStreamId streamId)
00745 {
00746     FgInEdgeIterPair inEdges =
00747         boost::in_edges(streamId, graphRep);
00748     for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00749         Edge edge = *(inEdges.first);
00750         // move streamId upstream
00751         streamId = boost::source(edge,graphRep);
00752         // close the producers of this stream before closing the stream
00753         // itself, but only if it's possible to early close the stream
00754         SharedExecStream pStream = getStreamFromVertex(streamId);
00755         if (!pStream->canEarlyClose()) {
00756             continue;
00757         }
00758         closeProducers(streamId);
00759         pStream->close();
00760     }
00761 }
00763 void ExecStreamGraphImpl::declareDynamicParamWriter(
00764     ExecStreamId streamId,
00765     DynamicParamId dynamicParamId)
00766 {
00767     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00768     info.writerStreamIds.push_back(streamId);
00769 }
00771 void ExecStreamGraphImpl::declareDynamicParamReader(
00772     ExecStreamId streamId,
00773     DynamicParamId dynamicParamId)
00774 {
00775     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00776     info.readerStreamIds.push_back(streamId);
00777 }
00779 const std::vector<ExecStreamId> &ExecStreamGraphImpl::getDynamicParamWriters(
00780     DynamicParamId dynamicParamId)
00781 {
00782     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00783     return info.writerStreamIds;
00784 }
00786 const std::vector<ExecStreamId> &ExecStreamGraphImpl::getDynamicParamReaders(
00787     DynamicParamId dynamicParamId)
00788 {
00789     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00790     return info.readerStreamIds;
00791 }
00793 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraph.cpp#34 $");
00795 // End ExecStreamGraph.cpp

