ParallelExecStreamScheduler.cpp

/*
// $Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $
// Fennel is a library of data storage and processing components.
// Copyright (C) 2008-2009 The Eigenbase Project
// Copyright (C) 2008-2009 SQLstream, Inc.
// Copyright (C) 2008-2009 LucidEra, Inc.
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version approved by The Eigenbase Project.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "fennel/common/CommonPreamble.h"
#include "fennel/exec/ParallelExecStreamScheduler.h"
#include "fennel/exec/ExecStreamGraphImpl.h"
#include "fennel/exec/ExecStream.h"
#include "fennel/exec/ExecStreamBufAccessor.h"
#include "fennel/exec/DoubleBufferExecStream.h"
#include "fennel/exec/ExecStreamEmbryo.h"
#include "fennel/common/AbortExcn.h"
#include "fennel/synch/ThreadTracker.h"

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $");

// TODO jvs 5-Jul-2008:  more tracing

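// The constructor records the requested degree of parallelism (which must be
// positive), attaches the caller's ThreadTracker to the worker thread pool,
// and leaves the manager in the MGR_STOPPED state until start() is called.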
ParallelExecStreamScheduler::ParallelExecStreamScheduler(
    SharedTraceTarget pTraceTarget,
    std::string name,
    ThreadTracker &threadTrackerInit,
    uint degreeOfParallelismInit)
    : TraceSource(pTraceTarget, name),
      ExecStreamScheduler(pTraceTarget, name),
      threadTracker(threadTrackerInit)
{
    degreeOfParallelism = degreeOfParallelismInit;
    assert(degreeOfParallelism > 0);
    threadPool.setThreadTracker(threadTracker);
    mgrState = MGR_STOPPED;
}

ParallelExecStreamScheduler::~ParallelExecStreamScheduler()
{
}

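// alterNeighborInhibition adjusts the inhibition count of every direct
// consumer and producer of streamId by delta.  A stream whose count is
// positive is considered inhibited and will not be scheduled, which keeps
// two adjacent streams sharing a buffer from running concurrently.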
inline void ParallelExecStreamScheduler::alterNeighborInhibition(
    ExecStreamId streamId, int delta)
{
    ExecStreamGraphImpl &graphImpl =
        dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
    ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
    ExecStreamGraphImpl::OutEdgeIterPair outEdges =
        boost::out_edges(streamId, graphRep);
    for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
        ExecStreamGraphImpl::Edge edge = *(outEdges.first);
        ExecStreamId consumer = boost::target(edge, graphRep);
        streamStateMap[consumer].inhibitionCount += delta;
        assert(streamStateMap[consumer].inhibitionCount >= 0);
    }
    ExecStreamGraphImpl::InEdgeIterPair inEdges =
        boost::in_edges(streamId, graphRep);
    for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
        ExecStreamGraphImpl::Edge edge = *(inEdges.first);
        ExecStreamId producer = boost::source(edge, graphRep);
        streamStateMap[producer].inhibitionCount += delta;
        assert(streamStateMap[producer].inhibitionCount >= 0);
    }
}

inline bool ParallelExecStreamScheduler::isInhibited(ExecStreamId streamId)
{
    return streamStateMap[streamId].inhibitionCount > 0;
}

void ParallelExecStreamScheduler::addGraph(
    SharedExecStreamGraph pGraphInit)
{
    assert(!pGraph);

    ExecStreamScheduler::addGraph(pGraphInit);
    pGraph = pGraphInit;
}

void ParallelExecStreamScheduler::removeGraph(
    SharedExecStreamGraph pGraphInit)
{
    assert(pGraph == pGraphInit);

    pGraph.reset();
    ExecStreamScheduler::removeGraph(pGraphInit);
}

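// start() resets the per-stream scheduling state, then walks the graph
// looking for sentinel vertices (vertices with no associated stream, which
// represent the graph's external outputs).  Each sentinel's producers are
// inhibited until the first call to readStream, and production is requested
// on the sentinel's input buffers.  Finally the thread pool is started with
// one extra thread reserved for the manager, and the manager task is
// submitted.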
void ParallelExecStreamScheduler::start()
{
    FENNEL_TRACE(TRACE_FINE, "start");
    assert(pGraph->isAcyclic());
    pPendingExcn.reset();

    ExecStreamGraphImpl &graphImpl =
        dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
    ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
    ExecStreamGraphImpl::VertexIterPair vertices = boost::vertices(graphRep);
    while (vertices.first != vertices.second) {
        ExecStreamId streamId = *vertices.first;
        streamStateMap[streamId].state = SS_SLEEPING;
        streamStateMap[streamId].inhibitionCount = 0;
        ++vertices.first;
    }

    vertices = boost::vertices(graphRep);
    while (vertices.first != vertices.second) {
        ExecStreamId streamId = *vertices.first;
        if (!graphImpl.getStreamFromVertex(streamId)) {
            // initially inhibit producers until first call to readStream
            alterNeighborInhibition(streamId, +1);
            ExecStreamGraphImpl::InEdgeIterPair inEdges =
                boost::in_edges(streamId, graphRep);
            for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
                ExecStreamGraphImpl::Edge edge = *(inEdges.first);
                ExecStreamBufAccessor &bufAccessor =
                    graphImpl.getBufAccessorFromEdge(edge);
                bufAccessor.requestProduction();
            }
        }
        ++vertices.first;
    }

    // +1 for the manager task, which will tie up one thread
    threadPool.start(degreeOfParallelism + 1);

    // kick off the manager task
    ParallelExecTask managerTask(*this, NULL);
    mgrState = MGR_RUNNING;
    threadPool.submitTask(managerTask);
}

void ParallelExecStreamScheduler::setRunnable(ExecStream &stream, bool runnable)
{
    permAssert(false);
}

void ParallelExecStreamScheduler::makeRunnable(ExecStream &stream)
{
    permAssert(false);
}

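// abort() registers an AbortExcn as the pending exception (unless some other
// exception is already pending) and wakes the manager; the manager loop,
// readStream, and addToQueue all check pPendingExcn and unwind accordingly.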
void ParallelExecStreamScheduler::abort(ExecStreamGraph &graph)
{
    StrictMutexGuard mutexGuard(mutex);
    FENNEL_TRACE(TRACE_FINE, "abort requested");

    if (!pPendingExcn) {
        pPendingExcn.reset(new AbortExcn());
    }
    condition.notify_one();
}

void ParallelExecStreamScheduler::checkAbort() const
{
    if (pPendingExcn) {
        throw AbortExcn();
    }
}

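// stop() asks the manager task to shut down and waits on sentinelCondition
// until it acknowledges, then stops the thread pool, discards any pending
// exception, and clears the scheduling queues.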
void ParallelExecStreamScheduler::stop()
{
    FENNEL_TRACE(TRACE_FINE, "stop");

    StrictMutexGuard mutexGuard(mutex);
    if (mgrState != MGR_STOPPED) {
        mgrState = MGR_STOPPING;
        condition.notify_one();
        while (mgrState != MGR_STOPPED) {
            sentinelCondition.wait(mutexGuard);
        }
    }
    mutexGuard.unlock();

    threadPool.stop();

    // NOTE jvs 10-Aug-2008:  This is how we keep the cloned excn
    // from becoming a memory leak.  It assumes that the caller
    // doesn't invoke pScheduler->stop() until *after* the exception
    // has been completely handled and is no longer referenced.
    pPendingExcn.reset();

    completedQueue.clear();
    inhibitedQueue.clear();
}

void ParallelExecStreamScheduler::createBufferProvisionAdapter(
    ExecStreamEmbryo &embryo)
{
    // use double buffering so that producers and consumers can run
    // in parallel
    DoubleBufferExecStreamParams adapterParams;
    embryo.init(
        new DoubleBufferExecStream(),
        adapterParams);
}

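// executeManager is the thread-pool entry point for the manager task.  It
// delegates to tryExecuteManager and, whether that returns normally or
// throws, marks the manager as MGR_STOPPED and notifies any thread waiting
// in stop() before propagating the exception.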
void ParallelExecStreamScheduler::executeManager()
{
    // TODO jvs 16-Aug-2008:  RAII
    try {
        tryExecuteManager();
    } catch (...) {
        StrictMutexGuard mutexGuard(mutex);
        mgrState = MGR_STOPPED;
        sentinelCondition.notify_all();
        throw;
    }
    StrictMutexGuard mutexGuard(mutex);
    mgrState = MGR_STOPPED;
    sentinelCondition.notify_all();
}

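// tryExecuteManager is the manager's scheduling loop: it sleeps until a
// completed task result arrives (or a stop or abort is requested), then
// drains completedQueue, releasing the mutex while processCompletedTask
// updates the scheduling state for each result.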
void ParallelExecStreamScheduler::tryExecuteManager()
{
    FENNEL_TRACE(TRACE_FINE, "manager task starting");
    for (;;) {
        StrictMutexGuard mutexGuard(mutex);
        while (completedQueue.empty() && (mgrState == MGR_RUNNING)
            && !pPendingExcn)
        {
            condition.wait(mutexGuard);
        }
        if (pPendingExcn) {
            return;
        }
        if (mgrState != MGR_RUNNING) {
            return;
        }
        while (!completedQueue.empty()) {
            ParallelExecResult result = completedQueue.front();
            completedQueue.pop_front();
            // don't hold lock while doing expensive state maintenance
            mutexGuard.unlock();
            processCompletedTask(result);
            if (pPendingExcn) {
                return;
            }
            mutexGuard.lock();
        }
    }
}

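// readStream is called by the consumer thread (outside the thread pool) to
// read from one of the graph's designated output streams.  If the output
// buffer already holds data or EOS, it returns immediately; otherwise it
// requests production, posts a synthetic EXECRC_BUF_UNDERFLOW result for the
// sentinel vertex onto completedQueue, and sleeps until signalSentinel wakes
// it or an exception becomes pending.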
ExecStreamBufAccessor &ParallelExecStreamScheduler::readStream(
    ExecStream &stream)
{
    FENNEL_TRACE(
        TRACE_FINE,
        "entering readStream " << stream.getName());

    ExecStreamId current = stream.getStreamId();
    ExecStreamGraphImpl &graphImpl =
        dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
    ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();

    // assert that we're reading from a designated output stream
    assert(boost::out_degree(current, graphRep) == 1);
    ExecStreamGraphImpl::Edge edge =
        *(boost::out_edges(current, graphRep).first);
    ExecStreamBufAccessor &bufAccessor = graphImpl.getBufAccessorFromEdge(edge);
    current = boost::target(edge, graphRep);
    assert(!graphImpl.getStreamFromVertex(current));

    if (bufAccessor.getState() == EXECBUF_EMPTY) {
        bufAccessor.requestProduction();
    } else if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
        // data or EOS already available
        return bufAccessor;
    }

    // please sir, I'd like some more
    ParallelExecResult result(current, EXECRC_BUF_UNDERFLOW);
    StrictMutexGuard mutexGuard(mutex);
    streamStateMap[current].state = SS_SLEEPING;
    completedQueue.push_back(result);
    condition.notify_one();

    while ((streamStateMap[current].state == SS_SLEEPING) && !pPendingExcn) {
        sentinelCondition.wait(mutexGuard);
    }

    if (pPendingExcn) {
        pPendingExcn->throwSelf();
    }

    return bufAccessor;
}

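// processCompletedTask performs the manager-side bookkeeping after a stream
// task (or a sentinel request from readStream) completes.  It releases the
// inhibition the stream imposed on its neighbors, schedules consumers whose
// input buffers are no longer in underflow (waking sentinels that have data
// or EOS available), schedules producers for any inputs still in underflow,
// treats an underflow result with no underflowed inputs as a yield request,
// and requeues the stream itself when its quantum expired.  Finally it
// retries streams that were previously inhibited.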
void ParallelExecStreamScheduler::processCompletedTask(
    ParallelExecResult const &result)
{
    ExecStreamId current = result.getStreamId();
    ExecStreamGraphImpl &graphImpl =
        dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
    ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();

    streamStateMap[current].state = SS_SLEEPING;
    alterNeighborInhibition(current, -1);

    switch (result.getResultCode()) {
    case EXECRC_EOS:
    case EXECRC_BUF_OVERFLOW:
    case EXECRC_BUF_UNDERFLOW:
        {
            ExecStreamGraphImpl::OutEdgeIterPair outEdges =
                boost::out_edges(current, graphRep);
            for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
                ExecStreamGraphImpl::Edge edge = *(outEdges.first);
                ExecStreamBufAccessor &bufAccessor =
                    graphImpl.getBufAccessorFromEdge(edge);
                if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
                    ExecStreamId consumer = boost::target(edge, graphRep);
                    bool sentinel = addToQueue(consumer);
                    if (sentinel) {
                        if (bufAccessor.getState() != EXECBUF_EMPTY) {
                            signalSentinel(consumer);
                        }
                    }
                }
            }
            ExecStreamGraphImpl::InEdgeIterPair inEdges =
                boost::in_edges(current, graphRep);
            bool sawUnderflow = false;
            for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
                ExecStreamGraphImpl::Edge edge = *(inEdges.first);
                ExecStreamBufAccessor &bufAccessor =
                    graphImpl.getBufAccessorFromEdge(edge);
                if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
                    ExecStreamId producer = boost::source(edge, graphRep);
                    addToQueue(producer);
                    sawUnderflow = true;
                }
            }
            if (!sawUnderflow &&
                (result.getResultCode() == EXECRC_BUF_UNDERFLOW))
            {
                // A stream may sometimes return underflow even though none of
                // its inputs is in underflow state (some may be at EOS
                // instead); we interpret this as a yield request to help keep
                // the stream's state machine logic simpler.
                addToQueue(current);
            }
        }
        break;
    case EXECRC_QUANTUM_EXPIRED:
        addToQueue(current);
        break;
    default:
        permAssert(false);
    }

    // REVIEW jvs 4-Jul-2008:  only reschedule inhibited neighbors?
    retryInhibitedQueue();
}

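// signalSentinel re-inhibits the sentinel's producers, marks the sentinel as
// running, and wakes the thread blocked in readStream on it.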
void ParallelExecStreamScheduler::signalSentinel(ExecStreamId sentinelId)
{
    alterNeighborInhibition(sentinelId, +1);

    StrictMutexGuard mutexGuard(mutex);
    streamStateMap[sentinelId].state = SS_RUNNING;
    sentinelCondition.notify_all();
}

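// executeTask is the thread-pool entry point for a stream task.  Any
// exception thrown while executing the stream is captured as the shared
// pending exception (cloned via threadTracker.cloneExcn) and the manager is
// notified.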
void ParallelExecStreamScheduler::executeTask(ExecStream &stream)
{
    try {
        tryExecuteTask(stream);
    } catch (std::exception &ex) {
        StrictMutexGuard mutexGuard(mutex);
        if (!pPendingExcn) {
            pPendingExcn.reset(threadTracker.cloneExcn(ex));
        }
        condition.notify_one();
    } catch (...) {
        // REVIEW jvs 22-Jul-2008:  panic instead?
        StrictMutexGuard mutexGuard(mutex);
        if (!pPendingExcn) {
            pPendingExcn.reset(new FennelExcn("Unknown error"));
        }
        condition.notify_one();
    }
}

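// tryExecuteTask runs the stream for one quantum and posts the resulting
// ParallelExecResult on completedQueue for the manager to process.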
void ParallelExecStreamScheduler::tryExecuteTask(ExecStream &stream)
{
    ExecStreamQuantum quantum;
    ExecStreamResult rc = executeStream(stream, quantum);
    ParallelExecResult result(stream.getStreamId(), rc);

    StrictMutexGuard mutexGuard(mutex);
    completedQueue.push_back(result);
    condition.notify_one();
}

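// retryInhibitedQueue re-attempts scheduling for every stream left on
// inhibitedQueue; streams that are still inhibited are bounced back onto
// inhibitedQueue by addToQueue.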
void ParallelExecStreamScheduler::retryInhibitedQueue()
{
    // addToQueue may bounce some back to inhibitedQueue,
    // so process via transitQueue
    transitQueue = inhibitedQueue;
    inhibitedQueue.clear();
    while (!transitQueue.empty()) {
        ExecStreamId inhibitedStreamId = transitQueue.front();
        transitQueue.pop_front();
        streamStateMap[inhibitedStreamId].state = SS_SLEEPING;
        addToQueue(inhibitedStreamId);
    }
}

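// addToQueue attempts to schedule streamId.  Sleeping sentinel vertices are
// not scheduled; the method returns true so the caller can decide whether to
// wake the reader.  Sleeping streams that are inhibited go onto
// inhibitedQueue; otherwise the stream is marked running, its neighbors are
// inhibited, and a task is submitted to the thread pool.  Requests for
// streams already running or inhibited are ignored, and nothing is scheduled
// once an exception is pending.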
bool ParallelExecStreamScheduler::addToQueue(ExecStreamId streamId)
{
    if (pPendingExcn) {
        return false;
    }
    switch (streamStateMap[streamId].state) {
    case SS_SLEEPING:
        {
            ExecStreamGraphImpl &graphImpl =
                dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
            SharedExecStream pStream = graphImpl.getStreamFromVertex(streamId);
            if (!pStream) {
                // sentinel node
                return true;
            }
            if (isInhibited(streamId)) {
                streamStateMap[streamId].state = SS_INHIBITED;
                inhibitedQueue.push_back(streamId);
            } else {
                streamStateMap[streamId].state = SS_RUNNING;
                alterNeighborInhibition(streamId, +1);
                ParallelExecTask task(*this, pStream.get());
                threadPool.submitTask(task);
            }
        }
        break;
    case SS_INHIBITED:
    case SS_RUNNING:
        // ignore request
        break;
    default:
        permAssert(false);
    }
    return false;
}

ParallelExecTask::ParallelExecTask(
    ParallelExecStreamScheduler &schedulerInit,
    ExecStream *pStreamInit)
    : scheduler(schedulerInit)
{
    pStream = pStreamInit;
}

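// ParallelExecTask::execute dispatches on the stream pointer: a null stream
// designates the manager task, anything else runs that stream.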
void ParallelExecTask::execute()
{
    if (pStream) {
        scheduler.executeTask(*pStream);
    } else {
        scheduler.executeManager();
    }
}

ParallelExecResult::ParallelExecResult(
    ExecStreamId streamIdInit,
    ExecStreamResult rcInit)
{
    streamId = streamIdInit;
    rc = rcInit;
}

uint ParallelExecStreamScheduler::getDegreeOfParallelism()
{
    return degreeOfParallelism;
}

FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $");

// End ParallelExecStreamScheduler.cpp
