#include <ExecStreamGraphImpl.h>
Inheritance diagram for ExecStreamGraphImpl:
Public Types | |
typedef boost::adjacency_list< boost::vecS, boost::vecS, boost::bidirectionalS, boost::property< boost::vertex_data_t, SharedExecStream >, boost::property< boost::edge_data_t, SharedExecStreamBufAccessor, boost::property< boost::edge_weight_t, int > > > | FullGraphRep |
typedef boost::graph_traits< FullGraphRep >::vertex_descriptor | Vertex |
typedef boost::graph_traits< FullGraphRep >::edge_descriptor | Edge |
typedef boost::graph_traits< FullGraphRep >::vertex_iterator | FgVertexIter |
typedef boost::graph_traits< FullGraphRep >::edge_iterator | FgEdgeIter |
typedef boost::graph_traits< FullGraphRep >::out_edge_iterator | FgOutEdgeIter |
typedef boost::graph_traits< FullGraphRep >::in_edge_iterator | FgInEdgeIter |
typedef std::pair< FgVertexIter, FgVertexIter > | FgVertexIterPair |
typedef std::pair< FgEdgeIter, FgEdgeIter > | FgEdgeIterPair |
typedef std::pair< FgOutEdgeIter, FgOutEdgeIter > | FgOutEdgeIterPair |
typedef std::pair< FgInEdgeIter, FgInEdgeIter > | FgInEdgeIterPair |
typedef boost::property_map< FullGraphRep, boost::edge_weight_t >::type | EdgeWeightMap |
typedef boost::filtered_graph< FullGraphRep, ExplicitEdgePredicate > | GraphRep |
typedef boost::graph_traits< GraphRep >::vertex_iterator | VertexIter |
typedef boost::graph_traits< GraphRep >::edge_iterator | EdgeIter |
typedef boost::graph_traits< GraphRep >::out_edge_iterator | OutEdgeIter |
typedef boost::graph_traits< GraphRep >::in_edge_iterator | InEdgeIter |
typedef std::pair< VertexIter, VertexIter > | VertexIterPair |
typedef std::pair< EdgeIter, EdgeIter > | EdgeIterPair |
typedef std::pair< OutEdgeIter, OutEdgeIter > | OutEdgeIterPair |
typedef std::pair< InEdgeIter, InEdgeIter > | InEdgeIterPair |
Public Member Functions | |
ExecStreamGraphImpl () | |
virtual | ~ExecStreamGraphImpl () |
GraphRep const & | getGraphRep () |
FullGraphRep const & | getFullGraphRep () |
SharedExecStream | getStreamFromVertex (Vertex) |
SharedExecStreamBufAccessor & | getSharedBufAccessorFromEdge (Edge) |
ExecStreamBufAccessor & | getBufAccessorFromEdge (Edge) |
virtual void | setTxn (SharedLogicalTxn pTxn) |
Sets the transaction within which this graph should execute. | |
virtual void | setErrorTarget (SharedErrorTarget pErrorTarget) |
Sets the ErrorTarget to which this graph's streams should send row errors. | |
virtual void | setScratchSegment (SharedSegment pScratchSegment) |
Sets the ScratchSegment from which this graph's streams should allocate memory buffers. | |
virtual void | setResourceGovernor (SharedExecStreamGovernor pResourceGovernor) |
Sets the global exec stream governor. | |
virtual SharedLogicalTxn | getTxn () |
| |
virtual TxnId | getTxnId () |
| |
virtual void | enableDummyTxnId (bool enabled) |
Controls whether it is OK to call getTxnId without first calling setTxn. | |
virtual SharedExecStreamGovernor | getResourceGovernor () |
| |
virtual void | prepare (ExecStreamScheduler &scheduler) |
Prepares this graph for execution. | |
virtual void | open () |
Opens execution on this graph. | |
virtual void | addStream (SharedExecStream pStream) |
Adds a stream to this graph. | |
virtual void | removeStream (ExecStreamId) |
Removes a stream from the graph: deletes the edges, and puts the vertex on a free list to be reallocated. | |
virtual void | addDataflow (ExecStreamId producerId, ExecStreamId consumerId, bool isImplicit=false) |
Defines a dataflow relationship between two streams in this graph. | |
virtual void | addOutputDataflow (ExecStreamId producerId) |
Defines a dataflow representing external output produced by this graph. | |
virtual void | addInputDataflow (ExecStreamId consumerId) |
Defines a dataflow representing external input consumed by this graph. | |
virtual void | mergeFrom (ExecStreamGraph &src) |
Adds all the vertices and edges from another graph. | |
virtual void | mergeFrom (ExecStreamGraph &src, std::vector< ExecStreamId > const &nodes) |
Adds a subgraph, taken (removed) from another graph. | |
virtual SharedExecStream | findStream (std::string name) |
Finds a stream by name. | |
virtual SharedExecStream | findLastStream (std::string name, uint iOutput) |
Finds last stream known for name. | |
virtual void | interposeStream (std::string name, uint iOutput, ExecStreamId interposedId) |
Interposes an adapter stream. | |
virtual SharedExecStream | getStream (ExecStreamId id) |
Translates a stream ID to a stream pointer. | |
virtual uint | getInputCount (ExecStreamId streamId) |
Determines number of explicit input flows consumed by a stream. | |
virtual uint | getOutputCount (ExecStreamId streamId) |
Determines number of explicit output flows produced by a stream. | |
virtual SharedExecStream | getStreamInput (ExecStreamId streamId, uint iInput) |
Accesses a stream's input. | |
virtual SharedExecStreamBufAccessor | getStreamInputAccessor (ExecStreamId streamId, uint iInput) |
Accesses a stream's input accessor. | |
virtual SharedExecStream | getStreamOutput (ExecStreamId streamId, uint iOutput) |
Accesses a stream's output. | |
virtual SharedExecStreamBufAccessor | getStreamOutputAccessor (ExecStreamId streamId, uint iOutput) |
Accesses a stream's output accessor. | |
virtual std::vector< SharedExecStream > | getSortedStreams () |
Gets streams, sorted topologically. | |
virtual int | getStreamCount () |
| |
virtual int | getDataflowCount () |
| |
virtual void | renderGraphviz (std::ostream &dotStream) |
Renders the graph in the .dot format defined by Graphviz. | |
virtual bool | isAcyclic () |
| |
virtual void | closeProducers (ExecStreamId streamId) |
Closes the producers of a stream with a given id. | |
virtual void | declareDynamicParamWriter (ExecStreamId streamId, DynamicParamId dynamicParamId) |
Declares that a given stream writes a given dynamic parameter. | |
virtual void | declareDynamicParamReader (ExecStreamId streamId, DynamicParamId dynamicParamId) |
Declares that a given stream reads a given dynamic parameter. | |
virtual const std::vector< ExecStreamId > & | getDynamicParamWriters (DynamicParamId dynamicParamId) |
Returns a list of stream ids that write a given dynamic parameter. | |
virtual const std::vector< ExecStreamId > & | getDynamicParamReaders (DynamicParamId dynamicParamId) |
Returns a list of stream ids that read a given dynamic parameter. | |
ExecStreamScheduler * | getScheduler () const |
| |
SharedDynamicParamManager | getDynamicParamManager () |
| |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any allocated resources. | |
Static Public Member Functions | |
static SharedExecStreamGraph | newExecStreamGraph () |
Constructs a new ExecStreamGraph. | |
Protected Types | |
typedef std::map< std::string, ExecStreamId > | StreamMap |
typedef StreamMap::const_iterator | StreamMapConstIter |
typedef std::map< std::pair< std::string, uint >, ExecStreamId > | EdgeMap |
Protected Member Functions | |
virtual void | closeImpl () |
Must be implemented by derived class to release any resources. | |
virtual void | sortStreams () |
virtual void | openStream (SharedExecStream pStream) |
virtual void | bindStreamBufAccessors (SharedExecStream pStream) |
virtual void | mergeFrom (ExecStreamGraphImpl &src) |
virtual void | mergeFrom (ExecStreamGraphImpl &src, std::vector< ExecStreamId > const &nodes) |
virtual void | clear () |
Frees all nodes and edges: like removeStream() on all streams, but faster. | |
virtual Vertex | addVertex (SharedExecStream pStream) |
Adds a node. | |
Vertex | newVertex () |
| |
void | freeVertex (Vertex) |
Releases a Vertex to the free list. | |
void | removeFromStreamOutMap (SharedExecStream) |
Removes a stream from streamOutMap. | |
virtual Edge | getInputEdge (ExecStreamId stream, uint iInput) |
virtual Edge | getOutputEdge (ExecStreamId stream, uint iOutput) |
Protected Attributes | |
FullGraphRep | graphRep |
GraphRep | filteredGraph |
std::vector< Vertex > | freeVertices |
List of freed vertices. | |
StreamMap | streamMap |
Map of name to stream. | |
EdgeMap | streamOutMap |
Map of name and output arc to stream output, after add-ons. | |
std::vector< SharedExecStream > | sortedStreams |
Result of topologically sorting graph (producers before consumers). | |
SharedLogicalTxn | pTxn |
Transaction being executed. | |
SharedErrorTarget | pErrorTarget |
Target for row errors. | |
SharedSegment | pScratchSegment |
Source for scratch buffers. | |
SharedExecStreamGovernor | pResourceGovernor |
Resource governor. | |
bool | isOpen |
Whether this graph is currently open. | |
bool | isPrepared |
Whether this graph has been prepared. | |
bool | doDataflowClose |
Whether to close this graph in dataflow order (producers to consumers). | |
std::map< DynamicParamId, DynamicParamInfo > | dynamicParamMap |
Information on readers and writers of dynamic parameters. | |
bool | allowDummyTxnId |
Whether to allow execution without a real transaction. | |
ExecStreamScheduler * | pScheduler |
A Scheduler responsible for executing streams in this graph. | |
SharedDynamicParamManager | pDynamicParamManager |
Manager that handles dynamic parameters for this graph. | |
bool | needsClose |
Classes | |
class | DotEdgeRenderer |
class | DotGraphRenderer |
class | DotVertexRenderer |
class | DynamicParamInfo |
struct | ExplicitEdgePredicate |
Definition at line 50 of file ExecStreamGraphImpl.h.
typedef boost::adjacency_list< boost::vecS, boost::vecS, boost::bidirectionalS, boost::property<boost::vertex_data_t,SharedExecStream>, boost::property< boost::edge_data_t,SharedExecStreamBufAccessor, boost::property<boost::edge_weight_t,int> > > ExecStreamGraphImpl::FullGraphRep |
Definition at line 62 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::vertex_descriptor ExecStreamGraphImpl::Vertex |
Definition at line 64 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::edge_descriptor ExecStreamGraphImpl::Edge |
Definition at line 65 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::vertex_iterator ExecStreamGraphImpl::FgVertexIter |
Definition at line 67 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::edge_iterator ExecStreamGraphImpl::FgEdgeIter |
Definition at line 68 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::out_edge_iterator ExecStreamGraphImpl::FgOutEdgeIter |
Definition at line 69 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<FullGraphRep>::in_edge_iterator ExecStreamGraphImpl::FgInEdgeIter |
Definition at line 70 of file ExecStreamGraphImpl.h.
typedef std::pair<FgVertexIter,FgVertexIter> ExecStreamGraphImpl::FgVertexIterPair |
Definition at line 71 of file ExecStreamGraphImpl.h.
typedef std::pair<FgEdgeIter,FgEdgeIter> ExecStreamGraphImpl::FgEdgeIterPair |
Definition at line 72 of file ExecStreamGraphImpl.h.
typedef std::pair<FgOutEdgeIter,FgOutEdgeIter> ExecStreamGraphImpl::FgOutEdgeIterPair |
Definition at line 73 of file ExecStreamGraphImpl.h.
typedef std::pair<FgInEdgeIter,FgInEdgeIter> ExecStreamGraphImpl::FgInEdgeIterPair |
Definition at line 74 of file ExecStreamGraphImpl.h.
typedef boost::property_map<FullGraphRep, boost::edge_weight_t>::type ExecStreamGraphImpl::EdgeWeightMap |
Definition at line 77 of file ExecStreamGraphImpl.h.
typedef boost::filtered_graph<FullGraphRep, ExplicitEdgePredicate> ExecStreamGraphImpl::GraphRep |
Definition at line 102 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<GraphRep>::vertex_iterator ExecStreamGraphImpl::VertexIter |
Definition at line 103 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<GraphRep>::edge_iterator ExecStreamGraphImpl::EdgeIter |
Definition at line 104 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<GraphRep>::out_edge_iterator ExecStreamGraphImpl::OutEdgeIter |
Definition at line 105 of file ExecStreamGraphImpl.h.
typedef boost::graph_traits<GraphRep>::in_edge_iterator ExecStreamGraphImpl::InEdgeIter |
Definition at line 106 of file ExecStreamGraphImpl.h.
typedef std::pair<VertexIter,VertexIter> ExecStreamGraphImpl::VertexIterPair |
Definition at line 107 of file ExecStreamGraphImpl.h.
typedef std::pair<EdgeIter,EdgeIter> ExecStreamGraphImpl::EdgeIterPair |
Definition at line 108 of file ExecStreamGraphImpl.h.
typedef std::pair<OutEdgeIter,OutEdgeIter> ExecStreamGraphImpl::OutEdgeIterPair |
Definition at line 109 of file ExecStreamGraphImpl.h.
typedef std::pair<InEdgeIter,InEdgeIter> ExecStreamGraphImpl::InEdgeIterPair |
Definition at line 110 of file ExecStreamGraphImpl.h.
typedef std::map<std::string,ExecStreamId> ExecStreamGraphImpl::StreamMap [protected] |
Definition at line 126 of file ExecStreamGraphImpl.h.
typedef StreamMap::const_iterator ExecStreamGraphImpl::StreamMapConstIter [protected] |
Definition at line 127 of file ExecStreamGraphImpl.h.
typedef std::map<std::pair<std::string, uint>,ExecStreamId> ExecStreamGraphImpl::EdgeMap [protected] |
Definition at line 128 of file ExecStreamGraphImpl.h.
ExecStreamGraphImpl::ExecStreamGraphImpl | ( | ) | [explicit] |
Definition at line 60 of file ExecStreamGraph.cpp.
References allowDummyTxnId, doDataflowClose, isOpen, and isPrepared.
00061 : filteredGraph( 00062 graphRep, 00063 boost::get(boost::edge_weight, graphRep)) 00064 { 00065 isPrepared = false; 00066 isOpen = false; 00067 doDataflowClose = false; 00068 allowDummyTxnId = false; 00069 }
virtual ExecStreamGraphImpl::~ExecStreamGraphImpl | ( | ) | [inline, virtual] |
void ExecStreamGraphImpl::closeImpl | ( | ) | [protected, virtual] |
Must be implemented by derived class to release any resources.
Implements ClosableObject.
Definition at line 535 of file ExecStreamGraph.cpp.
References ClosableObject::close(), doDataflowClose, getResourceGovernor(), isOpen, NULL_PAGE_ID, ExecStreamGraph::pDynamicParamManager, pScratchSegment, pTxn, sortedStreams, and sortStreams().
00536 { 00537 isOpen = false; 00538 if (sortedStreams.empty()) { 00539 // in case prepare was never called 00540 sortStreams(); 00541 } 00542 if (doDataflowClose) { 00543 std::for_each( 00544 sortedStreams.begin(), 00545 sortedStreams.end(), 00546 boost::bind(&ClosableObject::close,_1)); 00547 } else { 00548 std::for_each( 00549 sortedStreams.rbegin(), 00550 sortedStreams.rend(), 00551 boost::bind(&ClosableObject::close,_1)); 00552 } 00553 pDynamicParamManager->deleteAllParams(); 00554 SharedExecStreamGovernor pGov = getResourceGovernor(); 00555 if (pGov) { 00556 pGov->returnResources(*this); 00557 } 00558 pTxn.reset(); 00559 00560 // release any scratch memory 00561 if (pScratchSegment) { 00562 pScratchSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID); 00563 } 00564 }
void ExecStreamGraphImpl::sortStreams | ( | ) | [protected, virtual] |
Definition at line 432 of file ExecStreamGraph.cpp.
References getStreamFromVertex(), graphRep, and sortedStreams.
Referenced by closeImpl(), getSortedStreams(), open(), and prepare().
00433 { 00434 std::vector<Vertex> sortedVertices; 00435 boost::topological_sort( 00436 graphRep,std::back_inserter(sortedVertices)); 00437 sortedStreams.resize(sortedVertices.size()); 00438 00439 // boost::topological_sort produces an ordering from consumers to 00440 // producers, but we want the opposite ordering, hence 00441 // sortedStreams.rbegin() below 00442 std::transform( 00443 sortedVertices.begin(), 00444 sortedVertices.end(), 00445 sortedStreams.rbegin(), 00446 boost::bind(&ExecStreamGraphImpl::getStreamFromVertex,this,_1)); 00447 00448 // now filter out the null vertices representing inputs and outputs 00449 sortedStreams.erase( 00450 std::remove( 00451 sortedStreams.begin(),sortedStreams.end(),SharedExecStream()), 00452 sortedStreams.end()); 00453 }
void ExecStreamGraphImpl::openStream | ( | SharedExecStream | pStream | ) | [protected, virtual] |
Definition at line 527 of file ExecStreamGraph.cpp.
References pErrorTarget.
Referenced by open().
00528 { 00529 if (pErrorTarget) { 00530 pStream->initErrorSource(pErrorTarget, pStream->getName()); 00531 } 00532 pStream->open(false); 00533 }
void ExecStreamGraphImpl::bindStreamBufAccessors | ( | SharedExecStream | pStream | ) | [protected, virtual] |
Definition at line 475 of file ExecStreamGraph.cpp.
References filteredGraph, and getSharedBufAccessorFromEdge().
Referenced by prepare().
00476 { 00477 std::vector<SharedExecStreamBufAccessor> bufAccessors; 00478 00479 // bind the input buffers (explicit dataflow only) 00480 InEdgeIterPair inEdges = boost::in_edges( 00481 pStream->getStreamId(),filteredGraph); 00482 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00483 SharedExecStreamBufAccessor pBufAccessor = 00484 getSharedBufAccessorFromEdge(*(inEdges.first)); 00485 bufAccessors.push_back(pBufAccessor); 00486 } 00487 pStream->setInputBufAccessors(bufAccessors); 00488 bufAccessors.clear(); 00489 00490 // bind the output buffers (explicit dataflow only) 00491 OutEdgeIterPair outEdges = boost::out_edges( 00492 pStream->getStreamId(),filteredGraph); 00493 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00494 SharedExecStreamBufAccessor pBufAccessor = 00495 getSharedBufAccessorFromEdge(*(outEdges.first)); 00496 bufAccessors.push_back(pBufAccessor); 00497 pBufAccessor->setProvision(pStream->getOutputBufProvision()); 00498 } 00499 pStream->setOutputBufAccessors(bufAccessors); 00500 }
void ExecStreamGraphImpl::mergeFrom | ( | ExecStreamGraphImpl & | src | ) | [protected, virtual] |
Definition at line 282 of file ExecStreamGraph.cpp.
References addVertex(), clear(), getSharedBufAccessorFromEdge(), getStreamFromVertex(), graphRep, isOpen, isPrepared, and sortedStreams.
Referenced by mergeFrom().
00283 { 00284 // Since the identity of the added graph SRC will be lost, at this time both 00285 // graphs must be prepared, and must both be open or both be closed. 00286 permAssert(isPrepared && src.isPrepared); 00287 permAssert(isOpen == src.isOpen); 00288 00289 // map a source vertex ID to the ID of the copied target vertex 00290 std::map<Vertex, Vertex> vmap; 00291 00292 // copy the nodes (with attached streams) 00293 FgVertexIterPair verts = boost::vertices(src.graphRep); 00294 for (; verts.first != verts.second; ++verts.first) { 00295 Vertex vsrc = *verts.first; 00296 SharedExecStream pStream = src.getStreamFromVertex(vsrc); 00297 Vertex vnew = addVertex(pStream); 00298 vmap[vsrc] = vnew; 00299 } 00300 00301 // copy the edges (with attached buffers, which stay bound to the adjacent 00302 // streams) 00303 FgEdgeIterPair edges = boost::edges(src.graphRep); 00304 for (; edges.first != edges.second; ++edges.first) { 00305 Edge esrc = *edges.first; 00306 SharedExecStreamBufAccessor pBuf = 00307 src.getSharedBufAccessorFromEdge(esrc); 00308 std::pair<Edge, bool> x = boost::add_edge( 00309 vmap[boost::source(esrc, src.graphRep)], // image of source node 00310 vmap[boost::target(esrc, src.graphRep)], // image of target node 00311 pBuf, 00312 graphRep); 00313 boost::put( 00314 boost::edge_weight, 00315 graphRep, 00316 x.first, 00317 boost::get(boost::edge_weight, src.graphRep, esrc)); 00318 assert(x.second); 00319 } 00320 src.clear(); // source is empty 00321 sortedStreams.clear(); // invalid now 00322 }
void ExecStreamGraphImpl::mergeFrom | ( | ExecStreamGraphImpl & | src, | |
std::vector< ExecStreamId > const & | nodes | |||
) | [protected, virtual] |
Definition at line 325 of file ExecStreamGraph.cpp.
References addVertex(), getSharedBufAccessorFromEdge(), getStreamFromVertex(), graphRep, isOpen, and isPrepared.
00328 { 00329 // both graphs must be prepared, and must both be open or both be closed. 00330 permAssert(isPrepared && src.isPrepared); 00331 permAssert(isOpen == src.isOpen); 00332 00333 // map a source vertex ID to the ID of the copied target vertex 00334 std::map<Vertex, Vertex> vmap; 00335 00336 // Copy the nodes (with attached streams) 00337 int nnodes = nodes.size(); 00338 for (int i = 0; i < nnodes; i++) { 00339 Vertex vsrc = boost::vertices(src.graphRep).first[nodes[i]]; 00340 SharedExecStream pStream = src.getStreamFromVertex(vsrc); 00341 Vertex vnew = addVertex(pStream); 00342 vmap[vsrc] = vnew; 00343 } 00344 00345 // Copy the internal edges (with attached buffers, which stay bound to the 00346 // adjacent streams). It suffices to scan the outbound edges. The external 00347 // edges are abandoned. 00348 if (nnodes > 1) { // (when only 1 node, no internal edges) 00349 for (int i = 0; i < nnodes; i++) { 00350 // Find all outbound edges E (U,V) in the source subgraph 00351 Vertex u = boost::vertices(src.graphRep).first[nodes[i]]; 00352 for (FgOutEdgeIterPair edges = boost::out_edges(u, src.graphRep); 00353 edges.first != edges.second; 00354 ++edges.first) 00355 { 00356 // an edge e (u, v) in the source graph 00357 Edge e = *edges.first; 00358 assert(u == boost::source(e, src.graphRep)); 00359 Vertex v = boost::target(e, src.graphRep); 00360 // V is in the subgraph iff v is a key in the map vmap[] 00361 if (vmap.find(v) != vmap.end()) { 00362 SharedExecStreamBufAccessor pBuf = 00363 src.getSharedBufAccessorFromEdge(e); 00364 std::pair<Edge, bool> x = 00365 boost::add_edge( 00366 vmap[u], 00367 vmap[v], 00368 pBuf, 00369 graphRep); 00370 assert(x.second); 00371 boost::put( 00372 boost::edge_weight, 00373 graphRep, 00374 x.first, 00375 boost::get(boost::edge_weight, src.graphRep, e)); 00376 } 00377 } 00378 } 00379 } 00380 00381 // delete the copied subgraph from SRC 00382 for (int i = 0; i < nnodes; i++) { 00383 Vertex v = 
boost::vertices(src.graphRep).first[nodes[i]]; 00384 SharedExecStream pStream = src.getStreamFromVertex(v); 00385 src.streamMap.erase(pStream->getName()); 00386 src.removeFromStreamOutMap(pStream); 00387 src.freeVertex(v); 00388 } 00389 src.sortedStreams.clear(); // invalidate 00390 sortedStreams.clear(); // invalidate 00391 }
void ExecStreamGraphImpl::clear | ( | ) | [protected, virtual] |
Frees all nodes and edges: like removeStream() on all streams, but faster.
Definition at line 201 of file ExecStreamGraph.cpp.
References freeVertex(), graphRep, isOpen, isPrepared, ClosableObject::needsClose, sortedStreams, streamMap, and streamOutMap.
Referenced by mergeFrom().
00202 { 00203 FgVertexIterPair verts = boost::vertices(graphRep); 00204 while (verts.first != verts.second) { 00205 Vertex v = *verts.first; 00206 freeVertex(v); 00207 ++verts.first; 00208 } 00209 00210 streamMap.clear(); 00211 streamOutMap.clear(); 00212 sortedStreams.clear(); 00213 needsClose = isOpen = isPrepared = false; 00214 }
ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::addVertex | ( | SharedExecStream | pStream | ) | [protected, virtual] |
Adds a node.
Definition at line 140 of file ExecStreamGraph.cpp.
References findStream(), graphRep, newVertex(), streamMap, and boost::vertex_data.
Referenced by addStream(), and mergeFrom().
00141 { 00142 Vertex v = newVertex(); 00143 boost::put(boost::vertex_data, graphRep, v, pStream); 00144 if (pStream) { 00145 // Note that pStream can be null for an exterior node in a farrago 00146 // graph. Guard against duplicating a stream name. 00147 const std::string& name = pStream->getName(); 00148 if (name.length() == 0) { 00149 permFail("cannot add nameless stream to graph " << this); 00150 } 00151 if (findStream(name)) { 00152 permFail("cannot add stream " << name << " to graph " << this); 00153 } 00154 pStream->id = v; 00155 pStream->pGraph = this; 00156 streamMap[name] = pStream->getStreamId(); 00157 } 00158 return v; 00159 }
ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::newVertex | ( | ) | [protected] |
Definition at line 117 of file ExecStreamGraph.cpp.
References freeVertices, and graphRep.
Referenced by addInputDataflow(), addOutputDataflow(), and addVertex().
00118 { 00119 if (freeVertices.size() > 0) { 00120 Vertex ret = freeVertices.back(); 00121 freeVertices.pop_back(); 00122 return ret; 00123 } 00124 return boost::add_vertex(graphRep); 00125 }
void ExecStreamGraphImpl::freeVertex | ( | Vertex | ) | [protected] |
Releases a Vertex to the free list.
Definition at line 127 of file ExecStreamGraph.cpp.
References freeVertices, graphRep, and boost::vertex_data.
Referenced by clear(), and removeStream().
00128 { 00129 boost::clear_vertex(v, graphRep); 00130 boost::get(boost::vertex_data, graphRep)[v].reset(); 00131 freeVertices.push_back(v); 00132 }
void ExecStreamGraphImpl::removeFromStreamOutMap | ( | SharedExecStream | ) | [protected] |
Removes a stream from streamOutMap.
Definition at line 183 of file ExecStreamGraph.cpp.
References getOutputCount(), and streamOutMap.
Referenced by removeStream().
00184 { 00185 int outCt = getOutputCount(p->getStreamId()); 00186 if (outCt > 0) { 00187 std::string name = p->getName(); 00188 // assumes map key pairs <name, index> sort lexicographically, so 00189 // <name, *> is contiguous. 00190 EdgeMap::iterator startNameRange = 00191 streamOutMap.find(std::make_pair(name, 0)); 00192 EdgeMap::iterator endNameRange = 00193 streamOutMap.find(std::make_pair(name, outCt - 1)); 00194 streamOutMap.erase(startNameRange, endNameRange); 00195 } 00196 }
ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getInputEdge | ( | ExecStreamId | stream, | |
uint | iInput | |||
) | [protected, virtual] |
Definition at line 586 of file ExecStreamGraph.cpp.
References filteredGraph, and graphRep.
Referenced by getStreamInput(), and getStreamInputAccessor().
00589 { 00590 Vertex streamVertex = boost::vertices(graphRep).first[streamId]; 00591 InEdgeIter pEdge = boost::in_edges(streamVertex,filteredGraph).first; 00592 for (int i = 0; i < iInput; ++i) { 00593 ++pEdge; 00594 } 00595 return *pEdge; 00596 }
ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getOutputEdge | ( | ExecStreamId | stream, | |
uint | iOutput | |||
) | [protected, virtual] |
Definition at line 615 of file ExecStreamGraph.cpp.
References filteredGraph, and graphRep.
Referenced by getStreamOutput(), and getStreamOutputAccessor().
00618 { 00619 Vertex streamVertex = boost::vertices(graphRep).first[streamId]; 00620 OutEdgeIter pEdge = boost::out_edges(streamVertex,filteredGraph).first; 00621 for (int i = 0; i < iOutput; ++i) { 00622 ++pEdge; 00623 } 00624 return *pEdge; 00625 }
ExecStreamGraphImpl::GraphRep const & ExecStreamGraphImpl::getGraphRep | ( | ) | [inline] |
Definition at line 319 of file ExecStreamGraphImpl.h.
References filteredGraph.
Referenced by ParallelExecStreamScheduler::alterNeighborInhibition(), ParallelExecStreamScheduler::processCompletedTask(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ParallelExecStreamScheduler::start(), and ExecStreamScheduler::traceStreamBuffers().
00320 { 00321 return filteredGraph; 00322 }
ExecStreamGraphImpl::FullGraphRep const & ExecStreamGraphImpl::getFullGraphRep | ( | ) | [inline] |
Definition at line 325 of file ExecStreamGraphImpl.h.
References graphRep.
Referenced by ExecStreamGraphImpl::DotEdgeRenderer::operator()().
00326 { 00327 return graphRep; 00328 }
SharedExecStream ExecStreamGraphImpl::getStreamFromVertex | ( | Vertex | ) | [inline] |
Definition at line 330 of file ExecStreamGraphImpl.h.
References graphRep, and boost::vertex_data.
Referenced by ParallelExecStreamScheduler::addToQueue(), closeProducers(), findLastStream(), DfsTreeExecStreamScheduler::findNextConsumer(), findStream(), getStream(), getStreamInput(), getStreamOutput(), mergeFrom(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), removeStream(), sortStreams(), and ParallelExecStreamScheduler::start().
00332 { 00333 return boost::get(boost::vertex_data,graphRep)[vertex]; 00334 }
SharedExecStreamBufAccessor & ExecStreamGraphImpl::getSharedBufAccessorFromEdge | ( | Edge | ) | [inline] |
Definition at line 337 of file ExecStreamGraphImpl.h.
References boost::edge_data, and graphRep.
Referenced by bindStreamBufAccessors(), getBufAccessorFromEdge(), getStreamInputAccessor(), getStreamOutputAccessor(), mergeFrom(), and ExecStreamGraphImpl::DotEdgeRenderer::operator()().
00339 { 00340 return boost::get(boost::edge_data,graphRep)[edge]; 00341 }
ExecStreamBufAccessor & ExecStreamGraphImpl::getBufAccessorFromEdge | ( | Edge | ) | [inline] |
Definition at line 343 of file ExecStreamGraphImpl.h.
References getSharedBufAccessorFromEdge().
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), open(), ParallelExecStreamScheduler::processCompletedTask(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ParallelExecStreamScheduler::start(), and ExecStreamScheduler::traceStreamBuffers().
00345 { 00346 return *(getSharedBufAccessorFromEdge(edge)); 00347 }
void ExecStreamGraphImpl::setTxn | ( | SharedLogicalTxn | pTxn | ) | [virtual] |
Sets the transaction within which this graph should execute.
The transaction is reset whenever the graph is closed.
pTxn | transaction |
Implements ExecStreamGraph.
Definition at line 71 of file ExecStreamGraph.cpp.
References pTxn.
00072 { 00073 pTxn = pTxnInit; 00074 }
void ExecStreamGraphImpl::setErrorTarget | ( | SharedErrorTarget | pErrorTarget | ) | [virtual] |
Sets the ErrorTarget to which this graph's streams should send row errors.
pErrorTarget | error target |
Implements ExecStreamGraph.
Definition at line 76 of file ExecStreamGraph.cpp.
References pErrorTarget.
00077 { 00078 pErrorTarget = pErrorTargetInit; 00079 }
void ExecStreamGraphImpl::setScratchSegment | ( | SharedSegment | pScratchSegment | ) | [virtual] |
Sets the ScratchSegment from which this graph's streams should allocate memory buffers.
pScratchSegment | scratch segment |
Implements ExecStreamGraph.
Definition at line 81 of file ExecStreamGraph.cpp.
References pScratchSegment.
00083 { 00084 pScratchSegment = pScratchSegmentInit; 00085 }
void ExecStreamGraphImpl::setResourceGovernor | ( | SharedExecStreamGovernor | pResourceGovernor | ) | [virtual] |
Sets the global exec stream governor.
pResourceGovernor | exec stream governor |
Implements ExecStreamGraph.
Definition at line 87 of file ExecStreamGraph.cpp.
References pResourceGovernor.
00089 { 00090 pResourceGovernor = pResourceGovernorInit; 00091 }
SharedLogicalTxn ExecStreamGraphImpl::getTxn | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 93 of file ExecStreamGraph.cpp.
References pTxn.
00094 { 00095 return pTxn; 00096 }
TxnId ExecStreamGraphImpl::getTxnId | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 98 of file ExecStreamGraph.cpp.
References allowDummyTxnId, FIRST_TXN_ID, and pTxn.
00099 { 00100 if (pTxn) { 00101 return pTxn->getTxnId(); 00102 } 00103 assert(allowDummyTxnId); 00104 return FIRST_TXN_ID; 00105 }
void ExecStreamGraphImpl::enableDummyTxnId | ( | bool | enabled | ) | [virtual] |
Controls whether it is OK to call getTxnId without first calling setTxn.
Normally, this is a bad idea (since in that case getTxnId will return FIRST_TXN_ID as a dummy, which could lead to concurrency problems), but for non-transactional unit tests, this can be useful. Default is disabled.
enabled | whether dummy txn IDs are enabled |
Implements ExecStreamGraph.
Definition at line 107 of file ExecStreamGraph.cpp.
References allowDummyTxnId.
00108 { 00109 allowDummyTxnId = enabled; 00110 }
SharedExecStreamGovernor ExecStreamGraphImpl::getResourceGovernor | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 112 of file ExecStreamGraph.cpp.
References pResourceGovernor.
Referenced by closeImpl().
00113 { 00114 return pResourceGovernor; 00115 }
void ExecStreamGraphImpl::prepare | ( | ExecStreamScheduler & | scheduler | ) | [virtual] |
Prepares this graph for execution.
Only called once (before first open) after all streams and dataflows have been defined.
scheduler | ExecStreamScheduler which will execute this graph |
Implements ExecStreamGraph.
Definition at line 455 of file ExecStreamGraph.cpp.
References bindStreamBufAccessors(), boost::edge_data, filteredGraph, graphRep, isPrepared, ExecStreamScheduler::newBufAccessor(), sortedStreams, and sortStreams().
00456 { 00457 isPrepared = true; 00458 sortStreams(); 00459 00460 // create buffer accessors for all explicit dataflow edges 00461 EdgeIterPair edges = boost::edges(filteredGraph); 00462 for (; edges.first != edges.second; edges.first++) { 00463 SharedExecStreamBufAccessor pBufAccessor = scheduler.newBufAccessor(); 00464 boost::put(boost::edge_data,graphRep,*(edges.first),pBufAccessor); 00465 } 00466 00467 // bind buffer accessors to streams 00468 std::for_each( 00469 sortedStreams.begin(), 00470 sortedStreams.end(), 00471 boost::bind( 00472 &ExecStreamGraphImpl::bindStreamBufAccessors,this,_1)); 00473 }
void ExecStreamGraphImpl::open | ( | ) | [virtual] |
Opens execution on this graph.
A graph may be repeatedly closed and then reopened.
Implements ExecStreamGraph.
Definition at line 502 of file ExecStreamGraph.cpp.
References ExecStreamBufAccessor::clear(), filteredGraph, getBufAccessorFromEdge(), isOpen, ClosableObject::needsClose, openStream(), sortedStreams, and sortStreams().
00503 { 00504 permAssert(!isOpen); 00505 isOpen = true; 00506 needsClose = true; 00507 00508 // clear all buffer accessors 00509 EdgeIterPair edges = boost::edges(filteredGraph); 00510 for (; edges.first != edges.second; edges.first++) { 00511 ExecStreamBufAccessor &bufAccessor = 00512 getBufAccessorFromEdge(*(edges.first)); 00513 bufAccessor.clear(); 00514 } 00515 00516 // open streams in dataflow order (from producers to consumers) 00517 if (sortedStreams.empty()) { 00518 // in case removeStream() was called after prepare 00519 sortStreams(); 00520 } 00521 std::for_each( 00522 sortedStreams.begin(), 00523 sortedStreams.end(), 00524 boost::bind(&ExecStreamGraphImpl::openStream,this,_1)); 00525 }
void ExecStreamGraphImpl::addStream | ( | SharedExecStream | pStream | ) | [virtual] |
Adds a stream to this graph.
pStream | stream to add |
Implements ExecStreamGraph.
Definition at line 161 of file ExecStreamGraph.cpp.
References addVertex().
00163 { 00164 (void) addVertex(pStream); 00165 }
void ExecStreamGraphImpl::removeStream | ( | ExecStreamId | id | ) | [virtual] |
Removes a stream from the graph: deletes the edges, and puts the vertex on a free list to be reallocated.
Does not free the ExecStream or its ExecStreamBufAccessors.
Implements ExecStreamGraph.
Definition at line 167 of file ExecStreamGraph.cpp.
References freeVertex(), getStreamFromVertex(), graphRep, removeFromStreamOutMap(), sortedStreams, and streamMap.
00168 { 00169 Vertex v = boost::vertices(graphRep).first[id]; 00170 SharedExecStream pStream = getStreamFromVertex(v); 00171 permAssert(pStream->pGraph == this); 00172 permAssert(pStream->id == id); 00173 00174 streamMap.erase(pStream->getName()); 00175 removeFromStreamOutMap(pStream); 00176 sortedStreams.clear(); // invalidate list: recreated on demand 00177 freeVertex(v); 00178 // stream is now detached from any graph, and not usable. 00179 pStream->pGraph = 0; 00180 pStream->id = 0; 00181 }
void ExecStreamGraphImpl::addDataflow | ( | ExecStreamId | producerId, | |
ExecStreamId | consumerId, | |||
bool | isImplicit = false | |||
) | [virtual] |
Defines a dataflow relationship between two streams in this graph.
producerId | ID of producer stream in this graph | |
consumerId | ID of consumer stream in this graph | |
isImplicit | false (the default) if the edge represents direct dataflow; true if the edge represents an implicit dataflow dependency |
Implements ExecStreamGraph.
Definition at line 216 of file ExecStreamGraph.cpp.
References graphRep.
Referenced by interposeStream().
00220 { 00221 Edge newEdge = 00222 boost::add_edge(producerId, consumerId, graphRep).first; 00223 boost::put( 00224 boost::edge_weight, 00225 graphRep, 00226 newEdge, 00227 isImplicit ? 0 : 1); 00228 }
void ExecStreamGraphImpl::addOutputDataflow | ( | ExecStreamId | producerId | ) | [virtual] |
Defines a dataflow representing external output produced by this graph.
producerId | ID of producer stream in this graph |
Implements ExecStreamGraph.
Definition at line 230 of file ExecStreamGraph.cpp.
References graphRep, and newVertex().
00232 { 00233 Vertex consumerId = newVertex(); 00234 Edge newEdge = 00235 boost::add_edge(producerId, consumerId, graphRep).first; 00236 boost::put( 00237 boost::edge_weight, 00238 graphRep, 00239 newEdge, 00240 1); 00241 }
void ExecStreamGraphImpl::addInputDataflow | ( | ExecStreamId | consumerId | ) | [virtual] |
Defines a dataflow representing external input consumed by this graph.
consumerId | ID of consumer stream in this graph |
Implements ExecStreamGraph.
Definition at line 243 of file ExecStreamGraph.cpp.
References graphRep, and newVertex().
00245 { 00246 Vertex producerId = newVertex(); 00247 Edge newEdge = 00248 boost::add_edge(producerId, consumerId, graphRep).first; 00249 boost::put( 00250 boost::edge_weight, 00251 graphRep, 00252 newEdge, 00253 1); 00254 }
void ExecStreamGraphImpl::mergeFrom | ( | ExecStreamGraph & | src | ) | [virtual] |
Adds all the vertices and edges from another graph.
Assumes the graphs are disjoint and that both have been prepared. Also assumes the two graphs are either both open or both closed.
src | the other graph, which is left empty. |
Implements ExecStreamGraph.
Definition at line 262 of file ExecStreamGraph.cpp.
References mergeFrom().
00263 { 00264 if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) { 00265 mergeFrom(*p); 00266 return; 00267 } 00268 permFail("unknown subtype of ExecStreamGraph"); 00269 }
void ExecStreamGraphImpl::mergeFrom | ( | ExecStreamGraph & | src, | |
std::vector< ExecStreamId > const & | nodes | |||
) | [virtual] |
Adds a subgraph, taken (removed) from another graph.
This is slower than mergeFrom(ExecStreamGraph&), which merges its entire source. Assumes the graphs are disjoint and that both have been prepared. Also assumes the two graphs are either both open or both closed.
src | the source graph | |
nodes | identifies source nodes. |
Implements ExecStreamGraph.
Definition at line 271 of file ExecStreamGraph.cpp.
References mergeFrom().
00274 { 00275 if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) { 00276 mergeFrom(*p, nodes); 00277 return; 00278 } 00279 permFail("unknown subtype of ExecStreamGraph"); 00280 }
SharedExecStream ExecStreamGraphImpl::findStream | ( | std::string | name | ) | [virtual] |
Finds a stream by name.
name | name of stream to find |
Implements ExecStreamGraph.
Definition at line 393 of file ExecStreamGraph.cpp.
References getStreamFromVertex(), and streamMap.
Referenced by addVertex(), and findLastStream().
00395 { 00396 StreamMapConstIter pPair = streamMap.find(name); 00397 if (pPair == streamMap.end()) { 00398 SharedExecStream nullStream; 00399 return nullStream; 00400 } else { 00401 return getStreamFromVertex(pPair->second); 00402 } 00403 }
SharedExecStream ExecStreamGraphImpl::findLastStream | ( | std::string | name, | |
uint | iOutput | |||
) | [virtual] |
Finds the last stream known for a name.
May be the original stream or an adapter.
name | name of stream to find | |
iOutput | ordinal of output arc |
Implements ExecStreamGraph.
Definition at line 405 of file ExecStreamGraph.cpp.
References findStream(), getStreamFromVertex(), and streamOutMap.
Referenced by interposeStream().
00408 { 00409 EdgeMap::const_iterator pPair = 00410 streamOutMap.find(std::make_pair(name, iOutput)); 00411 if (pPair == streamOutMap.end()) { 00412 return findStream(name); 00413 } else { 00414 return getStreamFromVertex(pPair->second); 00415 } 00416 }
void ExecStreamGraphImpl::interposeStream | ( | std::string | name, | |
uint | iOutput, | |||
ExecStreamId | interposedId | |||
) | [virtual] |
Interposes an adapter stream.
In the process, creates a dataflow from the last stream associated with the name to the adapter stream.
name | name of stream to adapt | |
iOutput | ordinal of output of stream | |
interposedId | ID of adapter stream within this graph |
Implements ExecStreamGraph.
Definition at line 418 of file ExecStreamGraph.cpp.
References addDataflow(), findLastStream(), and streamOutMap.
00422 { 00423 SharedExecStream pLastStream = findLastStream(name, iOutput); 00424 permAssert(pLastStream.get()); 00425 streamOutMap[std::make_pair(name, iOutput)] = interposedId; 00426 addDataflow( 00427 pLastStream->getStreamId(), 00428 interposedId, 00429 false); 00430 }
SharedExecStream ExecStreamGraphImpl::getStream | ( | ExecStreamId | id | ) | [virtual] |
Translates a stream ID to a stream pointer.
id | ID of a stream in this graph |
Implements ExecStreamGraph.
Definition at line 566 of file ExecStreamGraph.cpp.
References getStreamFromVertex(), and graphRep.
Referenced by ExecStreamGraphImpl::DotVertexRenderer::operator()().
00567 { 00568 Vertex v = boost::vertices(graphRep).first[id]; 00569 return getStreamFromVertex(v); 00570 }
uint ExecStreamGraphImpl::getInputCount | ( | ExecStreamId | streamId | ) | [virtual] |
Determines number of explicit input flows consumed by a stream.
streamId | ID of stream |
Implements ExecStreamGraph.
Definition at line 572 of file ExecStreamGraph.cpp.
References filteredGraph, and graphRep.
00574 { 00575 Vertex streamVertex = boost::vertices(graphRep).first[streamId]; 00576 return boost::in_degree(streamVertex,filteredGraph); 00577 }
uint ExecStreamGraphImpl::getOutputCount | ( | ExecStreamId | streamId | ) | [virtual] |
Determines number of explicit output flows produced by a stream.
streamId | ID of stream |
Implements ExecStreamGraph.
Definition at line 579 of file ExecStreamGraph.cpp.
References filteredGraph, and graphRep.
Referenced by removeFromStreamOutMap().
00581 { 00582 Vertex streamVertex = boost::vertices(graphRep).first[streamId]; 00583 return boost::out_degree(streamVertex,filteredGraph); 00584 }
SharedExecStream ExecStreamGraphImpl::getStreamInput | ( | ExecStreamId | streamId, | |
uint | iInput | |||
) | [virtual] |
Accesses a stream's input.
streamId | ID of stream | |
iInput | 0-based input explicit flow ordinal |
Implements ExecStreamGraph.
Definition at line 598 of file ExecStreamGraph.cpp.
References getInputEdge(), getStreamFromVertex(), and graphRep.
00601 { 00602 Edge inputEdge = getInputEdge(streamId, iInput); 00603 Vertex inputVertex = boost::source(inputEdge,graphRep); 00604 return getStreamFromVertex(inputVertex); 00605 }
SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamInputAccessor | ( | ExecStreamId | streamId, | |
uint | iInput | |||
) | [virtual] |
Accesses a stream's input accessor.
streamId | ID of stream | |
iInput | 0-based input explicit flow ordinal |
Implements ExecStreamGraph.
Definition at line 607 of file ExecStreamGraph.cpp.
References getInputEdge(), and getSharedBufAccessorFromEdge().
00610 { 00611 Edge inputEdge = getInputEdge(streamId, iInput); 00612 return getSharedBufAccessorFromEdge(inputEdge); 00613 }
SharedExecStream ExecStreamGraphImpl::getStreamOutput | ( | ExecStreamId | streamId, | |
uint | iOutput | |||
) | [virtual] |
Accesses a stream's output.
streamId | ID of stream | |
iOutput | 0-based output explicit flow ordinal |
Implements ExecStreamGraph.
Definition at line 627 of file ExecStreamGraph.cpp.
References getOutputEdge(), getStreamFromVertex(), and graphRep.
00630 { 00631 Edge outputEdge = getOutputEdge(streamId, iOutput); 00632 Vertex outputVertex = boost::target(outputEdge,graphRep); 00633 return getStreamFromVertex(outputVertex); 00634 }
SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamOutputAccessor | ( | ExecStreamId | streamId, | |
uint | iOutput | |||
) | [virtual] |
Accesses a stream's output accessor.
streamId | ID of stream | |
iOutput | 0-based output explicit flow ordinal |
Implements ExecStreamGraph.
Definition at line 636 of file ExecStreamGraph.cpp.
References getOutputEdge(), and getSharedBufAccessorFromEdge().
00639 { 00640 Edge outputEdge = getOutputEdge(streamId, iOutput); 00641 return getSharedBufAccessorFromEdge(outputEdge); 00642 }
std::vector< SharedExecStream > ExecStreamGraphImpl::getSortedStreams | ( | ) | [virtual] |
Gets streams, sorted topologically.
Can only be called after prepare.
Implements ExecStreamGraph.
Definition at line 644 of file ExecStreamGraph.cpp.
References isPrepared, sortedStreams, and sortStreams().
00645 { 00646 permAssert(isPrepared); 00647 if (sortedStreams.empty()) { 00648 sortStreams(); 00649 } 00650 return sortedStreams; 00651 }
int ExecStreamGraphImpl::getStreamCount | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 134 of file ExecStreamGraph.cpp.
References freeVertices, and graphRep.
00135 { 00136 return boost::num_vertices(graphRep) - freeVertices.size(); 00137 }
int ExecStreamGraphImpl::getDataflowCount | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 257 of file ExecStreamGraph.cpp.
References graphRep.
00258 { 00259 return boost::num_edges(graphRep); 00260 }
void ExecStreamGraphImpl::renderGraphviz | ( | std::ostream & | dotStream | ) | [virtual] |
Renders the graph in the .dot format defined by Graphviz.
dotStream | ostream on which to write .dot representation |
Implements ExecStreamGraph.
Definition at line 734 of file ExecStreamGraph.cpp.
References graphRep.
00735 { 00736 boost::write_graphviz( 00737 dotStream, 00738 graphRep, 00739 DotVertexRenderer(*this), 00740 DotEdgeRenderer(*this), 00741 DotGraphRenderer()); 00742 }
bool ExecStreamGraphImpl::isAcyclic | ( | ) | [virtual] |
Implements ExecStreamGraph.
Definition at line 653 of file ExecStreamGraph.cpp.
References graphRep.
00654 { 00655 int numVertices = boost::num_vertices(graphRep); 00656 00657 // if # strong components is < # vertices, then there must be at least 00658 // one cycle 00659 std::vector<int> component(numVertices); 00660 int nStrongComps = boost::strong_components(graphRep, &component[0]); 00661 return (nStrongComps >= numVertices); 00662 }
void ExecStreamGraphImpl::closeProducers | ( | ExecStreamId | streamId | ) | [virtual] |
Closes the producers of a stream with a given id.
streamId | stream id of the stream whose producers will be closed |
Implements ExecStreamGraph.
Definition at line 744 of file ExecStreamGraph.cpp.
References getStreamFromVertex(), and graphRep.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), and BarrierExecStream::execute().
00745 { 00746 FgInEdgeIterPair inEdges = 00747 boost::in_edges(streamId, graphRep); 00748 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00749 Edge edge = *(inEdges.first); 00750 // move streamId upstream 00751 streamId = boost::source(edge,graphRep); 00752 // close the producers of this stream before closing the stream 00753 // itself, but only if it's possible to early close the stream 00754 SharedExecStream pStream = getStreamFromVertex(streamId); 00755 if (!pStream->canEarlyClose()) { 00756 continue; 00757 } 00758 closeProducers(streamId); 00759 pStream->close(); 00760 } 00761 }
void ExecStreamGraphImpl::declareDynamicParamWriter | ( | ExecStreamId | streamId, | |
DynamicParamId | dynamicParamId | |||
) | [virtual] |
Declares that a given stream writes a given dynamic parameter.
streamId | Stream id | |
dynamicParamId | Dynamic parameter id |
Implements ExecStreamGraph.
Definition at line 763 of file ExecStreamGraph.cpp.
References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::writerStreamIds.
00766 { 00767 DynamicParamInfo &info = dynamicParamMap[dynamicParamId]; 00768 info.writerStreamIds.push_back(streamId); 00769 }
void ExecStreamGraphImpl::declareDynamicParamReader | ( | ExecStreamId | streamId, | |
DynamicParamId | dynamicParamId | |||
) | [virtual] |
Declares that a given stream reads a given dynamic parameter.
streamId | Stream id | |
dynamicParamId | Dynamic parameter id |
Implements ExecStreamGraph.
Definition at line 771 of file ExecStreamGraph.cpp.
References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::readerStreamIds.
00774 { 00775 DynamicParamInfo &info = dynamicParamMap[dynamicParamId]; 00776 info.readerStreamIds.push_back(streamId); 00777 }
const std::vector< ExecStreamId > & ExecStreamGraphImpl::getDynamicParamWriters | ( | DynamicParamId | dynamicParamId | ) | [virtual] |
Returns a list of stream ids that write a given dynamic parameter.
dynamicParamId | Dynamic parameter id |
Implements ExecStreamGraph.
Definition at line 779 of file ExecStreamGraph.cpp.
References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::writerStreamIds.
00781 { 00782 DynamicParamInfo &info = dynamicParamMap[dynamicParamId]; 00783 return info.writerStreamIds; 00784 }
const std::vector< ExecStreamId > & ExecStreamGraphImpl::getDynamicParamReaders | ( | DynamicParamId | dynamicParamId | ) | [virtual] |
Returns a list of stream ids that read a given dynamic parameter.
dynamicParamId | Dynamic parameter id |
Implements ExecStreamGraph.
Definition at line 786 of file ExecStreamGraph.cpp.
References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::readerStreamIds.
00788 { 00789 DynamicParamInfo &info = dynamicParamMap[dynamicParamId]; 00790 return info.readerStreamIds; 00791 }
SharedExecStreamGraph ExecStreamGraph::newExecStreamGraph | ( | ) | [static, inherited] |
Constructs a new ExecStreamGraph.
Definition at line 43 of file ExecStreamGraph.cpp.
Referenced by ExecStreamTestBase::newStreamGraph(), and CmdInterpreter::visit().
00044 { 00045 return SharedExecStreamGraph( 00046 new ExecStreamGraphImpl(), 00047 ClosableObjectDestructor()); 00048 }
ExecStreamScheduler * ExecStreamGraph::getScheduler | ( | ) | const [inline, inherited] |
Definition at line 444 of file ExecStreamGraph.h.
References ExecStreamGraph::pScheduler.
Referenced by ExecStream::checkAbort(), JavaSinkExecStream::execute(), CorrelationJoinExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), MergeExecStream::open(), CorrelationJoinExecStream::open(), and JavaSinkExecStream::stuffByteBuffer().
00445 { 00446 return pScheduler; 00447 }
SharedDynamicParamManager ExecStreamGraph::getDynamicParamManager | ( | ) | [inline, inherited] |
Definition at line 449 of file ExecStreamGraph.h.
References ExecStreamGraph::pDynamicParamManager.
Referenced by ExecStream::prepare().
00450 { 00451 return pDynamicParamManager; 00452 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any allocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 { 00041 if (!needsClose) { 00042 return; 00043 } 00044 needsClose = false; 00045 closeImpl(); 00046 }
FullGraphRep ExecStreamGraphImpl::graphRep [protected] |
Definition at line 122 of file ExecStreamGraphImpl.h.
Referenced by addDataflow(), addInputDataflow(), addOutputDataflow(), addVertex(), clear(), closeProducers(), freeVertex(), getDataflowCount(), getFullGraphRep(), getInputCount(), getInputEdge(), getOutputCount(), getOutputEdge(), getSharedBufAccessorFromEdge(), getStream(), getStreamCount(), getStreamFromVertex(), getStreamInput(), getStreamOutput(), isAcyclic(), mergeFrom(), newVertex(), prepare(), removeStream(), renderGraphviz(), and sortStreams().
GraphRep ExecStreamGraphImpl::filteredGraph [protected] |
Definition at line 124 of file ExecStreamGraphImpl.h.
Referenced by bindStreamBufAccessors(), getGraphRep(), getInputCount(), getInputEdge(), getOutputCount(), getOutputEdge(), open(), and prepare().
std::vector<Vertex> ExecStreamGraphImpl::freeVertices [protected] |
List of freed vertices.
Definition at line 133 of file ExecStreamGraphImpl.h.
Referenced by freeVertex(), getStreamCount(), and newVertex().
StreamMap ExecStreamGraphImpl::streamMap [protected] |
Map of name to stream.
Definition at line 143 of file ExecStreamGraphImpl.h.
Referenced by addVertex(), clear(), findStream(), and removeStream().
EdgeMap ExecStreamGraphImpl::streamOutMap [protected] |
Map of name and output arc to stream output, after add-ons.
Definition at line 148 of file ExecStreamGraphImpl.h.
Referenced by clear(), findLastStream(), interposeStream(), and removeFromStreamOutMap().
std::vector<SharedExecStream> ExecStreamGraphImpl::sortedStreams [protected] |
Result of topologically sorting graph (producers before consumers).
Definition at line 153 of file ExecStreamGraphImpl.h.
Referenced by clear(), closeImpl(), getSortedStreams(), mergeFrom(), open(), prepare(), removeStream(), and sortStreams().
SharedLogicalTxn ExecStreamGraphImpl::pTxn [protected] |
Transaction being executed.
Definition at line 158 of file ExecStreamGraphImpl.h.
Referenced by closeImpl(), getTxn(), getTxnId(), and setTxn().
SharedErrorTarget ExecStreamGraphImpl::pErrorTarget [protected] |
Target for row errors.
Definition at line 163 of file ExecStreamGraphImpl.h.
Referenced by openStream(), and setErrorTarget().
SharedSegment ExecStreamGraphImpl::pScratchSegment [protected] |
Source for scratch buffers.
Definition at line 168 of file ExecStreamGraphImpl.h.
Referenced by closeImpl(), and setScratchSegment().
SharedExecStreamGovernor ExecStreamGraphImpl::pResourceGovernor [protected] |
Resource governor.
Definition at line 173 of file ExecStreamGraphImpl.h.
Referenced by getResourceGovernor(), and setResourceGovernor().
bool ExecStreamGraphImpl::isOpen [protected] |
Whether this graph is currently open.
Note that this is not quite the opposite of the inherited ClosableObject::needsClose, since a graph needs to be closed before destruction if it has been prepared but never opened.
Definition at line 181 of file ExecStreamGraphImpl.h.
Referenced by clear(), closeImpl(), ExecStreamGraphImpl(), mergeFrom(), and open().
bool ExecStreamGraphImpl::isPrepared [protected] |
Whether this graph has been prepared.
Definition at line 186 of file ExecStreamGraphImpl.h.
Referenced by clear(), ExecStreamGraphImpl(), getSortedStreams(), mergeFrom(), and prepare().
bool ExecStreamGraphImpl::doDataflowClose [protected] |
Whether to close this graph in dataflow order (producers to consumers).
Definition at line 191 of file ExecStreamGraphImpl.h.
Referenced by closeImpl(), and ExecStreamGraphImpl().
std::map<DynamicParamId, DynamicParamInfo> ExecStreamGraphImpl::dynamicParamMap [protected] |
Information on readers and writers of dynamic parameters.
Definition at line 203 of file ExecStreamGraphImpl.h.
Referenced by declareDynamicParamReader(), declareDynamicParamWriter(), getDynamicParamReaders(), and getDynamicParamWriters().
bool ExecStreamGraphImpl::allowDummyTxnId [protected] |
Whether to allow execution without a real transaction.
Definition at line 208 of file ExecStreamGraphImpl.h.
Referenced by enableDummyTxnId(), ExecStreamGraphImpl(), and getTxnId().
ExecStreamScheduler* ExecStreamGraph::pScheduler [protected, inherited] |
A Scheduler responsible for executing streams in this graph.
(Can be null if the current graph is only building streams, not executing them.) Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 73 of file ExecStreamGraph.h.
Referenced by ExecStreamGraph::getScheduler().
SharedDynamicParamManager ExecStreamGraph::pDynamicParamManager [protected, inherited] |
Manager that handles dynamic parameters for this graph.
Definition at line 78 of file ExecStreamGraph.h.
Referenced by closeImpl(), and ExecStreamGraph::getDynamicParamManager().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), open(), ExecStream::open(), and ClosableObject::~ClosableObject().