00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/ParallelExecStreamScheduler.h"
00025 #include "fennel/exec/ExecStreamGraphImpl.h"
00026 #include "fennel/exec/ExecStream.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/DoubleBufferExecStream.h"
00029 #include "fennel/exec/ExecStreamEmbryo.h"
00030 #include "fennel/common/AbortExcn.h"
00031 #include "fennel/synch/ThreadTracker.h"
00032
00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $");
00034
00035
00036
00037 ParallelExecStreamScheduler::ParallelExecStreamScheduler(
00038 SharedTraceTarget pTraceTarget,
00039 std::string name,
00040 ThreadTracker &threadTrackerInit,
00041 uint degreeOfParallelismInit)
00042 : TraceSource(pTraceTarget, name),
00043 ExecStreamScheduler(pTraceTarget, name),
00044 threadTracker(threadTrackerInit)
00045 {
00046 degreeOfParallelism = degreeOfParallelismInit;
00047 assert(degreeOfParallelism > 0);
00048 threadPool.setThreadTracker(threadTracker);
00049 mgrState = MGR_STOPPED;
00050 }
00051
00052 ParallelExecStreamScheduler::~ParallelExecStreamScheduler()
00053 {
00054 }
00055
00056 inline void ParallelExecStreamScheduler::alterNeighborInhibition(
00057 ExecStreamId streamId, int delta)
00058 {
00059 ExecStreamGraphImpl &graphImpl =
00060 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00061 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00062 ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00063 boost::out_edges(streamId, graphRep);
00064 for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00065 ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00066 ExecStreamId consumer = boost::target(edge, graphRep);
00067 streamStateMap[consumer].inhibitionCount += delta;
00068 assert(streamStateMap[consumer].inhibitionCount >= 0);
00069 }
00070 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00071 boost::in_edges(streamId, graphRep);
00072 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00073 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00074 ExecStreamId producer = boost::source(edge, graphRep);
00075 streamStateMap[producer].inhibitionCount += delta;
00076 assert(streamStateMap[producer].inhibitionCount >= 0);
00077 }
00078 }
00079
00080 inline bool ParallelExecStreamScheduler::isInhibited(ExecStreamId streamId)
00081 {
00082 return streamStateMap[streamId].inhibitionCount > 0;
00083 }
00084
00085 void ParallelExecStreamScheduler::addGraph(
00086 SharedExecStreamGraph pGraphInit)
00087 {
00088 assert(!pGraph);
00089
00090 ExecStreamScheduler::addGraph(pGraphInit);
00091 pGraph = pGraphInit;
00092 }
00093
00094 void ParallelExecStreamScheduler::removeGraph(
00095 SharedExecStreamGraph pGraphInit)
00096 {
00097 assert(pGraph == pGraphInit);
00098
00099 pGraph.reset();
00100 ExecStreamScheduler::removeGraph(pGraphInit);
00101 }
00102
00103 void ParallelExecStreamScheduler::start()
00104 {
00105 FENNEL_TRACE(TRACE_FINE,"start");
00106 assert(pGraph->isAcyclic());
00107 pPendingExcn.reset();
00108
00109 ExecStreamGraphImpl &graphImpl =
00110 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00111 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00112 ExecStreamGraphImpl::VertexIterPair vertices = boost::vertices(graphRep);
00113 while (vertices.first != vertices.second) {
00114 ExecStreamId streamId = *vertices.first;
00115 streamStateMap[streamId].state = SS_SLEEPING;
00116 streamStateMap[streamId].inhibitionCount = 0;
00117 ++vertices.first;
00118 }
00119
00120 vertices = boost::vertices(graphRep);
00121 while (vertices.first != vertices.second) {
00122 ExecStreamId streamId = *vertices.first;
00123 if (!graphImpl.getStreamFromVertex(streamId)) {
00124
00125 alterNeighborInhibition(streamId, + 1);
00126 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00127 boost::in_edges(streamId, graphRep);
00128 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00129 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00130 ExecStreamBufAccessor &bufAccessor =
00131 graphImpl.getBufAccessorFromEdge(edge);
00132 bufAccessor.requestProduction();
00133 }
00134 }
00135 ++vertices.first;
00136 }
00137
00138
00139 threadPool.start(degreeOfParallelism + 1);
00140
00141
00142 ParallelExecTask managerTask(*this, NULL);
00143 mgrState = MGR_RUNNING;
00144 threadPool.submitTask(managerTask);
00145 }
00146
00147 void ParallelExecStreamScheduler::setRunnable(ExecStream &stream, bool runnable)
00148 {
00149 permAssert(false);
00150 }
00151
00152 void ParallelExecStreamScheduler::makeRunnable(ExecStream &stream)
00153 {
00154 permAssert(false);
00155 }
00156
00157 void ParallelExecStreamScheduler::abort(ExecStreamGraph &graph)
00158 {
00159 StrictMutexGuard mutexGuard(mutex);
00160 FENNEL_TRACE(TRACE_FINE,"abort requested");
00161
00162 if (!pPendingExcn) {
00163 pPendingExcn.reset(new AbortExcn());
00164 }
00165 condition.notify_one();
00166 }
00167
00168 void ParallelExecStreamScheduler::checkAbort() const
00169 {
00170 if (pPendingExcn) {
00171 throw AbortExcn();
00172 }
00173 }
00174
00175 void ParallelExecStreamScheduler::stop()
00176 {
00177 FENNEL_TRACE(TRACE_FINE,"stop");
00178
00179 StrictMutexGuard mutexGuard(mutex);
00180 if (mgrState != MGR_STOPPED) {
00181 mgrState = MGR_STOPPING;
00182 condition.notify_one();
00183 while (mgrState != MGR_STOPPED) {
00184 sentinelCondition.wait(mutexGuard);
00185 }
00186 }
00187 mutexGuard.unlock();
00188
00189 threadPool.stop();
00190
00191
00192
00193
00194
00195 pPendingExcn.reset();
00196
00197 completedQueue.clear();
00198 inhibitedQueue.clear();
00199 }
00200
00201 void ParallelExecStreamScheduler::createBufferProvisionAdapter(
00202 ExecStreamEmbryo &embryo)
00203 {
00204
00205
00206 DoubleBufferExecStreamParams adapterParams;
00207 embryo.init(
00208 new DoubleBufferExecStream(),
00209 adapterParams);
00210 }
00211
00212 void ParallelExecStreamScheduler::executeManager()
00213 {
00214
00215 try {
00216 tryExecuteManager();
00217 } catch (...) {
00218 StrictMutexGuard mutexGuard(mutex);
00219 mgrState = MGR_STOPPED;
00220 sentinelCondition.notify_all();
00221 throw;
00222 }
00223 StrictMutexGuard mutexGuard(mutex);
00224 mgrState = MGR_STOPPED;
00225 sentinelCondition.notify_all();
00226 }
00227
00228 void ParallelExecStreamScheduler::tryExecuteManager()
00229 {
00230 FENNEL_TRACE(TRACE_FINE,"manager task starting");
00231 for (;;) {
00232 StrictMutexGuard mutexGuard(mutex);
00233 while (completedQueue.empty() && (mgrState == MGR_RUNNING)
00234 && !pPendingExcn)
00235 {
00236 condition.wait(mutexGuard);
00237 }
00238 if (pPendingExcn) {
00239 return;
00240 }
00241 if (mgrState != MGR_RUNNING) {
00242 return;
00243 }
00244 while (!completedQueue.empty()) {
00245 ParallelExecResult result = completedQueue.front();
00246 completedQueue.pop_front();
00247
00248 mutexGuard.unlock();
00249 processCompletedTask(result);
00250 if (pPendingExcn) {
00251 return;
00252 }
00253 mutexGuard.lock();
00254 }
00255 }
00256 }
00257
00258 ExecStreamBufAccessor &ParallelExecStreamScheduler::readStream(
00259 ExecStream &stream)
00260 {
00261 FENNEL_TRACE(
00262 TRACE_FINE,
00263 "entering readStream " << stream.getName());
00264
00265 ExecStreamId current = stream.getStreamId();
00266 ExecStreamGraphImpl &graphImpl =
00267 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00268 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00269
00270
00271 assert(boost::out_degree(current,graphRep) == 1);
00272 ExecStreamGraphImpl::Edge edge =
00273 *(boost::out_edges(current,graphRep).first);
00274 ExecStreamBufAccessor &bufAccessor = graphImpl.getBufAccessorFromEdge(edge);
00275 current = boost::target(edge, graphRep);
00276 assert(!graphImpl.getStreamFromVertex(current));
00277
00278 if (bufAccessor.getState() == EXECBUF_EMPTY) {
00279 bufAccessor.requestProduction();
00280 } else if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
00281
00282 return bufAccessor;
00283 }
00284
00285
00286 ParallelExecResult result(current, EXECRC_BUF_UNDERFLOW);
00287 StrictMutexGuard mutexGuard(mutex);
00288 streamStateMap[current].state = SS_SLEEPING;
00289 completedQueue.push_back(result);
00290 condition.notify_one();
00291
00292 while ((streamStateMap[current].state == SS_SLEEPING) && !pPendingExcn) {
00293 sentinelCondition.wait(mutexGuard);
00294 }
00295
00296 if (pPendingExcn) {
00297 pPendingExcn->throwSelf();
00298 }
00299
00300 return bufAccessor;
00301 }
00302
00303 void ParallelExecStreamScheduler::processCompletedTask(
00304 ParallelExecResult const &result)
00305 {
00306 ExecStreamId current = result.getStreamId();
00307 ExecStreamGraphImpl &graphImpl =
00308 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00309 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00310
00311 streamStateMap[current].state = SS_SLEEPING;
00312 alterNeighborInhibition(current, -1);
00313
00314 switch (result.getResultCode()) {
00315 case EXECRC_EOS:
00316 case EXECRC_BUF_OVERFLOW:
00317 case EXECRC_BUF_UNDERFLOW:
00318 {
00319 ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00320 boost::out_edges(current, graphRep);
00321 for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00322 ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00323 ExecStreamBufAccessor &bufAccessor =
00324 graphImpl.getBufAccessorFromEdge(edge);
00325 if (bufAccessor.getState() != EXECBUF_UNDERFLOW) {
00326 ExecStreamId consumer = boost::target(edge, graphRep);
00327 bool sentinel = addToQueue(consumer);
00328 if (sentinel) {
00329 if (bufAccessor.getState() != EXECBUF_EMPTY) {
00330 signalSentinel(consumer);
00331 }
00332 }
00333 }
00334 }
00335 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00336 boost::in_edges(current, graphRep);
00337 bool sawUnderflow = false;
00338 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00339 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00340 ExecStreamBufAccessor &bufAccessor =
00341 graphImpl.getBufAccessorFromEdge(edge);
00342 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00343 ExecStreamId producer = boost::source(edge, graphRep);
00344 addToQueue(producer);
00345 sawUnderflow = true;
00346 }
00347 }
00348 if (!sawUnderflow &&
00349 (result.getResultCode() == EXECRC_BUF_UNDERFLOW))
00350 {
00351
00352
00353
00354
00355 addToQueue(current);
00356 }
00357 }
00358 break;
00359 case EXECRC_QUANTUM_EXPIRED:
00360 addToQueue(current);
00361 break;
00362 default:
00363 permAssert(false);
00364 }
00365
00366
00367 retryInhibitedQueue();
00368 }
00369
00370 void ParallelExecStreamScheduler::signalSentinel(ExecStreamId sentinelId)
00371 {
00372 alterNeighborInhibition(sentinelId, + 1);
00373
00374 StrictMutexGuard mutexGuard(mutex);
00375 streamStateMap[sentinelId].state = SS_RUNNING;
00376 sentinelCondition.notify_all();
00377 }
00378
00379 void ParallelExecStreamScheduler::executeTask(ExecStream &stream)
00380 {
00381 try {
00382 tryExecuteTask(stream);
00383 } catch (std::exception &ex) {
00384 StrictMutexGuard mutexGuard(mutex);
00385 if (!pPendingExcn) {
00386 pPendingExcn.reset(threadTracker.cloneExcn(ex));
00387 }
00388 condition.notify_one();
00389 } catch (...) {
00390
00391 StrictMutexGuard mutexGuard(mutex);
00392 if (!pPendingExcn) {
00393 pPendingExcn.reset(new FennelExcn("Unknown error"));
00394 }
00395 condition.notify_one();
00396 }
00397 }
00398
00399 void ParallelExecStreamScheduler::tryExecuteTask(ExecStream &stream)
00400 {
00401 ExecStreamQuantum quantum;
00402 ExecStreamResult rc = executeStream(stream, quantum);
00403 ParallelExecResult result(stream.getStreamId(), rc);
00404
00405 StrictMutexGuard mutexGuard(mutex);
00406 completedQueue.push_back(result);
00407 condition.notify_one();
00408 }
00409
00410 void ParallelExecStreamScheduler::retryInhibitedQueue()
00411 {
00412
00413
00414 transitQueue = inhibitedQueue;
00415 inhibitedQueue.clear();
00416 while (!transitQueue.empty()) {
00417 ExecStreamId inhibitedStreamId = transitQueue.front();
00418 transitQueue.pop_front();
00419 streamStateMap[inhibitedStreamId].state = SS_SLEEPING;
00420 addToQueue(inhibitedStreamId);
00421 }
00422 }
00423
00424 bool ParallelExecStreamScheduler::addToQueue(ExecStreamId streamId)
00425 {
00426 if (pPendingExcn) {
00427 return false;
00428 }
00429 switch (streamStateMap[streamId].state) {
00430 case SS_SLEEPING:
00431 {
00432 ExecStreamGraphImpl &graphImpl =
00433 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00434 SharedExecStream pStream = graphImpl.getStreamFromVertex(streamId);
00435 if (!pStream) {
00436
00437 return true;
00438 }
00439 if (isInhibited(streamId)) {
00440 streamStateMap[streamId].state = SS_INHIBITED;
00441 inhibitedQueue.push_back(streamId);
00442 } else {
00443 streamStateMap[streamId].state = SS_RUNNING;
00444 alterNeighborInhibition(streamId, + 1);
00445 ParallelExecTask task(*this, pStream.get());
00446 threadPool.submitTask(task);
00447 }
00448 }
00449 break;
00450 case SS_INHIBITED:
00451 case SS_RUNNING:
00452
00453 break;
00454 default:
00455 permAssert(false);
00456 }
00457 return false;
00458 }
00459
00460 ParallelExecTask::ParallelExecTask(
00461 ParallelExecStreamScheduler &schedulerInit,
00462 ExecStream *pStreamInit)
00463 : scheduler(schedulerInit)
00464 {
00465 pStream = pStreamInit;
00466 }
00467
00468 void ParallelExecTask::execute()
00469 {
00470 if (pStream) {
00471 scheduler.executeTask(*pStream);
00472 } else {
00473 scheduler.executeManager();
00474 }
00475 }
00476
00477 ParallelExecResult::ParallelExecResult(
00478 ExecStreamId streamIdInit,
00479 ExecStreamResult rcInit)
00480 {
00481 streamId = streamIdInit;
00482 rc = rcInit;
00483 }
00484
00485 uint ParallelExecStreamScheduler::getDegreeOfParallelism()
00486 {
00487 return degreeOfParallelism;
00488 }
00489
00490 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $");
00491
00492