ExecStreamGraphImpl Class Reference

ExecStreamGraphImpl is an implementation of the ExecStreamGraph interface based on the boost graph template.

#include <ExecStreamGraphImpl.h>

Inheritance diagram for ExecStreamGraphImpl:

Inherits ExecStreamGraph and ClosableObject.

Public Types

typedef boost::adjacency_list< boost::vecS, boost::vecS, boost::bidirectionalS, boost::property< boost::vertex_data_t, SharedExecStream >, boost::property< boost::edge_data_t, SharedExecStreamBufAccessor, boost::property< boost::edge_weight_t, int > > > FullGraphRep
typedef boost::graph_traits< FullGraphRep >::vertex_descriptor Vertex
typedef boost::graph_traits< FullGraphRep >::edge_descriptor Edge
typedef boost::graph_traits< FullGraphRep >::vertex_iterator FgVertexIter
typedef boost::graph_traits< FullGraphRep >::edge_iterator FgEdgeIter
typedef boost::graph_traits< FullGraphRep >::out_edge_iterator FgOutEdgeIter
typedef boost::graph_traits< FullGraphRep >::in_edge_iterator FgInEdgeIter
typedef std::pair< FgVertexIter, FgVertexIter > FgVertexIterPair
typedef std::pair< FgEdgeIter, FgEdgeIter > FgEdgeIterPair
typedef std::pair< FgOutEdgeIter, FgOutEdgeIter > FgOutEdgeIterPair
typedef std::pair< FgInEdgeIter, FgInEdgeIter > FgInEdgeIterPair
typedef boost::property_map< FullGraphRep, boost::edge_weight_t >::type EdgeWeightMap
typedef boost::filtered_graph< FullGraphRep, ExplicitEdgePredicate > GraphRep
typedef boost::graph_traits< GraphRep >::vertex_iterator VertexIter
typedef boost::graph_traits< GraphRep >::edge_iterator EdgeIter
typedef boost::graph_traits< GraphRep >::out_edge_iterator OutEdgeIter
typedef boost::graph_traits< GraphRep >::in_edge_iterator InEdgeIter
typedef std::pair< VertexIter, VertexIter > VertexIterPair
typedef std::pair< EdgeIter, EdgeIter > EdgeIterPair
typedef std::pair< OutEdgeIter, OutEdgeIter > OutEdgeIterPair
typedef std::pair< InEdgeIter, InEdgeIter > InEdgeIterPair

Public Member Functions

 ExecStreamGraphImpl ()
virtual ~ExecStreamGraphImpl ()
GraphRep const & getGraphRep ()
FullGraphRep const & getFullGraphRep ()
SharedExecStream getStreamFromVertex (Vertex)
SharedExecStreamBufAccessor & getSharedBufAccessorFromEdge (Edge)
ExecStreamBufAccessor & getBufAccessorFromEdge (Edge)
virtual void setTxn (SharedLogicalTxn pTxn)
 Sets the transaction within which this graph should execute.
virtual void setErrorTarget (SharedErrorTarget pErrorTarget)
 Sets the ErrorTarget to which this graph's streams should send row errors.
virtual void setScratchSegment (SharedSegment pScratchSegment)
 Sets the ScratchSegment from which this graph's streams should allocate memory buffers.
virtual void setResourceGovernor (SharedExecStreamGovernor pResourceGovernor)
 Sets the global exec stream governor.
virtual SharedLogicalTxn getTxn ()
 
Returns:
the transaction within which this graph is executing

virtual TxnId getTxnId ()
 
Returns:
the transaction ID for this graph

virtual void enableDummyTxnId (bool enabled)
 Controls whether it is OK to call getTxnId without first calling setTxn.
virtual SharedExecStreamGovernor getResourceGovernor ()
 
Returns:
exec stream governor

virtual void prepare (ExecStreamScheduler &scheduler)
 Prepares this graph for execution.
virtual void open ()
 Opens execution on this graph.
virtual void addStream (SharedExecStream pStream)
 Adds a stream to this graph.
virtual void removeStream (ExecStreamId)
 Removes a stream from the graph: deletes the edges, and puts the vertex on a free list to be reallocated.
virtual void addDataflow (ExecStreamId producerId, ExecStreamId consumerId, bool isImplicit=false)
 Defines a dataflow relationship between two streams in this graph.
virtual void addOutputDataflow (ExecStreamId producerId)
 Defines a dataflow representing external output produced by this graph.
virtual void addInputDataflow (ExecStreamId consumerId)
 Defines a dataflow representing external input consumed by this graph.
virtual void mergeFrom (ExecStreamGraph &src)
 Adds all the vertices and edges from another graph.
virtual void mergeFrom (ExecStreamGraph &src, std::vector< ExecStreamId > const &nodes)
 Adds a subgraph, taken (removed) from another graph.
virtual SharedExecStream findStream (std::string name)
 Finds a stream by name.
virtual SharedExecStream findLastStream (std::string name, uint iOutput)
 Finds last stream known for name.
virtual void interposeStream (std::string name, uint iOutput, ExecStreamId interposedId)
 Interposes an adapter stream.
virtual SharedExecStream getStream (ExecStreamId id)
 Translates a stream ID to a stream pointer.
virtual uint getInputCount (ExecStreamId streamId)
 Determines number of explicit input flows consumed by a stream.
virtual uint getOutputCount (ExecStreamId streamId)
 Determines number of explicit output flows produced by a stream.
virtual SharedExecStream getStreamInput (ExecStreamId streamId, uint iInput)
 Accesses a stream's input.
virtual SharedExecStreamBufAccessor getStreamInputAccessor (ExecStreamId streamId, uint iInput)
 Accesses a stream's input accessor.
virtual SharedExecStream getStreamOutput (ExecStreamId streamId, uint iOutput)
 Accesses a stream's output.
virtual SharedExecStreamBufAccessor getStreamOutputAccessor (ExecStreamId streamId, uint iOutput)
 Accesses a stream's output accessor.
virtual std::vector< SharedExecStream > getSortedStreams ()
 Gets streams, sorted topologically.
virtual int getStreamCount ()
 
Returns:
the number of streams in the graph; can only be called after prepare.

virtual int getDataflowCount ()
 
Returns:
the number of dataflows (edges) in the graph; can only be called after prepare.

virtual void renderGraphviz (std::ostream &dotStream)
 Renders the graph in the .dot format defined by Graphviz.
virtual bool isAcyclic ()
 
Returns:
true if graph has no cycles

virtual void closeProducers (ExecStreamId streamId)
 Closes the producers of a stream with a given id.
virtual void declareDynamicParamWriter (ExecStreamId streamId, DynamicParamId dynamicParamId)
 Declares that a given stream writes a given dynamic parameter.
virtual void declareDynamicParamReader (ExecStreamId streamId, DynamicParamId dynamicParamId)
 Declares that a given stream reads a given dynamic parameter.
virtual const std::vector<
ExecStreamId > & 
getDynamicParamWriters (DynamicParamId dynamicParamId)
 Returns a list of stream ids that write a given dynamic parameter.
virtual const std::vector<
ExecStreamId > & 
getDynamicParamReaders (DynamicParamId dynamicParamId)
 Returns a list of stream ids that read a given dynamic parameter.
ExecStreamScheduler * getScheduler () const
 
Returns:
pointer to executing scheduler, or null if there is none.

SharedDynamicParamManager getDynamicParamManager ()
 
Returns:
reference to the DynamicParamManager for this graph.

bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any allocated resources.

Static Public Member Functions

static SharedExecStreamGraph newExecStreamGraph ()
 Constructs a new ExecStreamGraph.

Protected Types

typedef std::map< std::string, ExecStreamId > StreamMap
typedef StreamMap::const_iterator StreamMapConstIter
typedef std::map< std::pair< std::string, uint >, ExecStreamId > EdgeMap

Protected Member Functions

virtual void closeImpl ()
 Must be implemented by derived class to release any resources.
virtual void sortStreams ()
virtual void openStream (SharedExecStream pStream)
virtual void bindStreamBufAccessors (SharedExecStream pStream)
virtual void mergeFrom (ExecStreamGraphImpl &src)
virtual void mergeFrom (ExecStreamGraphImpl &src, std::vector< ExecStreamId > const &nodes)
virtual void clear ()
 Frees all nodes and edges: like removeStream() on all streams, but faster.
virtual Vertex addVertex (SharedExecStream pStream)
 Adds a node.
Vertex newVertex ()
 
Returns:
an available Vertex, first trying the free list

void freeVertex (Vertex)
 Releases a Vertex to the free list.
void removeFromStreamOutMap (SharedExecStream)
 Removes a stream from streamOutMap.
virtual Edge getInputEdge (ExecStreamId stream, uint iInput)
virtual Edge getOutputEdge (ExecStreamId stream, uint iOutput)

Protected Attributes

FullGraphRep graphRep
GraphRep filteredGraph
std::vector< Vertex > freeVertices
 List of freed vertices.
StreamMap streamMap
 Map of name to stream.
EdgeMap streamOutMap
 Map of name and output arc to stream output, after add-ons.
std::vector< SharedExecStream > sortedStreams
 Result of topologically sorting graph (producers before consumers).
SharedLogicalTxn pTxn
 Transaction being executed.
SharedErrorTarget pErrorTarget
 Target for row errors.
SharedSegment pScratchSegment
 Source for scratch buffers.
SharedExecStreamGovernor pResourceGovernor
 Resource governor.
bool isOpen
 Whether this graph is currently open.
bool isPrepared
 Whether this graph has been prepared.
bool doDataflowClose
 Whether to close this graph in dataflow order (producers to consumers).
std::map< DynamicParamId, DynamicParamInfo > dynamicParamMap
 Information on readers and writers of dynamic parameters.
bool allowDummyTxnId
 Whether to allow execution without a real transaction.
ExecStreamScheduler * pScheduler
 A Scheduler responsible for executing streams in this graph.
SharedDynamicParamManager pDynamicParamManager
 Manager that handles dynamic parameters for this graph.
bool needsClose

Classes

class  DotEdgeRenderer
class  DotGraphRenderer
class  DotVertexRenderer
class  DynamicParamInfo
struct  ExplicitEdgePredicate

Detailed Description

ExecStreamGraphImpl is an implementation of the ExecStreamGraph interface based on the boost graph template.

Definition at line 50 of file ExecStreamGraphImpl.h.


Member Typedef Documentation

typedef boost::adjacency_list< boost::vecS, boost::vecS, boost::bidirectionalS, boost::property<boost::vertex_data_t,SharedExecStream>, boost::property< boost::edge_data_t,SharedExecStreamBufAccessor, boost::property<boost::edge_weight_t,int> > > ExecStreamGraphImpl::FullGraphRep

Definition at line 62 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::vertex_descriptor ExecStreamGraphImpl::Vertex

Definition at line 64 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::edge_descriptor ExecStreamGraphImpl::Edge

Definition at line 65 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::vertex_iterator ExecStreamGraphImpl::FgVertexIter

Definition at line 67 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::edge_iterator ExecStreamGraphImpl::FgEdgeIter

Definition at line 68 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::out_edge_iterator ExecStreamGraphImpl::FgOutEdgeIter

Definition at line 69 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<FullGraphRep>::in_edge_iterator ExecStreamGraphImpl::FgInEdgeIter

Definition at line 70 of file ExecStreamGraphImpl.h.

typedef std::pair<FgVertexIter,FgVertexIter> ExecStreamGraphImpl::FgVertexIterPair

Definition at line 71 of file ExecStreamGraphImpl.h.

typedef std::pair<FgEdgeIter,FgEdgeIter> ExecStreamGraphImpl::FgEdgeIterPair

Definition at line 72 of file ExecStreamGraphImpl.h.

typedef std::pair<FgOutEdgeIter,FgOutEdgeIter> ExecStreamGraphImpl::FgOutEdgeIterPair

Definition at line 73 of file ExecStreamGraphImpl.h.

typedef std::pair<FgInEdgeIter,FgInEdgeIter> ExecStreamGraphImpl::FgInEdgeIterPair

Definition at line 74 of file ExecStreamGraphImpl.h.

typedef boost::property_map<FullGraphRep, boost::edge_weight_t>::type ExecStreamGraphImpl::EdgeWeightMap

Definition at line 77 of file ExecStreamGraphImpl.h.

typedef boost::filtered_graph<FullGraphRep, ExplicitEdgePredicate> ExecStreamGraphImpl::GraphRep

Definition at line 102 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<GraphRep>::vertex_iterator ExecStreamGraphImpl::VertexIter

Definition at line 103 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<GraphRep>::edge_iterator ExecStreamGraphImpl::EdgeIter

Definition at line 104 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<GraphRep>::out_edge_iterator ExecStreamGraphImpl::OutEdgeIter

Definition at line 105 of file ExecStreamGraphImpl.h.

typedef boost::graph_traits<GraphRep>::in_edge_iterator ExecStreamGraphImpl::InEdgeIter

Definition at line 106 of file ExecStreamGraphImpl.h.

typedef std::pair<VertexIter,VertexIter> ExecStreamGraphImpl::VertexIterPair

Definition at line 107 of file ExecStreamGraphImpl.h.

typedef std::pair<EdgeIter,EdgeIter> ExecStreamGraphImpl::EdgeIterPair

Definition at line 108 of file ExecStreamGraphImpl.h.

typedef std::pair<OutEdgeIter,OutEdgeIter> ExecStreamGraphImpl::OutEdgeIterPair

Definition at line 109 of file ExecStreamGraphImpl.h.

typedef std::pair<InEdgeIter,InEdgeIter> ExecStreamGraphImpl::InEdgeIterPair

Definition at line 110 of file ExecStreamGraphImpl.h.

typedef std::map<std::string,ExecStreamId> ExecStreamGraphImpl::StreamMap [protected]

Definition at line 126 of file ExecStreamGraphImpl.h.

typedef StreamMap::const_iterator ExecStreamGraphImpl::StreamMapConstIter [protected]

Definition at line 127 of file ExecStreamGraphImpl.h.

typedef std::map<std::pair<std::string, uint>,ExecStreamId> ExecStreamGraphImpl::EdgeMap [protected]

Definition at line 128 of file ExecStreamGraphImpl.h.


Constructor & Destructor Documentation

ExecStreamGraphImpl::ExecStreamGraphImpl (  )  [explicit]

Definition at line 60 of file ExecStreamGraph.cpp.

References allowDummyTxnId, doDataflowClose, isOpen, and isPrepared.

00061     : filteredGraph(
00062         graphRep,
00063         boost::get(boost::edge_weight, graphRep))
00064 {
00065     isPrepared = false;
00066     isOpen = false;
00067     doDataflowClose = false;
00068     allowDummyTxnId = false;
00069 }
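
The constructor builds filteredGraph as a filtered view over graphRep, with the edge predicate constructed from the edge_weight property map. The sketch below illustrates the general idea using only the standard library. It assumes (the listing does not confirm this) that an edge counts as explicit when its weight is positive; WeightedEdge, ExplicitOnly, and countInEdges are hypothetical names, not part of this class.

```cpp
#include <cassert>
#include <vector>

// Hypothetical edge record: endpoints plus an integer weight.
struct WeightedEdge {
    int source;
    int target;
    int weight;
};

// Assumed convention for this sketch only: weight > 0 marks an explicit
// dataflow, anything else an implicit one.
struct ExplicitOnly {
    bool operator()(const WeightedEdge &e) const { return e.weight > 0; }
};

// Counts the inbound edges of `vertex` that pass the predicate, which is
// what iterating in_edges over a filtered view amounts to.
template <typename Pred>
int countInEdges(const std::vector<WeightedEdge> &edges, int vertex, Pred keep)
{
    int n = 0;
    for (const WeightedEdge &e : edges) {
        if (e.target == vertex && keep(e)) {
            ++n;
        }
    }
    return n;
}
```

Under that assumption, a full-graph traversal sees every edge while a filtered traversal sees only the explicit ones, which would match the way getInputCount and getOutputCount are documented as counting explicit flows.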

virtual ExecStreamGraphImpl::~ExecStreamGraphImpl (  )  [inline, virtual]

Definition at line 239 of file ExecStreamGraphImpl.h.

00239 {}


Member Function Documentation

void ExecStreamGraphImpl::closeImpl (  )  [protected, virtual]

Must be implemented by derived class to release any resources.

Implements ClosableObject.

Definition at line 535 of file ExecStreamGraph.cpp.

References ClosableObject::close(), doDataflowClose, getResourceGovernor(), isOpen, NULL_PAGE_ID, ExecStreamGraph::pDynamicParamManager, pScratchSegment, pTxn, sortedStreams, and sortStreams().

00536 {
00537     isOpen = false;
00538     if (sortedStreams.empty()) {
00539         // in case prepare was never called
00540         sortStreams();
00541     }
00542     if (doDataflowClose) {
00543         std::for_each(
00544             sortedStreams.begin(),
00545             sortedStreams.end(),
00546             boost::bind(&ClosableObject::close,_1));
00547     } else {
00548         std::for_each(
00549             sortedStreams.rbegin(),
00550             sortedStreams.rend(),
00551             boost::bind(&ClosableObject::close,_1));
00552     }
00553     pDynamicParamManager->deleteAllParams();
00554     SharedExecStreamGovernor pGov = getResourceGovernor();
00555     if (pGov) {
00556         pGov->returnResources(*this);
00557     }
00558     pTxn.reset();
00559 
00560     // release any scratch memory
00561     if (pScratchSegment) {
00562         pScratchSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID);
00563     }
00564 }
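
The close ordering used above can be illustrated with a small standard-library sketch. FakeStream and closeAll are hypothetical names; the sketch only shows how a flag selects between producer-to-consumer and consumer-to-producer traversal of an already sorted list, and it records names instead of closing anything.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a stream; only its name matters here.
struct FakeStream {
    std::string name;
};

// Returns the order in which streams would be closed. The input is assumed
// to be sorted producers-before-consumers, as sortedStreams is documented.
std::vector<std::string> closeAll(
    const std::vector<FakeStream> &sortedStreams, bool doDataflowClose)
{
    std::vector<std::string> order;
    if (doDataflowClose) {
        // dataflow order: producers first
        for (auto it = sortedStreams.begin(); it != sortedStreams.end(); ++it) {
            order.push_back(it->name);
        }
    } else {
        // reverse order: consumers first
        for (auto it = sortedStreams.rbegin(); it != sortedStreams.rend(); ++it) {
            order.push_back(it->name);
        }
    }
    return order;
}
```

Since the constructor initializes doDataflowClose to false, the consumers-first branch is the default in the listing.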

void ExecStreamGraphImpl::sortStreams (  )  [protected, virtual]

Definition at line 432 of file ExecStreamGraph.cpp.

References getStreamFromVertex(), graphRep, and sortedStreams.

Referenced by closeImpl(), getSortedStreams(), open(), and prepare().

00433 {
00434     std::vector<Vertex> sortedVertices;
00435     boost::topological_sort(
00436         graphRep,std::back_inserter(sortedVertices));
00437     sortedStreams.resize(sortedVertices.size());
00438 
00439     // boost::topological_sort produces an ordering from consumers to
00440     // producers, but we want the opposite ordering, hence
00441     // sortedStreams.rbegin() below
00442     std::transform(
00443         sortedVertices.begin(),
00444         sortedVertices.end(),
00445         sortedStreams.rbegin(),
00446         boost::bind(&ExecStreamGraphImpl::getStreamFromVertex,this,_1));
00447 
00448     // now filter out the null vertices representing inputs and outputs
00449     sortedStreams.erase(
00450         std::remove(
00451             sortedStreams.begin(),sortedStreams.end(),SharedExecStream()),
00452         sortedStreams.end());
00453 }
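
The three steps of sortStreams (reverse topological order, reversal through a reverse iterator, removal of null entries) can be sketched without Boost. This is an illustration under stated assumptions, not the Fennel implementation: the graph is a plain adjacency list of integers, it is assumed to be acyclic, and Stream, visit, and sortStreamsSketch are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for SharedExecStream.
using Stream = std::shared_ptr<std::string>;

// Depth-first post-order visit: a vertex is emitted only after everything
// reachable from it, so consumers come out before their producers.
static void visit(
    int v,
    const std::vector<std::vector<int>> &outEdges,
    std::vector<bool> &seen,
    std::vector<int> &postOrder)
{
    if (seen[v]) {
        return;
    }
    seen[v] = true;
    for (int w : outEdges[v]) {
        visit(w, outEdges, seen, postOrder);
    }
    postOrder.push_back(v);
}

// Assumes an acyclic graph; outEdges[v] lists the consumers of vertex v.
std::vector<Stream> sortStreamsSketch(
    const std::vector<std::vector<int>> &outEdges,
    const std::vector<Stream> &vertexData)
{
    std::vector<bool> seen(outEdges.size(), false);
    std::vector<int> postOrder;
    for (int v = 0; v < static_cast<int>(outEdges.size()); ++v) {
        visit(v, outEdges, seen, postOrder);
    }

    // Write through a reverse iterator so that producers end up first.
    std::vector<Stream> sorted(postOrder.size());
    std::transform(
        postOrder.begin(), postOrder.end(), sorted.rbegin(),
        [&vertexData](int v) { return vertexData[v]; });

    // Drop null entries, which stand for external input/output vertices.
    sorted.erase(
        std::remove(sorted.begin(), sorted.end(), Stream()),
        sorted.end());
    return sorted;
}
```

The erase/remove pair is needed because std::remove only shifts the kept elements forward and returns the new logical end; it does not shrink the vector by itself.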

void ExecStreamGraphImpl::openStream ( SharedExecStream  pStream  )  [protected, virtual]

Definition at line 527 of file ExecStreamGraph.cpp.

References pErrorTarget.

Referenced by open().

00528 {
00529     if (pErrorTarget) {
00530         pStream->initErrorSource(pErrorTarget, pStream->getName());
00531     }
00532     pStream->open(false);
00533 }

void ExecStreamGraphImpl::bindStreamBufAccessors ( SharedExecStream  pStream  )  [protected, virtual]

Definition at line 475 of file ExecStreamGraph.cpp.

References filteredGraph, and getSharedBufAccessorFromEdge().

Referenced by prepare().

00476 {
00477     std::vector<SharedExecStreamBufAccessor> bufAccessors;
00478 
00479     // bind the input buffers (explicit dataflow only)
00480     InEdgeIterPair inEdges = boost::in_edges(
00481         pStream->getStreamId(),filteredGraph);
00482     for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00483         SharedExecStreamBufAccessor pBufAccessor =
00484             getSharedBufAccessorFromEdge(*(inEdges.first));
00485         bufAccessors.push_back(pBufAccessor);
00486     }
00487     pStream->setInputBufAccessors(bufAccessors);
00488     bufAccessors.clear();
00489 
00490     // bind the output buffers (explicit dataflow only)
00491     OutEdgeIterPair outEdges = boost::out_edges(
00492         pStream->getStreamId(),filteredGraph);
00493     for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00494         SharedExecStreamBufAccessor pBufAccessor =
00495             getSharedBufAccessorFromEdge(*(outEdges.first));
00496         bufAccessors.push_back(pBufAccessor);
00497         pBufAccessor->setProvision(pStream->getOutputBufProvision());
00498     }
00499     pStream->setOutputBufAccessors(bufAccessors);
00500 }

void ExecStreamGraphImpl::mergeFrom ( ExecStreamGraphImpl &  src  )  [protected, virtual]

Definition at line 282 of file ExecStreamGraph.cpp.

References addVertex(), clear(), getSharedBufAccessorFromEdge(), getStreamFromVertex(), graphRep, isOpen, isPrepared, and sortedStreams.

Referenced by mergeFrom().

00283 {
00284     // Since the identity of the added graph SRC will be lost, at this time both
00285     // graphs must be prepared, and must both be open or both be closed.
00286     permAssert(isPrepared && src.isPrepared);
00287     permAssert(isOpen == src.isOpen);
00288 
00289     // map a source vertex ID to the ID of the copied target vertex
00290     std::map<Vertex, Vertex> vmap;
00291 
00292     // copy the nodes (with attached streams)
00293     FgVertexIterPair verts = boost::vertices(src.graphRep);
00294     for (; verts.first != verts.second; ++verts.first) {
00295         Vertex vsrc = *verts.first;
00296         SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00297         Vertex vnew = addVertex(pStream);
00298         vmap[vsrc] = vnew;
00299     }
00300 
00301     // copy the edges (with attached buffers, which stay bound to the adjacent
00302     // streams)
00303     FgEdgeIterPair edges = boost::edges(src.graphRep);
00304     for (; edges.first != edges.second; ++edges.first) {
00305         Edge esrc = *edges.first;
00306         SharedExecStreamBufAccessor pBuf =
00307             src.getSharedBufAccessorFromEdge(esrc);
00308         std::pair<Edge, bool> x = boost::add_edge(
00309             vmap[boost::source(esrc, src.graphRep)], // image of source node
00310             vmap[boost::target(esrc, src.graphRep)], // image of target node
00311             pBuf,
00312             graphRep);
00313         boost::put(
00314             boost::edge_weight,
00315             graphRep,
00316             x.first,
00317             boost::get(boost::edge_weight, src.graphRep, esrc));
00318         assert(x.second);
00319     }
00320     src.clear();                        // source is empty
00321     sortedStreams.clear();              // invalid now
00322 }

void ExecStreamGraphImpl::mergeFrom ( ExecStreamGraphImpl &  src,
std::vector< ExecStreamId > const &  nodes 
) [protected, virtual]

Definition at line 325 of file ExecStreamGraph.cpp.

References addVertex(), getSharedBufAccessorFromEdge(), getStreamFromVertex(), graphRep, isOpen, and isPrepared.

00328 {
00329     // both graphs must be prepared, and must both be open or both be closed.
00330     permAssert(isPrepared && src.isPrepared);
00331     permAssert(isOpen == src.isOpen);
00332 
00333     // map a source vertex ID to the ID of the copied target vertex
00334     std::map<Vertex, Vertex> vmap;
00335 
00336     // Copy the nodes (with attached streams)
00337     int nnodes = nodes.size();
00338     for (int i = 0; i < nnodes; i++) {
00339         Vertex vsrc = boost::vertices(src.graphRep).first[nodes[i]];
00340         SharedExecStream pStream = src.getStreamFromVertex(vsrc);
00341         Vertex vnew = addVertex(pStream);
00342         vmap[vsrc] = vnew;
00343     }
00344 
00345     // Copy the internal edges (with attached buffers, which stay bound to the
00346     // adjacent streams).  It suffices to scan the outbound edges. The external
00347     // edges are abandoned.
00348     if (nnodes > 1) {                   // (when only 1 node, no internal edges)
00349         for (int i = 0; i < nnodes; i++) {
00350             // Find all outbound edges E (U,V) in the source subgraph
00351             Vertex u = boost::vertices(src.graphRep).first[nodes[i]];
00352             for (FgOutEdgeIterPair edges = boost::out_edges(u, src.graphRep);
00353                  edges.first != edges.second;
00354                  ++edges.first)
00355             {
00356                 // an edge e (u, v) in the source graph
00357                 Edge e = *edges.first;
00358                 assert(u == boost::source(e, src.graphRep));
00359                 Vertex v = boost::target(e, src.graphRep);
00360                 // V is in the subgraph iff v is a key in the map vmap[]
00361                 if (vmap.find(v) != vmap.end()) {
00362                     SharedExecStreamBufAccessor pBuf =
00363                         src.getSharedBufAccessorFromEdge(e);
00364                     std::pair<Edge, bool> x =
00365                         boost::add_edge(
00366                             vmap[u],
00367                             vmap[v],
00368                             pBuf,
00369                             graphRep);
00370                     assert(x.second);
00371                     boost::put(
00372                         boost::edge_weight,
00373                         graphRep,
00374                         x.first,
00375                         boost::get(boost::edge_weight, src.graphRep, e));
00376                 }
00377             }
00378         }
00379     }
00380 
00381     // delete the copied subgraph from SRC
00382     for (int i = 0; i < nnodes; i++) {
00383         Vertex v = boost::vertices(src.graphRep).first[nodes[i]];
00384         SharedExecStream pStream = src.getStreamFromVertex(v);
00385         src.streamMap.erase(pStream->getName());
00386         src.removeFromStreamOutMap(pStream);
00387         src.freeVertex(v);
00388     }
00389     src.sortedStreams.clear();          // invalidate
00390     sortedStreams.clear();              // invalidate
00391 }
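
The core of the subgraph merge is the vertex map: every moved vertex gets a new id, and an edge is copied only when both of its endpoints appear in that map. A minimal sketch of that rule follows, using plain integer ids and a flat edge list; EdgeSketch, copyInternalEdges, and newIdStart are hypothetical and stand in for the Boost graph calls in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical edge: (source vertex id, target vertex id).
typedef std::pair<int, int> EdgeSketch;

// Copies the edges of srcEdges whose endpoints are both in `nodes`,
// renumbering endpoints through vmap (old id -> new id). New ids are
// assigned consecutively from newIdStart. Edges with an endpoint outside
// `nodes` are external and are not copied.
std::vector<EdgeSketch> copyInternalEdges(
    const std::vector<EdgeSketch> &srcEdges,
    const std::vector<int> &nodes,
    int newIdStart)
{
    std::map<int, int> vmap;
    for (std::size_t i = 0; i < nodes.size(); ++i) {
        vmap[nodes[i]] = newIdStart + static_cast<int>(i);
    }

    std::vector<EdgeSketch> copied;
    for (const EdgeSketch &e : srcEdges) {
        std::map<int, int>::const_iterator u = vmap.find(e.first);
        std::map<int, int>::const_iterator v = vmap.find(e.second);
        if (u != vmap.end() && v != vmap.end()) {
            copied.push_back(EdgeSketch(u->second, v->second));
        }
    }
    return copied;
}
```

The listing scans only the outbound edges of the moved vertices, which visits each internal edge exactly once; the sketch scans the whole edge list for brevity and reaches the same set of copied edges.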

void ExecStreamGraphImpl::clear (  )  [protected, virtual]

Frees all nodes and edges: like removeStream() on all streams, but faster.

Definition at line 201 of file ExecStreamGraph.cpp.

References freeVertex(), graphRep, isOpen, isPrepared, ClosableObject::needsClose, sortedStreams, streamMap, and streamOutMap.

Referenced by mergeFrom().

00202 {
00203     FgVertexIterPair verts = boost::vertices(graphRep);
00204     while (verts.first != verts.second) {
00205         Vertex v = *verts.first;
00206         freeVertex(v);
00207         ++verts.first;
00208     }
00209 
00210     streamMap.clear();
00211     streamOutMap.clear();
00212     sortedStreams.clear();
00213     needsClose = isOpen = isPrepared = false;
00214 }

ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::addVertex ( SharedExecStream  pStream  )  [protected, virtual]

Adds a node.

Definition at line 140 of file ExecStreamGraph.cpp.

References findStream(), graphRep, newVertex(), streamMap, and boost::vertex_data.

Referenced by addStream(), and mergeFrom().

00141 {
00142     Vertex v = newVertex();
00143     boost::put(boost::vertex_data, graphRep, v, pStream);
00144     if (pStream) {
00145         // Note that pStream can be null for an exterior node in a farrago
00146         // graph.  Guard against duplicating a stream name.
00147         const std::string& name = pStream->getName();
00148         if (name.length() == 0) {
00149             permFail("cannot add nameless stream to graph " << this);
00150         }
00151         if (findStream(name)) {
00152             permFail("cannot add stream " << name << " to graph " << this);
00153         }
00154         pStream->id = v;
00155         pStream->pGraph = this;
00156         streamMap[name] = pStream->getStreamId();
00157     }
00158     return v;
00159 }

ExecStreamGraphImpl::Vertex ExecStreamGraphImpl::newVertex (  )  [protected]

Returns:
an available Vertex, first trying the free list

Definition at line 117 of file ExecStreamGraph.cpp.

References freeVertices, and graphRep.

Referenced by addInputDataflow(), addOutputDataflow(), and addVertex().

00118 {
00119     if (freeVertices.size() > 0) {
00120         Vertex ret = freeVertices.back();
00121         freeVertices.pop_back();
00122         return ret;
00123     }
00124     return boost::add_vertex(graphRep);
00125 }

void ExecStreamGraphImpl::freeVertex ( Vertex  v  )  [protected]

Releases a Vertex to the free list.

Definition at line 127 of file ExecStreamGraph.cpp.

References freeVertices, graphRep, and boost::vertex_data.

Referenced by clear(), and removeStream().

00128 {
00129     boost::clear_vertex(v, graphRep);
00130     boost::get(boost::vertex_data, graphRep)[v].reset();
00131     freeVertices.push_back(v);
00132 }
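
newVertex() and freeVertex() together form a simple free list: released vertex ids are remembered and handed out again before any new id is created. The sketch below shows that pattern in isolation; SlotPool, acquire, and release are hypothetical names, and plain integers stand in for Vertex descriptors.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical pool that hands out integer slot ids, preferring recycled ones.
class SlotPool {
public:
    SlotPool() : nextSlot(0) {}

    // Returns a previously released slot if one exists, else a fresh id.
    std::size_t acquire()
    {
        if (!freeSlots.empty()) {
            std::size_t slot = freeSlots.back();
            freeSlots.pop_back();
            return slot;
        }
        return nextSlot++;
    }

    // Makes a slot available for reuse; the id itself is never destroyed.
    void release(std::size_t slot)
    {
        freeSlots.push_back(slot);
    }

private:
    std::vector<std::size_t> freeSlots;
    std::size_t nextSlot;
};
```

A plausible reason for recycling rather than deleting is that FullGraphRep stores vertices with boost::vecS, where vertex descriptors are indices and removing a vertex renumbers the ones after it; keeping the slot leaves the remaining stream ids stable.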

void ExecStreamGraphImpl::removeFromStreamOutMap ( SharedExecStream  p  )  [protected]

Removes a stream from streamOutMap.

Definition at line 183 of file ExecStreamGraph.cpp.

References getOutputCount(), and streamOutMap.

Referenced by removeStream().

00184 {
00185     int outCt = getOutputCount(p->getStreamId());
00186     if (outCt > 0) {
00187         std::string name = p->getName();
00188         // assumes map key pairs <name, index> sort lexicographically, so
00189         // <name, *> is contiguous.
00190         EdgeMap::iterator startNameRange =
00191             streamOutMap.find(std::make_pair(name, 0));
00192         EdgeMap::iterator endNameRange =
00193             streamOutMap.find(std::make_pair(name, outCt - 1));
00194         streamOutMap.erase(startNameRange, endNameRange);
00195     }
00196 }
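
When reading the listing above, note that std::map::erase(first, last) removes the half-open range [first, last), so the element that last points to is kept. The listing passes an iterator to the key <name, outCt - 1> as the end of the range; whether leaving that entry in place is intended is not stated in the source. The following sketch shows one way to erase every <name, *> entry regardless of how many there are. EdgeMapSketch and eraseAllOutputs are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical equivalent of EdgeMap: <stream name, output index> -> id.
typedef std::map<std::pair<std::string, unsigned>, int> EdgeMapSketch;

// Erases every entry whose key is <name, *>. Pair keys sort
// lexicographically, so all entries for one name are contiguous.
void eraseAllOutputs(EdgeMapSketch &m, const std::string &name)
{
    // First key that is not less than <name, 0>.
    EdgeMapSketch::iterator first = m.lower_bound(std::make_pair(name, 0u));
    // Advance to the first key with a different name (or to end()).
    EdgeMapSketch::iterator last = first;
    while (last != m.end() && last->first.first == name) {
        ++last;
    }
    // Half-open erase: removes [first, last), i.e. all <name, *> entries.
    m.erase(first, last);
}
```

Using lower_bound instead of find also avoids passing end() as the start of the range when <name, 0> is absent from the map.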

ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getInputEdge ( ExecStreamId  stream,
uint  iInput 
) [protected, virtual]

Definition at line 586 of file ExecStreamGraph.cpp.

References filteredGraph, and graphRep.

Referenced by getStreamInput(), and getStreamInputAccessor().

00589 {
00590     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00591     InEdgeIter pEdge = boost::in_edges(streamVertex,filteredGraph).first;
00592     for (int i = 0; i < iInput; ++i) {
00593         ++pEdge;
00594     }
00595     return *pEdge;
00596 }

ExecStreamGraphImpl::Edge ExecStreamGraphImpl::getOutputEdge ( ExecStreamId  stream,
uint  iOutput 
) [protected, virtual]

Definition at line 615 of file ExecStreamGraph.cpp.

References filteredGraph, and graphRep.

Referenced by getStreamOutput(), and getStreamOutputAccessor().

00618 {
00619     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00620     OutEdgeIter pEdge = boost::out_edges(streamVertex,filteredGraph).first;
00621     for (int i = 0; i < iOutput; ++i) {
00622         ++pEdge;
00623     }
00624     return *pEdge;
00625 }

ExecStreamGraphImpl::GraphRep const & ExecStreamGraphImpl::getGraphRep (  )  [inline]

Definition at line 319 of file ExecStreamGraphImpl.h.

References filteredGraph.

Referenced by ParallelExecStreamScheduler::alterNeighborInhibition(), ParallelExecStreamScheduler::processCompletedTask(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ParallelExecStreamScheduler::start(), and ExecStreamScheduler::traceStreamBuffers().

00320 {
00321     return filteredGraph;
00322 }

ExecStreamGraphImpl::FullGraphRep const & ExecStreamGraphImpl::getFullGraphRep (  )  [inline]

Definition at line 325 of file ExecStreamGraphImpl.h.

References graphRep.

Referenced by ExecStreamGraphImpl::DotEdgeRenderer::operator()().

00326 {
00327     return graphRep;
00328 }

SharedExecStream ExecStreamGraphImpl::getStreamFromVertex ( Vertex  vertex  )  [inline]

Definition at line 330 of file ExecStreamGraphImpl.h.

References graphRep, and boost::vertex_data.

Referenced by ParallelExecStreamScheduler::addToQueue(), closeProducers(), findLastStream(), DfsTreeExecStreamScheduler::findNextConsumer(), findStream(), getStream(), getStreamInput(), getStreamOutput(), mergeFrom(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), removeStream(), sortStreams(), and ParallelExecStreamScheduler::start().

00332 {
00333     return boost::get(boost::vertex_data,graphRep)[vertex];
00334 }

SharedExecStreamBufAccessor & ExecStreamGraphImpl::getSharedBufAccessorFromEdge ( Edge  edge  )  [inline]

Definition at line 337 of file ExecStreamGraphImpl.h.

References boost::edge_data, and graphRep.

Referenced by bindStreamBufAccessors(), getBufAccessorFromEdge(), getStreamInputAccessor(), getStreamOutputAccessor(), mergeFrom(), and ExecStreamGraphImpl::DotEdgeRenderer::operator()().

00339 {
00340     return boost::get(boost::edge_data,graphRep)[edge];
00341 }

ExecStreamBufAccessor & ExecStreamGraphImpl::getBufAccessorFromEdge ( Edge  edge  )  [inline]

Definition at line 343 of file ExecStreamGraphImpl.h.

References getSharedBufAccessorFromEdge().

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), open(), ParallelExecStreamScheduler::processCompletedTask(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ParallelExecStreamScheduler::start(), and ExecStreamScheduler::traceStreamBuffers().

00345 {
00346     return *(getSharedBufAccessorFromEdge(edge));
00347 }

void ExecStreamGraphImpl::setTxn ( SharedLogicalTxn  pTxn  )  [virtual]

Sets the transaction within which this graph should execute.

The transaction is reset whenever the graph is closed.

Parameters:
pTxn transaction

Implements ExecStreamGraph.

Definition at line 71 of file ExecStreamGraph.cpp.

References pTxn.

00072 {
00073     pTxn = pTxnInit;
00074 }

void ExecStreamGraphImpl::setErrorTarget ( SharedErrorTarget  pErrorTarget  )  [virtual]

Sets the ErrorTarget to which this graph's streams should send row errors.

Parameters:
pErrorTarget error target

Implements ExecStreamGraph.

Definition at line 76 of file ExecStreamGraph.cpp.

References pErrorTarget.

00077 {
00078     pErrorTarget = pErrorTargetInit;
00079 }

void ExecStreamGraphImpl::setScratchSegment ( SharedSegment  pScratchSegment  )  [virtual]

Sets the ScratchSegment from which this graph's streams should allocate memory buffers.

Parameters:
pScratchSegment scratch segment

Implements ExecStreamGraph.

Definition at line 81 of file ExecStreamGraph.cpp.

References pScratchSegment.

00083 {
00084     pScratchSegment = pScratchSegmentInit;
00085 }

void ExecStreamGraphImpl::setResourceGovernor ( SharedExecStreamGovernor  pResourceGovernor  )  [virtual]

Sets the global exec stream governor.

Parameters:
pResourceGovernor exec stream governor

Implements ExecStreamGraph.

Definition at line 87 of file ExecStreamGraph.cpp.

References pResourceGovernor.

00089 {
00090     pResourceGovernor = pResourceGovernorInit;
00091 }

SharedLogicalTxn ExecStreamGraphImpl::getTxn (  )  [virtual]

Returns:
the transaction within which this graph is executing

Implements ExecStreamGraph.

Definition at line 93 of file ExecStreamGraph.cpp.

References pTxn.

00094 {
00095     return pTxn;
00096 }

TxnId ExecStreamGraphImpl::getTxnId (  )  [virtual]

Returns:
the transaction ID for this graph

Implements ExecStreamGraph.

Definition at line 98 of file ExecStreamGraph.cpp.

References allowDummyTxnId, FIRST_TXN_ID, and pTxn.

00099 {
00100     if (pTxn) {
00101         return pTxn->getTxnId();
00102     }
00103     assert(allowDummyTxnId);
00104     return FIRST_TXN_ID;
00105 }

void ExecStreamGraphImpl::enableDummyTxnId ( bool  enabled  )  [virtual]

Controls whether it is OK to call getTxnId without first calling setTxn.

Normally this is a bad idea: getTxnId will then return FIRST_TXN_ID as a dummy, which could lead to concurrency problems. For non-transactional unit tests, however, it can be useful. Default is disabled.

Parameters:
enabled whether dummy txn IDs are enabled

Implements ExecStreamGraph.

Definition at line 107 of file ExecStreamGraph.cpp.

References allowDummyTxnId.

00108 {
00109     allowDummyTxnId = enabled;
00110 }
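
The fallback described above can be illustrated with a minimal, self-contained sketch. The names GraphSketch, Txn and kFirstTxnId are hypothetical stand-ins for ExecStreamGraphImpl, the logical transaction type and FIRST_TXN_ID; the value 1 chosen for kFirstTxnId is an assumption made only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>

// Hypothetical stand-ins; not the Fennel types.
using TxnId = std::uint64_t;
constexpr TxnId kFirstTxnId = 1;  // assumed value, for illustration only

struct Txn {
    TxnId id;
    TxnId getTxnId() const { return id; }
};

class GraphSketch {
    std::shared_ptr<Txn> pTxn;
    bool allowDummyTxnId = false;  // disabled by default
public:
    void setTxn(std::shared_ptr<Txn> p) { pTxn = std::move(p); }
    void enableDummyTxnId(bool enabled) { allowDummyTxnId = enabled; }

    TxnId getTxnId() const {
        if (pTxn) {
            return pTxn->getTxnId();
        }
        // No transaction was set: the caller must have opted in explicitly.
        assert(allowDummyTxnId);
        return kFirstTxnId;
    }
};
```

In this sketch a unit test would call enableDummyTxnId(true) once and could then query getTxnId() without ever calling setTxn(); once a transaction is set, its real ID takes precedence.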

SharedExecStreamGovernor ExecStreamGraphImpl::getResourceGovernor (  )  [virtual]

Returns:
exec stream governor

Implements ExecStreamGraph.

Definition at line 112 of file ExecStreamGraph.cpp.

References pResourceGovernor.

Referenced by closeImpl().

00113 {
00114     return pResourceGovernor;
00115 }

void ExecStreamGraphImpl::prepare ( ExecStreamScheduler &  scheduler  )  [virtual]

Prepares this graph for execution.

Only called once (before first open) after all streams and dataflows have been defined.

Parameters:
scheduler ExecStreamScheduler which will execute this graph

Implements ExecStreamGraph.

Definition at line 455 of file ExecStreamGraph.cpp.

References bindStreamBufAccessors(), boost::edge_data, filteredGraph, graphRep, isPrepared, ExecStreamScheduler::newBufAccessor(), sortedStreams, and sortStreams().

00456 {
00457     isPrepared = true;
00458     sortStreams();
00459 
00460     // create buffer accessors for all explicit dataflow edges
00461     EdgeIterPair edges = boost::edges(filteredGraph);
00462     for (; edges.first != edges.second; edges.first++) {
00463         SharedExecStreamBufAccessor pBufAccessor = scheduler.newBufAccessor();
00464         boost::put(boost::edge_data,graphRep,*(edges.first),pBufAccessor);
00465     }
00466 
00467     // bind buffer accessors to streams
00468     std::for_each(
00469         sortedStreams.begin(),
00470         sortedStreams.end(),
00471         boost::bind(
00472             &ExecStreamGraphImpl::bindStreamBufAccessors,this,_1));
00473 }

void ExecStreamGraphImpl::open (  )  [virtual]

Opens execution on this graph.

A graph may be repeatedly closed and then reopened.

Implements ExecStreamGraph.

Definition at line 502 of file ExecStreamGraph.cpp.

References ExecStreamBufAccessor::clear(), filteredGraph, getBufAccessorFromEdge(), isOpen, ClosableObject::needsClose, openStream(), sortedStreams, and sortStreams().

00503 {
00504     permAssert(!isOpen);
00505     isOpen = true;
00506     needsClose = true;
00507 
00508     // clear all buffer accessors
00509     EdgeIterPair edges = boost::edges(filteredGraph);
00510     for (; edges.first != edges.second; edges.first++) {
00511         ExecStreamBufAccessor &bufAccessor =
00512             getBufAccessorFromEdge(*(edges.first));
00513         bufAccessor.clear();
00514     }
00515 
00516     // open streams in dataflow order (from producers to consumers)
00517     if (sortedStreams.empty()) {
00518         // in case removeStream() was called after prepare
00519         sortStreams();
00520     }
00521     std::for_each(
00522         sortedStreams.begin(),
00523         sortedStreams.end(),
00524         boost::bind(&ExecStreamGraphImpl::openStream,this,_1));
00525 }

void ExecStreamGraphImpl::addStream ( SharedExecStream  pStream  )  [virtual]

Adds a stream to this graph.

Parameters:
pStream stream to add

Implements ExecStreamGraph.

Definition at line 161 of file ExecStreamGraph.cpp.

References addVertex().

00163 {
00164     (void) addVertex(pStream);
00165 }

void ExecStreamGraphImpl::removeStream ( ExecStreamId  id  )  [virtual]

Removes a stream from the graph: deletes the edges, and puts the vertex on a free list to be reallocated.

Does not free the ExecStream or its ExecStreamBufAccessors.

Implements ExecStreamGraph.

Definition at line 167 of file ExecStreamGraph.cpp.

References freeVertex(), getStreamFromVertex(), graphRep, removeFromStreamOutMap(), sortedStreams, and streamMap.

00168 {
00169     Vertex v = boost::vertices(graphRep).first[id];
00170     SharedExecStream pStream = getStreamFromVertex(v);
00171     permAssert(pStream->pGraph == this);
00172     permAssert(pStream->id == id);
00173 
00174     streamMap.erase(pStream->getName());
00175     removeFromStreamOutMap(pStream);
00176     sortedStreams.clear();              // invalidate list: recreated on demand
00177     freeVertex(v);
00178     // stream is now detached from any graph, and not usable.
00179     pStream->pGraph = 0;
00180     pStream->id = 0;
00181 }

void ExecStreamGraphImpl::addDataflow ( ExecStreamId  producerId,
ExecStreamId  consumerId,
bool  isImplicit = false 
) [virtual]

Defines a dataflow relationship between two streams in this graph.

Parameters:
producerId ID of producer stream in this graph
consumerId ID of consumer stream in this graph
isImplicit false (the default) if the edge represents direct dataflow; true if the edge represents an implicit dataflow dependency

Implements ExecStreamGraph.

Definition at line 216 of file ExecStreamGraph.cpp.

References graphRep.

Referenced by interposeStream().

00220 {
00221     Edge newEdge =
00222         boost::add_edge(producerId, consumerId, graphRep).first;
00223     boost::put(
00224         boost::edge_weight,
00225         graphRep,
00226         newEdge,
00227         isImplicit ? 0 : 1);
00228 }
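
As a rough illustration of how the edge weight separates explicit from implicit dataflows, the sketch below stores edges in a plain vector and counts only the explicit inputs of a stream. DataflowSketch and EdgeSketch are hypothetical names, and the rule "weight greater than zero means explicit" is an assumption about how the filtered view selects edges; the actual predicate is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical edge record: weight 1 = explicit dataflow, 0 = implicit dependency.
struct EdgeSketch {
    std::size_t producer;
    std::size_t consumer;
    int weight;
};

class DataflowSketch {
    std::vector<EdgeSketch> edges;
public:
    void addDataflow(
        std::size_t producerId, std::size_t consumerId, bool isImplicit = false)
    {
        edges.push_back({producerId, consumerId, isImplicit ? 0 : 1});
    }

    // Counts only explicit inputs, mimicking a filtered view of the full graph.
    std::size_t explicitInputCount(std::size_t streamId) const {
        std::size_t n = 0;
        for (const EdgeSketch &e : edges) {
            if (e.consumer == streamId && e.weight > 0) {
                ++n;
            }
        }
        return n;
    }

    // Counts every edge, explicit or implicit.
    std::size_t totalEdgeCount() const { return edges.size(); }
};
```

The point of keeping implicit edges in the same structure is that ordering decisions can still see them, while input and output ordinals are computed over explicit edges only.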

void ExecStreamGraphImpl::addOutputDataflow ( ExecStreamId  producerId  )  [virtual]

Defines a dataflow representing external output produced by this graph.

Parameters:
producerId ID of producer stream in this graph

Implements ExecStreamGraph.

Definition at line 230 of file ExecStreamGraph.cpp.

References graphRep, and newVertex().

00232 {
00233     Vertex consumerId = newVertex();
00234     Edge newEdge =
00235         boost::add_edge(producerId, consumerId, graphRep).first;
00236     boost::put(
00237         boost::edge_weight,
00238         graphRep,
00239         newEdge,
00240         1);
00241 }

void ExecStreamGraphImpl::addInputDataflow ( ExecStreamId  consumerId  )  [virtual]

Defines a dataflow representing external input consumed by this graph.

Parameters:
consumerId ID of consumer stream in this graph

Implements ExecStreamGraph.

Definition at line 243 of file ExecStreamGraph.cpp.

References graphRep, and newVertex().

00245 {
00246     Vertex producerId = newVertex();
00247     Edge newEdge =
00248         boost::add_edge(producerId, consumerId, graphRep).first;
00249     boost::put(
00250         boost::edge_weight,
00251         graphRep,
00252         newEdge,
00253         1);
00254 }

void ExecStreamGraphImpl::mergeFrom ( ExecStreamGraph &  src  )  [virtual]

Adds all the vertices and edges from another graph.

Assumes the graphs are disjoint and that both have been prepared. The two graphs must be in the same state: either both open or both closed.

Parameters:
src the other graph, which is left empty.

Implements ExecStreamGraph.

Definition at line 262 of file ExecStreamGraph.cpp.

References mergeFrom().

00263 {
00264     if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00265         mergeFrom(*p);
00266         return;
00267     }
00268     permFail("unknown subtype of ExecStreamGraph");
00269 }
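
The downcast-and-dispatch pattern used by this overload can be sketched in isolation. GraphBase, GraphImplSketch and ForeignGraph are hypothetical types, and a thrown std::logic_error stands in for permFail, whose actual behavior is not shown on this page.

```cpp
#include <cassert>
#include <stdexcept>

struct GraphBase {
    virtual ~GraphBase() = default;
    virtual void mergeFrom(GraphBase &src) = 0;
};

struct GraphImplSketch : GraphBase {
    int streamCount = 0;

    // Interface-level entry point: accepts any graph, but can only
    // merge from another instance of the same implementation.
    void mergeFrom(GraphBase &src) override {
        if (GraphImplSketch *p = dynamic_cast<GraphImplSketch *>(&src)) {
            mergeFrom(*p);
            return;
        }
        throw std::logic_error("unknown subtype of graph");  // stand-in for permFail
    }

    // Implementation-level overload: moves everything and leaves src empty.
    void mergeFrom(GraphImplSketch &src) {
        streamCount += src.streamCount;
        src.streamCount = 0;
    }
};

// A second implementation that GraphImplSketch does not know how to merge.
struct ForeignGraph : GraphBase {
    void mergeFrom(GraphBase &) override {}
};
```

The interface stays implementation-neutral, while the concrete overload gets direct access to the other graph's internal representation.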

void ExecStreamGraphImpl::mergeFrom ( ExecStreamGraph &  src,
std::vector< ExecStreamId > const &  nodes 
) [virtual]

Adds a subgraph, taken (removed) from another graph.

Slower than mergeFrom(ExecStreamGraph&), which merges its entire source. Assumes the graphs are disjoint and that both have been prepared. The two graphs must be in the same state: either both open or both closed.

Parameters:
src the source graph
nodes identifies source nodes.

Implements ExecStreamGraph.

Definition at line 271 of file ExecStreamGraph.cpp.

References mergeFrom().

00274 {
00275     if (ExecStreamGraphImpl *p = dynamic_cast<ExecStreamGraphImpl*>(&src)) {
00276         mergeFrom(*p, nodes);
00277         return;
00278     }
00279     permFail("unknown subtype of ExecStreamGraph");
00280 }

SharedExecStream ExecStreamGraphImpl::findStream ( std::string  name  )  [virtual]

Finds a stream by name.

Parameters:
name name of stream to find
Returns:
stream found

Implements ExecStreamGraph.

Definition at line 393 of file ExecStreamGraph.cpp.

References getStreamFromVertex(), and streamMap.

Referenced by addVertex(), and findLastStream().

00395 {
00396     StreamMapConstIter pPair = streamMap.find(name);
00397     if (pPair == streamMap.end()) {
00398         SharedExecStream nullStream;
00399         return nullStream;
00400     } else {
00401         return getStreamFromVertex(pPair->second);
00402     }
00403 }

SharedExecStream ExecStreamGraphImpl::findLastStream ( std::string  name,
uint  iOutput 
) [virtual]

Finds last stream known for name.

May be original stream or an adapter.

Parameters:
name name of stream to find
iOutput ordinal of output arc
Returns:
stream found

Implements ExecStreamGraph.

Definition at line 405 of file ExecStreamGraph.cpp.

References findStream(), getStreamFromVertex(), and streamOutMap.

Referenced by interposeStream().

00408 {
00409     EdgeMap::const_iterator pPair =
00410         streamOutMap.find(std::make_pair(name, iOutput));
00411     if (pPair == streamOutMap.end()) {
00412         return findStream(name);
00413     } else {
00414         return getStreamFromVertex(pPair->second);
00415     }
00416 }

void ExecStreamGraphImpl::interposeStream ( std::string  name,
uint  iOutput,
ExecStreamId  interposedId 
) [virtual]

Interposes an adapter stream.

In the process, creates a dataflow from last stream associated with name to the adapter stream.

Parameters:
name name of stream to adapt
iOutput ordinal of output of stream
interposedId ID of adapter stream within this graph

Implements ExecStreamGraph.

Definition at line 418 of file ExecStreamGraph.cpp.

References addDataflow(), findLastStream(), and streamOutMap.

00422 {
00423     SharedExecStream pLastStream = findLastStream(name, iOutput);
00424     permAssert(pLastStream.get());
00425     streamOutMap[std::make_pair(name, iOutput)] = interposedId;
00426     addDataflow(
00427         pLastStream->getStreamId(),
00428         interposedId,
00429         false);
00430 }
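
The interplay between findStream, findLastStream and interposeStream can be sketched with two standard maps. NameIndexSketch and kNoStream are hypothetical; the sketch returns the previous last stream instead of adding a dataflow edge, so that it stays self-contained.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

using StreamId = unsigned;
constexpr StreamId kNoStream = ~0u;  // hypothetical "not found" marker

class NameIndexSketch {
    std::map<std::string, StreamId> streamMap;
    std::map<std::pair<std::string, unsigned>, StreamId> streamOutMap;
public:
    void addStream(const std::string &name, StreamId id) { streamMap[name] = id; }

    StreamId findStream(const std::string &name) const {
        auto it = streamMap.find(name);
        return it == streamMap.end() ? kNoStream : it->second;
    }

    // Prefers the most recently interposed adapter for (name, iOutput),
    // falling back to the original stream when none was interposed.
    StreamId findLastStream(const std::string &name, unsigned iOutput) const {
        auto it = streamOutMap.find(std::make_pair(name, iOutput));
        return it == streamOutMap.end() ? findStream(name) : it->second;
    }

    // Records the adapter as the new last stream and returns the stream
    // that should become the adapter's producer.
    StreamId interposeStream(
        const std::string &name, unsigned iOutput, StreamId interposedId)
    {
        StreamId last = findLastStream(name, iOutput);
        assert(last != kNoStream);
        streamOutMap[std::make_pair(name, iOutput)] = interposedId;
        return last;
    }
};
```

Interposing twice on the same output chains the adapters: the second adapter is fed by the first, and the original stream is only reachable through findStream.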

SharedExecStream ExecStreamGraphImpl::getStream ( ExecStreamId  id  )  [virtual]

Translates a stream ID to a stream pointer.

Parameters:
id ID of a stream in this graph
Returns:
shared pointer to the stream

Implements ExecStreamGraph.

Definition at line 566 of file ExecStreamGraph.cpp.

References getStreamFromVertex(), and graphRep.

Referenced by ExecStreamGraphImpl::DotVertexRenderer::operator()().

00567 {
00568     Vertex v = boost::vertices(graphRep).first[id];
00569     return getStreamFromVertex(v);
00570 }

uint ExecStreamGraphImpl::getInputCount ( ExecStreamId  streamId  )  [virtual]

Determines number of explicit input flows consumed by a stream.

Parameters:
streamId ID of stream
Returns:
input count

Implements ExecStreamGraph.

Definition at line 572 of file ExecStreamGraph.cpp.

References filteredGraph, and graphRep.

00574 {
00575     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00576     return boost::in_degree(streamVertex,filteredGraph);
00577 }

uint ExecStreamGraphImpl::getOutputCount ( ExecStreamId  streamId  )  [virtual]

Determines number of explicit output flows produced by a stream.

Parameters:
streamId ID of stream
Returns:
output count

Implements ExecStreamGraph.

Definition at line 579 of file ExecStreamGraph.cpp.

References filteredGraph, and graphRep.

Referenced by removeFromStreamOutMap().

00581 {
00582     Vertex streamVertex = boost::vertices(graphRep).first[streamId];
00583     return boost::out_degree(streamVertex,filteredGraph);
00584 }

SharedExecStream ExecStreamGraphImpl::getStreamInput ( ExecStreamId  streamId,
uint  iInput 
) [virtual]

Accesses a stream's input.

Parameters:
streamId ID of stream
iInput 0-based input explicit flow ordinal
Returns:
upstream producer

Implements ExecStreamGraph.

Definition at line 598 of file ExecStreamGraph.cpp.

References getInputEdge(), getStreamFromVertex(), and graphRep.

00601 {
00602     Edge inputEdge = getInputEdge(streamId, iInput);
00603     Vertex inputVertex = boost::source(inputEdge,graphRep);
00604     return getStreamFromVertex(inputVertex);
00605 }

SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamInputAccessor ( ExecStreamId  streamId,
uint  iInput 
) [virtual]

Accesses a stream's input accessor.

Parameters:
streamId ID of stream
iInput 0-based input explicit flow ordinal
Returns:
accessor used by upstream producer

Implements ExecStreamGraph.

Definition at line 607 of file ExecStreamGraph.cpp.

References getInputEdge(), and getSharedBufAccessorFromEdge().

00610 {
00611     Edge inputEdge = getInputEdge(streamId, iInput);
00612     return getSharedBufAccessorFromEdge(inputEdge);
00613 }

SharedExecStream ExecStreamGraphImpl::getStreamOutput ( ExecStreamId  streamId,
uint  iOutput 
) [virtual]

Accesses a stream's output.

Parameters:
streamId ID of stream
iOutput 0-based output explicit flow ordinal
Returns:
downstream consumer

Implements ExecStreamGraph.

Definition at line 627 of file ExecStreamGraph.cpp.

References getOutputEdge(), getStreamFromVertex(), and graphRep.

00630 {
00631     Edge outputEdge = getOutputEdge(streamId, iOutput);
00632     Vertex outputVertex = boost::target(outputEdge,graphRep);
00633     return getStreamFromVertex(outputVertex);
00634 }

SharedExecStreamBufAccessor ExecStreamGraphImpl::getStreamOutputAccessor ( ExecStreamId  streamId,
uint  iOutput 
) [virtual]

Accesses a stream's output accessor.

Parameters:
streamId ID of stream
iOutput 0-based output explicit flow ordinal
Returns:
accessor used by downstream consumer

Implements ExecStreamGraph.

Definition at line 636 of file ExecStreamGraph.cpp.

References getOutputEdge(), and getSharedBufAccessorFromEdge().

00639 {
00640     Edge outputEdge = getOutputEdge(streamId, iOutput);
00641     return getSharedBufAccessorFromEdge(outputEdge);
00642 }

std::vector< SharedExecStream > ExecStreamGraphImpl::getSortedStreams (  )  [virtual]

Gets streams, sorted topologically.

Can only be called after prepare.

Returns:
vector of sorted streams

Implements ExecStreamGraph.

Definition at line 644 of file ExecStreamGraph.cpp.

References isPrepared, sortedStreams, and sortStreams().

00645 {
00646     permAssert(isPrepared);
00647     if (sortedStreams.empty()) {
00648         sortStreams();
00649     }
00650     return sortedStreams;
00651 }
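
For readers unfamiliar with topological ordering, the following sketch produces a producers-before-consumers order using Kahn's algorithm. This is an illustrative substitute: the body of sortStreams() is not shown on this page, and sortStreamsSketch is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Returns vertex IDs ordered so that every producer precedes its consumers.
// Each edge is a (producer, consumer) pair. If the graph contains a cycle,
// the result holds fewer than numVertices entries.
std::vector<std::size_t> sortStreamsSketch(
    std::size_t numVertices,
    const std::vector<std::pair<std::size_t, std::size_t>> &edges)
{
    std::vector<std::vector<std::size_t>> consumers(numVertices);
    std::vector<std::size_t> pendingInputs(numVertices, 0);
    for (const auto &e : edges) {
        consumers[e.first].push_back(e.second);
        ++pendingInputs[e.second];
    }

    // Streams with no inputs can run first.
    std::queue<std::size_t> ready;
    for (std::size_t v = 0; v < numVertices; ++v) {
        if (pendingInputs[v] == 0) {
            ready.push(v);
        }
    }

    std::vector<std::size_t> order;
    while (!ready.empty()) {
        std::size_t v = ready.front();
        ready.pop();
        order.push_back(v);
        // A consumer becomes ready once all of its producers are placed.
        for (std::size_t c : consumers[v]) {
            if (--pendingInputs[c] == 0) {
                ready.push(c);
            }
        }
    }
    return order;
}
```

Opening streams in this order guarantees that a consumer never opens before the streams that feed it.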

int ExecStreamGraphImpl::getStreamCount (  )  [virtual]

Returns:
the number of streams in the graph; can only be called after prepare.

Implements ExecStreamGraph.

Definition at line 134 of file ExecStreamGraph.cpp.

References freeVertices, and graphRep.

00135 {
00136     return boost::num_vertices(graphRep) - freeVertices.size();
00137 }
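
The count above subtracts freed vertices because removed streams leave their vertex slot in place. A minimal sketch of such a free list follows; VertexPoolSketch is hypothetical, and the reuse policy shown in newVertex (take the most recently freed slot) is an assumption, since the body of newVertex() is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

class VertexPoolSketch {
    std::vector<std::string> slots;         // one slot per vertex ever created
    std::vector<std::size_t> freeVertices;  // slot indices available for reuse
public:
    std::size_t newVertex(const std::string &name) {
        if (!freeVertices.empty()) {
            // Reuse a freed slot rather than growing the vertex set.
            std::size_t v = freeVertices.back();
            freeVertices.pop_back();
            slots[v] = name;
            return v;
        }
        slots.push_back(name);
        return slots.size() - 1;
    }

    void freeVertex(std::size_t v) {
        assert(v < slots.size());
        slots[v].clear();
        freeVertices.push_back(v);
    }

    // Live streams = all slots minus the ones sitting on the free list.
    std::size_t getStreamCount() const {
        return slots.size() - freeVertices.size();
    }
};
```

Keeping slots in place means the IDs of the remaining streams stay stable when one stream is removed, at the cost of having to discount freed slots whenever vertices are counted.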

int ExecStreamGraphImpl::getDataflowCount (  )  [virtual]

Returns:
the number of dataflows (edges) in the graph; can only be called after prepare.

Implements ExecStreamGraph.

Definition at line 257 of file ExecStreamGraph.cpp.

References graphRep.

00258 {
00259     return boost::num_edges(graphRep);
00260 }

void ExecStreamGraphImpl::renderGraphviz ( std::ostream &  dotStream  )  [virtual]

Renders the graph in the .dot format defined by Graphviz.

Parameters:
dotStream ostream on which to write .dot representation

Implements ExecStreamGraph.

Definition at line 734 of file ExecStreamGraph.cpp.

References graphRep.

00735 {
00736     boost::write_graphviz(
00737         dotStream,
00738         graphRep,
00739         DotVertexRenderer(*this),
00740         DotEdgeRenderer(*this),
00741         DotGraphRenderer());
00742 }

bool ExecStreamGraphImpl::isAcyclic (  )  [virtual]

Returns:
true if graph has no cycles

Implements ExecStreamGraph.

Definition at line 653 of file ExecStreamGraph.cpp.

References graphRep.

00654 {
00655     int numVertices = boost::num_vertices(graphRep);
00656 
00657     // if # strong components is < # vertices, then there must be at least
00658     // one cycle
00659     std::vector<int> component(numVertices);
00660     int nStrongComps = boost::strong_components(graphRep, &component[0]);
00661     return (nStrongComps >= numVertices);
00662 }
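
The component-counting test can be reproduced without Boost. The sketch below counts strongly connected components with Tarjan's algorithm over a plain adjacency list; SccCounter and isAcyclicSketch are hypothetical names. Note that a vertex with an edge to itself still forms a single component, so a check based purely on the component count would not flag a self-loop.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Counts strongly connected components (Tarjan's algorithm).
class SccCounter {
    const std::vector<std::vector<int>> &adj;
    std::vector<int> index;
    std::vector<int> low;
    std::vector<bool> onStack;
    std::vector<int> stack;
    int next = 0;
    int count = 0;

    void visit(int v) {
        index[v] = low[v] = next++;
        stack.push_back(v);
        onStack[v] = true;
        for (int w : adj[v]) {
            if (index[w] < 0) {
                visit(w);
                low[v] = std::min(low[v], low[w]);
            } else if (onStack[w]) {
                low[v] = std::min(low[v], index[w]);
            }
        }
        if (low[v] == index[v]) {
            // v is the root of a component: pop its members off the stack.
            int w;
            do {
                w = stack.back();
                stack.pop_back();
                onStack[w] = false;
            } while (w != v);
            ++count;
        }
    }

public:
    explicit SccCounter(const std::vector<std::vector<int>> &g)
        : adj(g), index(g.size(), -1), low(g.size(), 0), onStack(g.size(), false)
    {
    }

    int run() {
        for (int v = 0; v < static_cast<int>(adj.size()); ++v) {
            if (index[v] < 0) {
                visit(v);
            }
        }
        return count;
    }
};

// Any cycle spanning two or more vertices collapses them into one component,
// so fewer components than vertices implies a cycle.
bool isAcyclicSketch(const std::vector<std::vector<int>> &adj) {
    int numVertices = static_cast<int>(adj.size());
    return SccCounter(adj).run() >= numVertices;
}
```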

void ExecStreamGraphImpl::closeProducers ( ExecStreamId  streamId  )  [virtual]

Closes the producers of a stream with a given id.

Parameters:
streamId stream id of the stream whose producers will be closed

Implements ExecStreamGraph.

Definition at line 744 of file ExecStreamGraph.cpp.

References getStreamFromVertex(), and graphRep.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), and BarrierExecStream::execute().

00745 {
00746     FgInEdgeIterPair inEdges =
00747         boost::in_edges(streamId, graphRep);
00748     for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00749         Edge edge = *(inEdges.first);
00750         // move streamId upstream
00751         streamId = boost::source(edge,graphRep);
00752         // close the producers of this stream before closing the stream
00753         // itself, but only if it's possible to early close the stream
00754         SharedExecStream pStream = getStreamFromVertex(streamId);
00755         if (!pStream->canEarlyClose()) {
00756             continue;
00757         }
00758         closeProducers(streamId);
00759         pStream->close();
00760     }
00761 }
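
The recursion above closes upstream streams depth-first and stops at any producer that cannot be closed early. A simplified sketch over a vector of hypothetical StreamSketch records shows the resulting behavior; the field names are illustrative only.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stream record.
struct StreamSketch {
    bool canEarlyClose;
    bool closed;
    std::vector<std::size_t> producers;  // IDs of upstream streams
};

void closeProducers(std::vector<StreamSketch> &streams, std::size_t streamId) {
    for (std::size_t producerId : streams[streamId].producers) {
        StreamSketch &producer = streams[producerId];
        if (!producer.canEarlyClose) {
            // Leave this producer, and everything upstream of it, open.
            continue;
        }
        // Close the producer's own producers first, then the producer itself.
        closeProducers(streams, producerId);
        producer.closed = true;
    }
}
```

The stream whose ID is passed in is not closed itself; only its producers are, which suits a stream that has finished reading all of its input but still has work of its own to do.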

void ExecStreamGraphImpl::declareDynamicParamWriter ( ExecStreamId  streamId,
DynamicParamId  dynamicParamId 
) [virtual]

Declares that a given stream writes a given dynamic parameter.

Parameters:
streamId Stream id
dynamicParamId Dynamic parameter id

Implements ExecStreamGraph.

Definition at line 763 of file ExecStreamGraph.cpp.

References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::writerStreamIds.

00766 {
00767     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00768     info.writerStreamIds.push_back(streamId);
00769 }

void ExecStreamGraphImpl::declareDynamicParamReader ( ExecStreamId  streamId,
DynamicParamId  dynamicParamId 
) [virtual]

Declares that a given stream reads a given dynamic parameter.

Parameters:
streamId Stream id
dynamicParamId Dynamic parameter id

Implements ExecStreamGraph.

Definition at line 771 of file ExecStreamGraph.cpp.

References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::readerStreamIds.

00774 {
00775     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00776     info.readerStreamIds.push_back(streamId);
00777 }

const std::vector< ExecStreamId > & ExecStreamGraphImpl::getDynamicParamWriters ( DynamicParamId  dynamicParamId  )  [virtual]

Returns a list of stream ids that write a given dynamic parameter.

Parameters:
dynamicParamId Dynamic parameter id
Returns:
List of ids of streams that write the parameter

Implements ExecStreamGraph.

Definition at line 779 of file ExecStreamGraph.cpp.

References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::writerStreamIds.

00781 {
00782     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00783     return info.writerStreamIds;
00784 }

const std::vector< ExecStreamId > & ExecStreamGraphImpl::getDynamicParamReaders ( DynamicParamId  dynamicParamId  )  [virtual]

Returns a list of stream ids that read a given dynamic parameter.

Parameters:
dynamicParamId Dynamic parameter id
Returns:
List of ids of streams that read the parameter

Implements ExecStreamGraph.

Definition at line 786 of file ExecStreamGraph.cpp.

References dynamicParamMap, and ExecStreamGraphImpl::DynamicParamInfo::readerStreamIds.

00788 {
00789     DynamicParamInfo &info = dynamicParamMap[dynamicParamId];
00790     return info.readerStreamIds;
00791 }
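
One consequence of looking parameters up with operator[] is worth spelling out: querying an ID that was never declared default-constructs an entry and returns an empty list rather than failing. The sketch below demonstrates this with a hypothetical ParamRegistrySketch built on std::map.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

using StreamId = unsigned;
using ParamId = unsigned;

struct ParamInfoSketch {
    std::vector<StreamId> readerStreamIds;
    std::vector<StreamId> writerStreamIds;
};

class ParamRegistrySketch {
    std::map<ParamId, ParamInfoSketch> dynamicParamMap;
public:
    void declareWriter(StreamId s, ParamId p) {
        dynamicParamMap[p].writerStreamIds.push_back(s);
    }
    void declareReader(StreamId s, ParamId p) {
        dynamicParamMap[p].readerStreamIds.push_back(s);
    }
    // operator[] inserts an empty entry for unknown IDs, so these getters
    // cannot be const and never fail.
    const std::vector<StreamId> &getWriters(ParamId p) {
        return dynamicParamMap[p].writerStreamIds;
    }
    const std::vector<StreamId> &getReaders(ParamId p) {
        return dynamicParamMap[p].readerStreamIds;
    }
    std::size_t knownParamCount() const { return dynamicParamMap.size(); }
};
```

Because std::map never relocates its elements, the returned references stay valid while further parameters are declared.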

SharedExecStreamGraph ExecStreamGraph::newExecStreamGraph (  )  [static, inherited]

Constructs a new ExecStreamGraph.

Returns:
new graph

Definition at line 43 of file ExecStreamGraph.cpp.

Referenced by ExecStreamTestBase::newStreamGraph(), and CmdInterpreter::visit().

00044 {
00045     return SharedExecStreamGraph(
00046         new ExecStreamGraphImpl(),
00047         ClosableObjectDestructor());
00048 }

ExecStreamScheduler * ExecStreamGraph::getScheduler (  )  const [inline, inherited]

Returns:
pointer to executing scheduler, or null if there is none.

Definition at line 444 of file ExecStreamGraph.h.

References ExecStreamGraph::pScheduler.

Referenced by ExecStream::checkAbort(), JavaSinkExecStream::execute(), CorrelationJoinExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), MergeExecStream::open(), CorrelationJoinExecStream::open(), and JavaSinkExecStream::stuffByteBuffer().

00445 {
00446     return pScheduler;
00447 }

SharedDynamicParamManager ExecStreamGraph::getDynamicParamManager (  )  [inline, inherited]

Returns:
shared pointer to the DynamicParamManager for this graph.

Definition at line 449 of file ExecStreamGraph.h.

References ExecStreamGraph::pDynamicParamManager.

Referenced by ExecStream::prepare().

00450 {
00451     return pDynamicParamManager;
00452 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any resources it still holds.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
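
The guard on needsClose makes close() idempotent: closeImpl() runs at most once per open/close cycle. A minimal sketch of the same pattern, using hypothetical ClosableSketch and CountingResource classes:

```cpp
#include <cassert>

class ClosableSketch {
protected:
    bool needsClose = false;
    virtual void closeImpl() = 0;
public:
    virtual ~ClosableSketch() = default;

    bool isClosed() const { return !needsClose; }

    void close() {
        if (!needsClose) {
            return;  // already closed, or never opened
        }
        // Clear the flag first so a nested close() becomes a no-op.
        needsClose = false;
        closeImpl();
    }
};

// Example subclass that records how often closeImpl() actually runs.
class CountingResource : public ClosableSketch {
public:
    int closeCalls = 0;
    void open() { needsClose = true; }
protected:
    void closeImpl() override { ++closeCalls; }
};
```

Subclasses only implement closeImpl(); callers can invoke close() defensively, for example from a custom deleter, without tracking whether it has already been called.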


Member Data Documentation

FullGraphRep ExecStreamGraphImpl::graphRep [protected]

Definition at line 122 of file ExecStreamGraphImpl.h.

Referenced by addDataflow(), addInputDataflow(), addOutputDataflow(), addVertex(), clear(), closeProducers(), freeVertex(), getDataflowCount(), getFullGraphRep(), getInputCount(), getInputEdge(), getOutputCount(), getOutputEdge(), getSharedBufAccessorFromEdge(), getStream(), getStreamCount(), getStreamFromVertex(), getStreamInput(), getStreamOutput(), isAcyclic(), mergeFrom(), newVertex(), prepare(), removeStream(), renderGraphviz(), and sortStreams().

GraphRep ExecStreamGraphImpl::filteredGraph [protected]

Definition at line 124 of file ExecStreamGraphImpl.h.

Referenced by bindStreamBufAccessors(), getGraphRep(), getInputCount(), getInputEdge(), getOutputCount(), getOutputEdge(), open(), and prepare().

std::vector<Vertex> ExecStreamGraphImpl::freeVertices [protected]

List of freed vertices.

Definition at line 133 of file ExecStreamGraphImpl.h.

Referenced by freeVertex(), getStreamCount(), and newVertex().

StreamMap ExecStreamGraphImpl::streamMap [protected]

Map of name to stream.

Definition at line 143 of file ExecStreamGraphImpl.h.

Referenced by addVertex(), clear(), findStream(), and removeStream().

EdgeMap ExecStreamGraphImpl::streamOutMap [protected]

Map of (stream name, output arc ordinal) to the last stream known for that output, after any adapter streams have been interposed.

Definition at line 148 of file ExecStreamGraphImpl.h.

Referenced by clear(), findLastStream(), interposeStream(), and removeFromStreamOutMap().

std::vector<SharedExecStream> ExecStreamGraphImpl::sortedStreams [protected]

Result of topologically sorting graph (producers before consumers).

Definition at line 153 of file ExecStreamGraphImpl.h.

Referenced by clear(), closeImpl(), getSortedStreams(), mergeFrom(), open(), prepare(), removeStream(), and sortStreams().

SharedLogicalTxn ExecStreamGraphImpl::pTxn [protected]

Transaction being executed.

Definition at line 158 of file ExecStreamGraphImpl.h.

Referenced by closeImpl(), getTxn(), getTxnId(), and setTxn().

SharedErrorTarget ExecStreamGraphImpl::pErrorTarget [protected]

Target for row errors.

Definition at line 163 of file ExecStreamGraphImpl.h.

Referenced by openStream(), and setErrorTarget().

SharedSegment ExecStreamGraphImpl::pScratchSegment [protected]

Source for scratch buffers.

Definition at line 168 of file ExecStreamGraphImpl.h.

Referenced by closeImpl(), and setScratchSegment().

SharedExecStreamGovernor ExecStreamGraphImpl::pResourceGovernor [protected]

Resource governor.

Definition at line 173 of file ExecStreamGraphImpl.h.

Referenced by getResourceGovernor(), and setResourceGovernor().

bool ExecStreamGraphImpl::isOpen [protected]

Whether this graph is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a graph needs to be closed before destruction if it has been prepared but never opened.

Definition at line 181 of file ExecStreamGraphImpl.h.

Referenced by clear(), closeImpl(), ExecStreamGraphImpl(), mergeFrom(), and open().

bool ExecStreamGraphImpl::isPrepared [protected]

Whether this graph has been prepared.

Definition at line 186 of file ExecStreamGraphImpl.h.

Referenced by clear(), ExecStreamGraphImpl(), getSortedStreams(), mergeFrom(), and prepare().

bool ExecStreamGraphImpl::doDataflowClose [protected]

Whether to close this graph in dataflow order (producers to consumers).

Definition at line 191 of file ExecStreamGraphImpl.h.

Referenced by closeImpl(), and ExecStreamGraphImpl().

std::map<DynamicParamId, DynamicParamInfo> ExecStreamGraphImpl::dynamicParamMap [protected]

Information on readers and writers of dynamic parameters.

Definition at line 203 of file ExecStreamGraphImpl.h.

Referenced by declareDynamicParamReader(), declareDynamicParamWriter(), getDynamicParamReaders(), and getDynamicParamWriters().

bool ExecStreamGraphImpl::allowDummyTxnId [protected]

Whether to allow execution without a real transaction.

Definition at line 208 of file ExecStreamGraphImpl.h.

Referenced by enableDummyTxnId(), ExecStreamGraphImpl(), and getTxnId().

ExecStreamScheduler* ExecStreamGraph::pScheduler [protected, inherited]

A Scheduler responsible for executing streams in this graph.

(Can be null if the current graph is only building streams, not executing them.) Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 73 of file ExecStreamGraph.h.

Referenced by ExecStreamGraph::getScheduler().

SharedDynamicParamManager ExecStreamGraph::pDynamicParamManager [protected, inherited]

Manager that handles dynamic parameters for this graph.

Definition at line 78 of file ExecStreamGraph.h.

Referenced by closeImpl(), and ExecStreamGraph::getDynamicParamManager().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
ExecStreamGraphImpl.h
ExecStreamGraph.cpp
Generated on Mon Jun 22 04:00:30 2009 for Fennel by  doxygen 1.5.1