ParallelExecStreamScheduler.h

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.h#12 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2008-2009 The Eigenbase Project
00005 // Copyright (C) 2008-2009 SQLstream, Inc.
00006 // Copyright (C) 2008-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #ifndef Fennel_ParallelExecStreamScheduler_Included
00024 #define Fennel_ParallelExecStreamScheduler_Included
00025 
00026 #include "fennel/exec/ExecStreamScheduler.h"
00027 #include "fennel/exec/ExecStreamGraphImpl.h"
00028 #include "fennel/synch/ThreadPool.h"
00029 #include "fennel/synch/SynchMonitoredObject.h"
00030 #include "fennel/common/FennelExcn.h"
00031 
00032 #include <hash_map>
00033 #include <deque>
00034 #include <boost/scoped_ptr.hpp>
00035 
00036 FENNEL_BEGIN_NAMESPACE
00037 
00038 class ExecStreamGraphImpl;
00039 class ParallelExecStreamScheduler;
00040 class ThreadTracker;
00041 
00046 class FENNEL_EXEC_EXPORT ParallelExecTask
00047 {
00048     ParallelExecStreamScheduler &scheduler;
00049     ExecStream *pStream;
00050 
00051 public:
00052     explicit ParallelExecTask(
00053         ParallelExecStreamScheduler &scheduler,
00054         ExecStream *pStream);
00055 
00056     inline ExecStreamId getStreamId() const
00057     {
00058         return pStream->getStreamId();
00059     }
00060 
00061     void execute();
00062 };
00063 
00068 class FENNEL_EXEC_EXPORT ParallelExecResult
00069 {
00070     ExecStreamId streamId;
00071     ExecStreamResult rc;
00072 
00073 public:
00074     explicit ParallelExecResult(
00075         ExecStreamId streamId,
00076         ExecStreamResult rc);
00077 
00078     inline ExecStreamId getStreamId() const
00079     {
00080         return streamId;
00081     }
00082 
00083     inline ExecStreamResult getResultCode() const
00084     {
00085         return rc;
00086     }
00087 };
00088 
00098 class FENNEL_EXEC_EXPORT ParallelExecStreamScheduler
00099     : public ExecStreamScheduler, public SynchMonitoredObject
00100 {
00101     enum StreamState
00102     {
00103         SS_SLEEPING,
00104         SS_RUNNING,
00105         SS_INHIBITED
00106     };
00107 
00108     struct StreamStateMapEntry
00109     {
00110         StreamState state;
00111         int inhibitionCount;
00112     };
00113 
00114     enum ManagerState {
00115         MGR_RUNNING,
00116         MGR_STOPPING,
00117         MGR_STOPPED
00118     };
00119 
00120     typedef std::hash_map<ExecStreamId, StreamStateMapEntry>
00121         StreamStateMap;
00122     typedef std::deque<ExecStreamId> InhibitedQueue;
00123 
00124     friend class ParallelExecTask;
00125 
00126     SharedExecStreamGraph pGraph;
00127 
00128     ThreadPool<ParallelExecTask> threadPool;
00129     std::deque<ParallelExecResult> completedQueue;
00130 
00131     ThreadTracker &threadTracker;
00132 
00133     StreamStateMap streamStateMap;
00134     ManagerState mgrState;
00135 
00136     InhibitedQueue inhibitedQueue;
00137     InhibitedQueue transitQueue;
00138     LocalCondition sentinelCondition;
00139 
00140     uint degreeOfParallelism;
00141 
00142     boost::scoped_ptr<FennelExcn> pPendingExcn;
00143 
00144     void tryExecuteManager();
00145     void executeManager();
00146     void tryExecuteTask(ExecStream &);
00147     void executeTask(ExecStream &);
00148     bool addToQueue(ExecStreamId streamId);
00149     void signalSentinel(ExecStreamId sentinelId);
00150     void retryInhibitedQueue();
00151     void processCompletedTask(ParallelExecResult const &task);
00152     inline bool isInhibited(ExecStreamId streamId);
00153     inline void alterNeighborInhibition(ExecStreamId streamId, int delta);
00154 
00155 public:
00169     explicit ParallelExecStreamScheduler(
00170         SharedTraceTarget pTraceTarget,
00171         std::string name,
00172         ThreadTracker &threadTracker,
00173         uint degreeOfParallelism);
00174 
00175     virtual ~ParallelExecStreamScheduler();
00176 
00177     // implement the ExecStreamScheduler interface
00178     virtual void addGraph(SharedExecStreamGraph pGraph);
00179     virtual void removeGraph(SharedExecStreamGraph pGraph);
00180     virtual void start();
00181     virtual void setRunnable(ExecStream &stream, bool);
00182     virtual void makeRunnable(ExecStream &stream);
00183     virtual void abort(ExecStreamGraph &graph);
00184     virtual void checkAbort() const;
00185     virtual void stop();
00186     virtual ExecStreamBufAccessor &readStream(ExecStream &stream);
00187     virtual void createBufferProvisionAdapter(
00188         ExecStreamEmbryo &embryo);
00189     virtual uint getDegreeOfParallelism();
00190 };
00191 
00192 FENNEL_END_NAMESPACE
00193 
00194 #endif
00195 
00196 // End ParallelExecStreamScheduler.h

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1