00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_ExecStreamGraphImpl_Included
00025 #define Fennel_ExecStreamGraphImpl_Included
00026
00027 #include "fennel/exec/ExecStreamGraph.h"
00028
00029 #include <vector>
00030 #include <boost/property_map.hpp>
00031 #include <boost/graph/adjacency_list.hpp>
00032 #include <boost/graph/properties.hpp>
00033 #include <boost/graph/filtered_graph.hpp>
00034
00035
00036 namespace boost
00037 {
00038 enum vertex_data_t { vertex_data };
00039 enum edge_data_t { edge_data };
00040 BOOST_INSTALL_PROPERTY(vertex,data);
00041 BOOST_INSTALL_PROPERTY(edge,data);
00042 }
00043
00044 FENNEL_BEGIN_NAMESPACE
00045
00050 class FENNEL_EXEC_EXPORT ExecStreamGraphImpl
00051 : virtual public ExecStreamGraph
00052 {
00053 public:
00054 typedef boost::adjacency_list<
00055 boost::vecS,
00056 boost::vecS,
00057 boost::bidirectionalS,
00058 boost::property<boost::vertex_data_t,SharedExecStream>,
00059 boost::property<
00060 boost::edge_data_t,SharedExecStreamBufAccessor,
00061 boost::property<boost::edge_weight_t,int> > >
00062 FullGraphRep;
00063
00064 typedef boost::graph_traits<FullGraphRep>::vertex_descriptor Vertex;
00065 typedef boost::graph_traits<FullGraphRep>::edge_descriptor Edge;
00066
00067 typedef boost::graph_traits<FullGraphRep>::vertex_iterator FgVertexIter;
00068 typedef boost::graph_traits<FullGraphRep>::edge_iterator FgEdgeIter;
00069 typedef boost::graph_traits<FullGraphRep>::out_edge_iterator FgOutEdgeIter;
00070 typedef boost::graph_traits<FullGraphRep>::in_edge_iterator FgInEdgeIter;
00071 typedef std::pair<FgVertexIter,FgVertexIter> FgVertexIterPair;
00072 typedef std::pair<FgEdgeIter,FgEdgeIter> FgEdgeIterPair;
00073 typedef std::pair<FgOutEdgeIter,FgOutEdgeIter> FgOutEdgeIterPair;
00074 typedef std::pair<FgInEdgeIter,FgInEdgeIter> FgInEdgeIterPair;
00075
00076 typedef boost::property_map<FullGraphRep, boost::edge_weight_t>::type
00077 EdgeWeightMap;
00078
00079 struct ExplicitEdgePredicate
00080 {
00081 EdgeWeightMap weightMap;
00082
00083
00084
00085
00086 ExplicitEdgePredicate()
00087 {
00088 }
00089
00090 ExplicitEdgePredicate(EdgeWeightMap weightMapInit)
00091 : weightMap(weightMapInit)
00092 {
00093 }
00094
00095 bool operator () (Edge const &edge) const
00096 {
00097 return boost::get(weightMap, edge) > 0;
00098 }
00099 };
00100
00101 typedef boost::filtered_graph<FullGraphRep, ExplicitEdgePredicate>
00102 GraphRep;
00103 typedef boost::graph_traits<GraphRep>::vertex_iterator VertexIter;
00104 typedef boost::graph_traits<GraphRep>::edge_iterator EdgeIter;
00105 typedef boost::graph_traits<GraphRep>::out_edge_iterator OutEdgeIter;
00106 typedef boost::graph_traits<GraphRep>::in_edge_iterator InEdgeIter;
00107 typedef std::pair<VertexIter,VertexIter> VertexIterPair;
00108 typedef std::pair<EdgeIter,EdgeIter> EdgeIterPair;
00109 typedef std::pair<OutEdgeIter,OutEdgeIter> OutEdgeIterPair;
00110 typedef std::pair<InEdgeIter,InEdgeIter> InEdgeIterPair;
00111
00112
00113 protected:
00114
00115
00116
00117
00118
00119
00120
00121
00122 FullGraphRep graphRep;
00123
00124 GraphRep filteredGraph;
00125
00126 typedef std::map<std::string,ExecStreamId> StreamMap;
00127 typedef StreamMap::const_iterator StreamMapConstIter;
00128 typedef std::map<std::pair<std::string, uint>,ExecStreamId> EdgeMap;
00129
00130
00131 class DotGraphRenderer;
00132 class DotVertexRenderer;
00133 class DotEdgeRenderer;
00134
00138 std::vector<Vertex> freeVertices;
00139
00143 StreamMap streamMap;
00144
00148 EdgeMap streamOutMap;
00149
00153 std::vector<SharedExecStream> sortedStreams;
00154
00158 SharedLogicalTxn pTxn;
00159
00163 SharedErrorTarget pErrorTarget;
00164
00168 SharedSegment pScratchSegment;
00169
00173 SharedExecStreamGovernor pResourceGovernor;
00174
00181 bool isOpen;
00182
00186 bool isPrepared;
00187
00191 bool doDataflowClose;
00192
00193 class DynamicParamInfo
00194 {
00195 public:
00196 std::vector<ExecStreamId> readerStreamIds;
00197 std::vector<ExecStreamId> writerStreamIds;
00198 };
00199
00203 std::map<DynamicParamId, DynamicParamInfo> dynamicParamMap;
00204
00208 bool allowDummyTxnId;
00209
00210 virtual void closeImpl();
00211 virtual void sortStreams();
00212 virtual void openStream(SharedExecStream pStream);
00213 virtual void bindStreamBufAccessors(SharedExecStream pStream);
00214 virtual void mergeFrom(ExecStreamGraphImpl& src);
00215 virtual void mergeFrom(
00216 ExecStreamGraphImpl& src,
00217 std::vector<ExecStreamId> const& nodes);
00218
00221 virtual void clear();
00223 virtual Vertex addVertex(SharedExecStream pStream);
00224
00225
00227 Vertex newVertex();
00229 void freeVertex(Vertex);
00230
00232 void removeFromStreamOutMap(SharedExecStream);
00233
00234 virtual Edge getInputEdge(ExecStreamId stream, uint iInput);
00235 virtual Edge getOutputEdge(ExecStreamId stream, uint iOutput);
00236
00237 public:
00238 explicit ExecStreamGraphImpl();
00239 virtual ~ExecStreamGraphImpl() {}
00240
00241 inline GraphRep const &getGraphRep();
00242 inline FullGraphRep const &getFullGraphRep();
00243 inline SharedExecStream getStreamFromVertex(Vertex);
00244 inline SharedExecStreamBufAccessor &getSharedBufAccessorFromEdge(Edge);
00245 inline ExecStreamBufAccessor &getBufAccessorFromEdge(Edge);
00246
00247
00248 virtual void setTxn(SharedLogicalTxn pTxn);
00249 virtual void setErrorTarget(SharedErrorTarget pErrorTarget);
00250 virtual void setScratchSegment(
00251 SharedSegment pScratchSegment);
00252 virtual void setResourceGovernor(
00253 SharedExecStreamGovernor pResourceGovernor);
00254 virtual SharedLogicalTxn getTxn();
00255 virtual TxnId getTxnId();
00256 virtual void enableDummyTxnId(bool enabled);
00257 virtual SharedExecStreamGovernor getResourceGovernor();
00258 virtual void prepare(ExecStreamScheduler &scheduler);
00259 virtual void open();
00260 virtual void addStream(SharedExecStream pStream);
00261 virtual void removeStream(ExecStreamId);
00262 virtual void addDataflow(
00263 ExecStreamId producerId,
00264 ExecStreamId consumerId,
00265 bool isImplicit = false);
00266 virtual void addOutputDataflow(
00267 ExecStreamId producerId);
00268 virtual void addInputDataflow(
00269 ExecStreamId consumerId);
00270 virtual void mergeFrom(ExecStreamGraph& src);
00271 virtual void mergeFrom(
00272 ExecStreamGraph& src,
00273 std::vector<ExecStreamId> const& nodes);
00274 virtual SharedExecStream findStream(
00275 std::string name);
00276 virtual SharedExecStream findLastStream(
00277 std::string name,
00278 uint iOutput);
00279 virtual void interposeStream(
00280 std::string name,
00281 uint iOutput,
00282 ExecStreamId interposedId);
00283 virtual SharedExecStream getStream(ExecStreamId id);
00284 virtual uint getInputCount(
00285 ExecStreamId streamId);
00286 virtual uint getOutputCount(
00287 ExecStreamId streamId);
00288 virtual SharedExecStream getStreamInput(
00289 ExecStreamId streamId,
00290 uint iInput);
00291 virtual SharedExecStreamBufAccessor getStreamInputAccessor(
00292 ExecStreamId streamId,
00293 uint iInput);
00294 virtual SharedExecStream getStreamOutput(
00295 ExecStreamId streamId,
00296 uint iOutput);
00297 virtual SharedExecStreamBufAccessor getStreamOutputAccessor(
00298 ExecStreamId streamId,
00299 uint iOutput);
00300 virtual std::vector<SharedExecStream> getSortedStreams();
00301 virtual int getStreamCount();
00302 virtual int getDataflowCount();
00303 virtual void renderGraphviz(std::ostream &dotStream);
00304 virtual bool isAcyclic();
00305 virtual void closeProducers(ExecStreamId streamId);
00306 virtual void declareDynamicParamWriter(
00307 ExecStreamId streamId,
00308 DynamicParamId dynamicParamId);
00309 virtual void declareDynamicParamReader(
00310 ExecStreamId streamId,
00311 DynamicParamId dynamicParamId);
00312 virtual const std::vector<ExecStreamId> &getDynamicParamWriters(
00313 DynamicParamId dynamicParamId);
00314 virtual const std::vector<ExecStreamId> &getDynamicParamReaders(
00315 DynamicParamId dynamicParamId);
00316 };
00317
00318 inline ExecStreamGraphImpl::GraphRep const &
00319 ExecStreamGraphImpl::getGraphRep()
00320 {
00321 return filteredGraph;
00322 }
00323
00324 inline ExecStreamGraphImpl::FullGraphRep const &
00325 ExecStreamGraphImpl::getFullGraphRep()
00326 {
00327 return graphRep;
00328 }
00329
00330 inline SharedExecStream ExecStreamGraphImpl::getStreamFromVertex(
00331 Vertex vertex)
00332 {
00333 return boost::get(boost::vertex_data,graphRep)[vertex];
00334 }
00335
00336 inline SharedExecStreamBufAccessor &
00337 ExecStreamGraphImpl::getSharedBufAccessorFromEdge(
00338 Edge edge)
00339 {
00340 return boost::get(boost::edge_data,graphRep)[edge];
00341 }
00342
00343 inline ExecStreamBufAccessor &ExecStreamGraphImpl::getBufAccessorFromEdge(
00344 Edge edge)
00345 {
00346 return *(getSharedBufAccessorFromEdge(edge));
00347 }
00348
00349 FENNEL_END_NAMESPACE
00350
00351 #endif
00352
00353