00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef Fennel_ParallelExecStreamScheduler_Included
00024 #define Fennel_ParallelExecStreamScheduler_Included
00025
00026 #include "fennel/exec/ExecStreamScheduler.h"
00027 #include "fennel/exec/ExecStreamGraphImpl.h"
00028 #include "fennel/synch/ThreadPool.h"
00029 #include "fennel/synch/SynchMonitoredObject.h"
00030 #include "fennel/common/FennelExcn.h"
00031
00032 #include <hash_map>
00033 #include <deque>
00034 #include <boost/scoped_ptr.hpp>
00035
00036 FENNEL_BEGIN_NAMESPACE
00037
00038 class ExecStreamGraphImpl;
00039 class ParallelExecStreamScheduler;
00040 class ThreadTracker;
00041
00046 class FENNEL_EXEC_EXPORT ParallelExecTask
00047 {
00048 ParallelExecStreamScheduler &scheduler;
00049 ExecStream *pStream;
00050
00051 public:
00052 explicit ParallelExecTask(
00053 ParallelExecStreamScheduler &scheduler,
00054 ExecStream *pStream);
00055
00056 inline ExecStreamId getStreamId() const
00057 {
00058 return pStream->getStreamId();
00059 }
00060
00061 void execute();
00062 };
00063
00068 class FENNEL_EXEC_EXPORT ParallelExecResult
00069 {
00070 ExecStreamId streamId;
00071 ExecStreamResult rc;
00072
00073 public:
00074 explicit ParallelExecResult(
00075 ExecStreamId streamId,
00076 ExecStreamResult rc);
00077
00078 inline ExecStreamId getStreamId() const
00079 {
00080 return streamId;
00081 }
00082
00083 inline ExecStreamResult getResultCode() const
00084 {
00085 return rc;
00086 }
00087 };
00088
00098 class FENNEL_EXEC_EXPORT ParallelExecStreamScheduler
00099 : public ExecStreamScheduler, public SynchMonitoredObject
00100 {
00101 enum StreamState
00102 {
00103 SS_SLEEPING,
00104 SS_RUNNING,
00105 SS_INHIBITED
00106 };
00107
00108 struct StreamStateMapEntry
00109 {
00110 StreamState state;
00111 int inhibitionCount;
00112 };
00113
00114 enum ManagerState {
00115 MGR_RUNNING,
00116 MGR_STOPPING,
00117 MGR_STOPPED
00118 };
00119
00120 typedef std::hash_map<ExecStreamId, StreamStateMapEntry>
00121 StreamStateMap;
00122 typedef std::deque<ExecStreamId> InhibitedQueue;
00123
00124 friend class ParallelExecTask;
00125
00126 SharedExecStreamGraph pGraph;
00127
00128 ThreadPool<ParallelExecTask> threadPool;
00129 std::deque<ParallelExecResult> completedQueue;
00130
00131 ThreadTracker &threadTracker;
00132
00133 StreamStateMap streamStateMap;
00134 ManagerState mgrState;
00135
00136 InhibitedQueue inhibitedQueue;
00137 InhibitedQueue transitQueue;
00138 LocalCondition sentinelCondition;
00139
00140 uint degreeOfParallelism;
00141
00142 boost::scoped_ptr<FennelExcn> pPendingExcn;
00143
00144 void tryExecuteManager();
00145 void executeManager();
00146 void tryExecuteTask(ExecStream &);
00147 void executeTask(ExecStream &);
00148 bool addToQueue(ExecStreamId streamId);
00149 void signalSentinel(ExecStreamId sentinelId);
00150 void retryInhibitedQueue();
00151 void processCompletedTask(ParallelExecResult const &task);
00152 inline bool isInhibited(ExecStreamId streamId);
00153 inline void alterNeighborInhibition(ExecStreamId streamId, int delta);
00154
00155 public:
00169 explicit ParallelExecStreamScheduler(
00170 SharedTraceTarget pTraceTarget,
00171 std::string name,
00172 ThreadTracker &threadTracker,
00173 uint degreeOfParallelism);
00174
00175 virtual ~ParallelExecStreamScheduler();
00176
00177
00178 virtual void addGraph(SharedExecStreamGraph pGraph);
00179 virtual void removeGraph(SharedExecStreamGraph pGraph);
00180 virtual void start();
00181 virtual void setRunnable(ExecStream &stream, bool);
00182 virtual void makeRunnable(ExecStream &stream);
00183 virtual void abort(ExecStreamGraph &graph);
00184 virtual void checkAbort() const;
00185 virtual void stop();
00186 virtual ExecStreamBufAccessor &readStream(ExecStream &stream);
00187 virtual void createBufferProvisionAdapter(
00188 ExecStreamEmbryo &embryo);
00189 virtual uint getDegreeOfParallelism();
00190 };
00191
00192 FENNEL_END_NAMESPACE
00193
00194 #endif
00195
00196