ExecStreamGraphImpl.h

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ExecStreamGraphImpl.h#28 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #ifndef Fennel_ExecStreamGraphImpl_Included
00025 #define Fennel_ExecStreamGraphImpl_Included
00026 
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 
00029 #include <vector>
00030 #include <boost/property_map.hpp>
00031 #include <boost/graph/adjacency_list.hpp>
00032 #include <boost/graph/properties.hpp>
00033 #include <boost/graph/filtered_graph.hpp>
00034 
00035 // REVIEW:  can this be pulled into fennel namespace somehow?
// Declares two custom Boost Graph Library property tags, vertex_data and
// edge_data, and installs them as a vertex property and an edge property
// respectively.  ExecStreamGraphImpl::FullGraphRep uses these tags to attach
// a SharedExecStream to each vertex and a SharedExecStreamBufAccessor to
// each edge.
// NOTE(review): BOOST_INSTALL_PROPERTY presumably expands to a specialization
// of a boost-namespace template, which would be why these declarations are
// placed in namespace boost -- confirm against the Boost headers in use.
00036 namespace boost
00037 {
      // Tag types/values naming the custom properties.
00038 enum vertex_data_t { vertex_data };
00039 enum edge_data_t { edge_data };
      // Register vertex_data_t as a vertex property and edge_data_t as an
      // edge property.
00040 BOOST_INSTALL_PROPERTY(vertex,data);
00041 BOOST_INSTALL_PROPERTY(edge,data);
00042 }
00043 
00044 FENNEL_BEGIN_NAMESPACE
00045 
/**
 * ExecStreamGraphImpl derives (virtually) from ExecStreamGraph and declares
 * overrides for its graph-management methods.  The graph is stored as a
 * Boost adjacency_list in which each vertex carries a SharedExecStream and
 * each edge carries a SharedExecStreamBufAccessor plus an int weight.  Two
 * views are kept: the full graph, and a filtered view containing only the
 * edges accepted by ExplicitEdgePredicate (weight > 0).
 */
00050 class FENNEL_EXEC_EXPORT ExecStreamGraphImpl
00051     : virtual public ExecStreamGraph
00052 {
00053 public:
          /**
           * Full graph representation.  Out-edge lists and the vertex list
           * are stored in vectors (vecS) and the graph is bidirectional, so
           * both in-edges and out-edges of a vertex can be enumerated.
           * Vertex property: vertex_data -> SharedExecStream.
           * Edge properties: edge_data -> SharedExecStreamBufAccessor,
           * edge_weight -> int.
           */
00054     typedef boost::adjacency_list<
00055         boost::vecS,
00056         boost::vecS,
00057         boost::bidirectionalS,
00058         boost::property<boost::vertex_data_t,SharedExecStream>,
00059         boost::property<
00060         boost::edge_data_t,SharedExecStreamBufAccessor,
00061         boost::property<boost::edge_weight_t,int> > >
00062     FullGraphRep;
00063 
          /** Descriptors identifying a vertex / an edge of FullGraphRep. */
00064     typedef boost::graph_traits<FullGraphRep>::vertex_descriptor Vertex;
00065     typedef boost::graph_traits<FullGraphRep>::edge_descriptor Edge;
00066 
          /**
           * Iterator types over the full graph ("Fg" prefix), followed by
           * the corresponding pairs of iterators.
           */
00067     typedef boost::graph_traits<FullGraphRep>::vertex_iterator FgVertexIter;
00068     typedef boost::graph_traits<FullGraphRep>::edge_iterator FgEdgeIter;
00069     typedef boost::graph_traits<FullGraphRep>::out_edge_iterator FgOutEdgeIter;
00070     typedef boost::graph_traits<FullGraphRep>::in_edge_iterator FgInEdgeIter;
00071     typedef std::pair<FgVertexIter,FgVertexIter> FgVertexIterPair;
00072     typedef std::pair<FgEdgeIter,FgEdgeIter> FgEdgeIterPair;
00073     typedef std::pair<FgOutEdgeIter,FgOutEdgeIter> FgOutEdgeIterPair;
00074     typedef std::pair<FgInEdgeIter,FgInEdgeIter> FgInEdgeIterPair;
00075 
          /** Property map type for the edge_weight property of FullGraphRep. */
00076     typedef boost::property_map<FullGraphRep, boost::edge_weight_t>::type
00077         EdgeWeightMap;
00078 
          /**
           * Edge predicate used to define the filtered view GraphRep:  an
           * edge is accepted when its edge_weight is greater than zero.
           */
00079     struct ExplicitEdgePredicate
00080     {
              /** Weight map consulted by operator(). */
00081         EdgeWeightMap weightMap;
00082 
00083         // NOTE jvs 6-Jan-2006:  Lack of keyword "explicit" on constructors
00084         // here is intentional.
00085 
              // Default constructor; weightMap is default-constructed.
              // NOTE(review): presumably present because boost::filtered_graph
              // requires default-constructible predicates -- confirm.
00086         ExplicitEdgePredicate()
00087         {
00088         }
00089 
              // Builds a predicate that reads weights from weightMapInit.
00090         ExplicitEdgePredicate(EdgeWeightMap weightMapInit)
00091             : weightMap(weightMapInit)
00092         {
00093         }
00094 
              // Returns true iff the weight stored for edge is positive.
00095         bool operator () (Edge const &edge) const
00096         {
00097             return boost::get(weightMap, edge) > 0;
00098         }
00099     };
00100 
          /**
           * Filtered view of FullGraphRep exposing only the edges accepted
           * by ExplicitEdgePredicate, followed by the iterator and
           * iterator-pair types for that view.
           */
00101     typedef boost::filtered_graph<FullGraphRep, ExplicitEdgePredicate>
00102         GraphRep;
00103     typedef boost::graph_traits<GraphRep>::vertex_iterator VertexIter;
00104     typedef boost::graph_traits<GraphRep>::edge_iterator EdgeIter;
00105     typedef boost::graph_traits<GraphRep>::out_edge_iterator OutEdgeIter;
00106     typedef boost::graph_traits<GraphRep>::in_edge_iterator InEdgeIter;
00107     typedef std::pair<VertexIter,VertexIter> VertexIterPair;
00108     typedef std::pair<EdgeIter,EdgeIter> EdgeIterPair;
00109     typedef std::pair<OutEdgeIter,OutEdgeIter> OutEdgeIterPair;
00110     typedef std::pair<InEdgeIter,InEdgeIter> InEdgeIterPair;
00111 
00112 
00113 protected:
00114 
00115     // NOTE jvs 8-Jan-2007:  We maintain two boost graphs;
00116     // graphRep is the "full" graph, including both implicit
00117     // and explicit dataflow edges; filteredGraph is a subgraph
00118     // view selecting just the explicit dataflows.  Code which
00119     // accesses the graph needs to decide which view it wants
00120     // and use the corresponding iterators.
00121 
00122     FullGraphRep graphRep;
00123 
00124     GraphRep filteredGraph;
00125 
          // StreamMap:  std::string -> ExecStreamId.
          // EdgeMap:  (std::string, uint) -> ExecStreamId.
          // NOTE(review): the string keys are presumably stream names and the
          // uint an output ordinal (cf. findStream / findLastStream) --
          // confirm in the implementation.
00126     typedef std::map<std::string,ExecStreamId> StreamMap;
00127     typedef StreamMap::const_iterator StreamMapConstIter;
00128     typedef std::map<std::pair<std::string, uint>,ExecStreamId> EdgeMap;
00129 
00130     // nested classes for implementing renderGraphviz
00131     class DotGraphRenderer;
00132     class DotVertexRenderer;
00133     class DotEdgeRenderer;
00134 
          /**
           * Vertices on the free list managed by newVertex() / freeVertex().
           * NOTE(review): presumably vertices available for reuse -- confirm.
           */
00138     std::vector<Vertex> freeVertices;
00139 
          /** Lookup table of type StreamMap (string key -> stream id). */
00143     StreamMap streamMap;
00144 
          /** Lookup table of type EdgeMap ((string, uint) key -> stream id). */
00148     EdgeMap streamOutMap;
00149 
          /**
           * List of streams returned by getSortedStreams().
           * NOTE(review): presumably populated by sortStreams() -- confirm.
           */
00153     std::vector<SharedExecStream> sortedStreams;
00154 
          /** Transaction handle; presumably the value given to setTxn(). */
00158     SharedLogicalTxn pTxn;
00159 
          /** Error target; presumably the value given to setErrorTarget(). */
00163     SharedErrorTarget pErrorTarget;
00164 
          /**
           * Scratch segment; presumably the value given to
           * setScratchSegment().
           */
00168     SharedSegment pScratchSegment;
00169 
          /**
           * Resource governor; presumably the value given to
           * setResourceGovernor().
           */
00173     SharedExecStreamGovernor pResourceGovernor;
00174 
          /**
           * State flag.  NOTE(review): presumably tracks whether open() has
           * been called without a matching close -- confirm.
           */
00181     bool isOpen;
00182 
          /**
           * State flag.  NOTE(review): presumably set once prepare() has
           * run -- confirm.
           */
00186     bool isPrepared;
00187 
          /**
           * Flag whose name suggests it selects dataflow-ordered closing;
           * its use is not visible in this header -- confirm in closeImpl().
           */
00191     bool doDataflowClose;
00192 
          /**
           * Per-dynamic-parameter bookkeeping:  the ids of the streams
           * recorded as readers and as writers of the parameter.
           */
00193     class DynamicParamInfo
00194     {
00195     public:
00196       std::vector<ExecStreamId> readerStreamIds;
00197       std::vector<ExecStreamId> writerStreamIds;
00198     };
00199 
          /** Maps each DynamicParamId to its reader/writer stream lists. */
00203     std::map<DynamicParamId, DynamicParamInfo> dynamicParamMap;
00204 
          /**
           * Flag; presumably controlled by enableDummyTxnId() and consulted
           * by getTxnId() -- confirm.
           */
00208     bool allowDummyTxnId;
00209 
          // Protected virtual helpers; their definitions are not part of
          // this header.  The mergeFrom overloads here take the concrete
          // ExecStreamGraphImpl type, whereas the public overloads take
          // ExecStreamGraph.
00210     virtual void closeImpl();
00211     virtual void sortStreams();
00212     virtual void openStream(SharedExecStream pStream);
00213     virtual void bindStreamBufAccessors(SharedExecStream pStream);
00214     virtual void mergeFrom(ExecStreamGraphImpl& src);
00215     virtual void mergeFrom(
00216         ExecStreamGraphImpl& src,
00217         std::vector<ExecStreamId> const& nodes);
00218 
00221     virtual void clear();
00223     virtual Vertex addVertex(SharedExecStream pStream);
00224 
00225     // manage the free list
00227     Vertex newVertex();
00229     void freeVertex(Vertex);
00230 
00232     void removeFromStreamOutMap(SharedExecStream);
00233 
          // Edge lookup by stream id and input/output ordinal.
00234     virtual Edge getInputEdge(ExecStreamId stream, uint iInput);
00235     virtual Edge getOutputEdge(ExecStreamId stream, uint iOutput);
00236 
00237 public:
00238     explicit ExecStreamGraphImpl();
00239     virtual ~ExecStreamGraphImpl() {}
00240 
          // Inline accessors; defined after the class body in this header.
00241     inline GraphRep const &getGraphRep();
00242     inline FullGraphRep const &getFullGraphRep();
00243     inline SharedExecStream getStreamFromVertex(Vertex);
00244     inline SharedExecStreamBufAccessor &getSharedBufAccessorFromEdge(Edge);
00245     inline ExecStreamBufAccessor &getBufAccessorFromEdge(Edge);
00246 
00247     // implement ExecStreamGraph
00248     virtual void setTxn(SharedLogicalTxn pTxn);
00249     virtual void setErrorTarget(SharedErrorTarget pErrorTarget);
00250     virtual void setScratchSegment(
00251         SharedSegment pScratchSegment);
00252     virtual void setResourceGovernor(
00253         SharedExecStreamGovernor pResourceGovernor);
00254     virtual SharedLogicalTxn getTxn();
00255     virtual TxnId getTxnId();
00256     virtual void enableDummyTxnId(bool enabled);
00257     virtual SharedExecStreamGovernor getResourceGovernor();
00258     virtual void prepare(ExecStreamScheduler &scheduler);
00259     virtual void open();
00260     virtual void addStream(SharedExecStream pStream);
00261     virtual void removeStream(ExecStreamId);
00262     virtual void addDataflow(
00263         ExecStreamId producerId,
00264         ExecStreamId consumerId,
00265         bool isImplicit = false);
00266     virtual void addOutputDataflow(
00267         ExecStreamId producerId);
00268     virtual void addInputDataflow(
00269         ExecStreamId consumerId);
00270     virtual void mergeFrom(ExecStreamGraph& src);
00271     virtual void mergeFrom(
00272         ExecStreamGraph& src,
00273         std::vector<ExecStreamId> const& nodes);
00274     virtual SharedExecStream findStream(
00275         std::string name);
00276     virtual SharedExecStream findLastStream(
00277         std::string name,
00278         uint iOutput);
00279     virtual void interposeStream(
00280         std::string name,
00281         uint iOutput,
00282         ExecStreamId interposedId);
00283     virtual SharedExecStream getStream(ExecStreamId id);
00284     virtual uint getInputCount(
00285         ExecStreamId streamId);
00286     virtual uint getOutputCount(
00287         ExecStreamId streamId);
00288     virtual SharedExecStream getStreamInput(
00289         ExecStreamId streamId,
00290         uint iInput);
00291     virtual SharedExecStreamBufAccessor getStreamInputAccessor(
00292         ExecStreamId streamId,
00293         uint iInput);
00294     virtual SharedExecStream getStreamOutput(
00295         ExecStreamId streamId,
00296         uint iOutput);
00297     virtual SharedExecStreamBufAccessor getStreamOutputAccessor(
00298         ExecStreamId streamId,
00299         uint iOutput);
00300     virtual std::vector<SharedExecStream> getSortedStreams();
00301     virtual int getStreamCount();
00302     virtual int getDataflowCount();
00303     virtual void renderGraphviz(std::ostream &dotStream);
00304     virtual bool isAcyclic();
00305     virtual void closeProducers(ExecStreamId streamId);
00306     virtual void declareDynamicParamWriter(
00307         ExecStreamId streamId,
00308         DynamicParamId dynamicParamId);
00309     virtual void declareDynamicParamReader(
00310         ExecStreamId streamId,
00311         DynamicParamId dynamicParamId);
00312     virtual const std::vector<ExecStreamId> &getDynamicParamWriters(
00313         DynamicParamId dynamicParamId);
00314     virtual const std::vector<ExecStreamId> &getDynamicParamReaders(
00315         DynamicParamId dynamicParamId);
00316 };
00317 
/**
 * @return const reference to filteredGraph, the filtered view of the graph
 * whose edges are restricted by ExplicitEdgePredicate
 */
00318 inline ExecStreamGraphImpl::GraphRep const &
00319     ExecStreamGraphImpl::getGraphRep()
00320 {
00321     return filteredGraph;
00322 }
00323 
/**
 * @return const reference to graphRep, the full (unfiltered) graph
 * representation
 */
00324 inline ExecStreamGraphImpl::FullGraphRep const &
00325     ExecStreamGraphImpl::getFullGraphRep()
00326 {
00327     return graphRep;
00328 }
00329 
/**
 * Looks up the vertex_data property stored for a vertex of the full graph.
 *
 * @param vertex descriptor of the vertex to look up
 *
 * @return the SharedExecStream attached to that vertex (returned by value)
 */
00330 inline SharedExecStream ExecStreamGraphImpl::getStreamFromVertex(
00331     Vertex vertex)
00332 {
00333     return boost::get(boost::vertex_data,graphRep)[vertex];
00334 }
00335 
/**
 * Looks up the edge_data property stored for an edge of the full graph.
 *
 * @param edge descriptor of the edge to look up
 *
 * @return non-const reference to the SharedExecStreamBufAccessor attached to
 * that edge, so the caller can both read and replace the stored value
 */
00336 inline SharedExecStreamBufAccessor &
00337     ExecStreamGraphImpl::getSharedBufAccessorFromEdge(
00338         Edge edge)
00339 {
00340     return boost::get(boost::edge_data,graphRep)[edge];
00341 }
00342 
/**
 * Dereferences the shared accessor attached to an edge.
 *
 * @param edge descriptor of the edge to look up
 *
 * @return reference to the ExecStreamBufAccessor obtained by applying
 * operator* to the result of getSharedBufAccessorFromEdge(edge)
 */
00343 inline ExecStreamBufAccessor &ExecStreamGraphImpl::getBufAccessorFromEdge(
00344     Edge edge)
00345 {
          // No null check is performed here.
          // NOTE(review): presumably callers must ensure the edge's accessor
          // has been set before calling -- confirm.
00346     return *(getSharedBufAccessorFromEdge(edge));
00347 }
00348 
00349 FENNEL_END_NAMESPACE
00350 
00351 #endif
00352 
00353 // End ExecStreamGraphImpl.h

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1