00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_ExecStreamScheduler_Included
00025 #define Fennel_ExecStreamScheduler_Included
00026
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/common/TraceSource.h"
00029
00030 #include <boost/utility.hpp>
00031
00032 FENNEL_BEGIN_NAMESPACE
00033
00042 class FENNEL_EXEC_EXPORT ExecStreamScheduler
00043 : public boost::noncopyable,
00044 public virtual TraceSource
00045 {
00046 protected:
00047 bool tracingFine;
00048
00057 explicit ExecStreamScheduler(
00058 SharedTraceTarget pTraceTarget,
00059 std::string name);
00060
00070 inline ExecStreamResult executeStream(
00071 ExecStream &stream,
00072 ExecStreamQuantum const &quantum);
00073
00081 virtual void tracePreExecution(
00082 ExecStream &stream,
00083 ExecStreamQuantum const &quantum);
00084
00092 virtual void tracePostExecution(
00093 ExecStream &stream,
00094 ExecStreamResult rc);
00095
00108 virtual void traceStreamBuffers(
00109 ExecStream &stream,
00110 TraceLevel inputTupleTraceLevel,
00111 TraceLevel outputTupleTraceLevel);
00112
00113 public:
00114 virtual ~ExecStreamScheduler();
00115
00125 virtual void traceStreamBufferContents(
00126 ExecStream &stream,
00127 ExecStreamBufAccessor &bufAccessor,
00128 TraceLevel traceLevel);
00129
00137 virtual void addGraph(SharedExecStreamGraph pGraph);
00138
00146 virtual void removeGraph(SharedExecStreamGraph pGraph);
00147
00151 virtual void start() = 0;
00152
00160 inline void makeRunnable(ExecStream &stream);
00161
00167 virtual void setRunnable(
00168 ExecStream &stream,
00169 bool runnable) = 0;
00170
00179 virtual void abort(ExecStreamGraph &graph) = 0;
00180
00185 virtual void checkAbort() const;
00186
00191 virtual void stop() = 0;
00192
00199 virtual SharedExecStreamBufAccessor newBufAccessor();
00200
00210 virtual void createBufferProvisionAdapter(
00211 ExecStreamEmbryo &embryo);
00212
00222 virtual void createCopyProvisionAdapter(
00223 ExecStreamEmbryo &embryo);
00224
00233 virtual ExecStreamBufAccessor &readStream(
00234 ExecStream &stream) = 0;
00235
00240 virtual uint getDegreeOfParallelism();
00241 };
00242
00243 inline ExecStreamResult ExecStreamScheduler::executeStream(
00244 ExecStream &stream,
00245 ExecStreamQuantum const &quantum)
00246 {
00247 if (tracingFine) {
00248 tracePreExecution(stream, quantum);
00249 ExecStreamResult rc = stream.execute(quantum);
00250 tracePostExecution(stream, rc);
00251 return rc;
00252 } else {
00253 return stream.execute(quantum);
00254 }
00255 }
00256
00257 inline void ExecStreamScheduler::makeRunnable(
00258 ExecStream &stream)
00259 {
00260 setRunnable(stream, true);
00261 }
00262
00263 FENNEL_END_NAMESPACE
00264
00265 #endif
00266
00267