DfsTreeExecStreamScheduler.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/DfsTreeExecStreamScheduler.h"
00026 #include "fennel/exec/ExecStreamGraphImpl.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamBufAccessor.h"
00029 #include "fennel/common/AbortExcn.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $");
00032 
00033 DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler(
00034     SharedTraceTarget pTraceTargetInit,
00035     std::string nameInit)
00036     : TraceSource(pTraceTargetInit, nameInit),
00037       ExecStreamScheduler(pTraceTargetInit, nameInit)
00038 {
00039 }
00040 
00041 DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler()
00042 {
00043 }
00044 
00045 void DfsTreeExecStreamScheduler::addGraph(SharedExecStreamGraph pGraphInit)
00046 {
00047     assert(!pGraph);
00048 
00049     ExecStreamScheduler::addGraph(pGraphInit);
00050     pGraph = pGraphInit;
00051 }
00052 
00053 void DfsTreeExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraphInit)
00054 {
00055     assert(pGraph == pGraphInit);
00056 
00057     pGraph.reset();
00058     ExecStreamScheduler::removeGraph(pGraphInit);
00059 }
00060 
00061 void DfsTreeExecStreamScheduler::start()
00062 {
00063     FENNEL_TRACE(TRACE_FINE,"start");
00064 
00065     // TODO jvs 2-Jan-2006:  rename this class now that it's no longer
00066     // restricted to trees; come up with something more generic in case
00067     // DFS becomes irrelevant also.
00068 
00069     // note: we no longer check that graph is a tree (or forest of trees)
00070     // since it is now possible to have multiple consumers from a single
00071     // producer
00072     assert(pGraph->isAcyclic());
00073     aborted = false;
00074 }
00075 
00076 void DfsTreeExecStreamScheduler::setRunnable(ExecStream &, bool)
00077 {
00078     permAssert(false);
00079 }
00080 
00081 void DfsTreeExecStreamScheduler::abort(ExecStreamGraph &)
00082 {
00083     FENNEL_TRACE(TRACE_FINE,"abort requested");
00084 
00085     aborted = true;
00086 }
00087 
00088 void DfsTreeExecStreamScheduler::checkAbort() const
00089 {
00090     if (aborted) {
00091         FENNEL_TRACE(TRACE_FINE,"abort detected");
00092         throw AbortExcn();
00093     }
00094 }
00095 
00096 void DfsTreeExecStreamScheduler::stop()
00097 {
00098     FENNEL_TRACE(TRACE_FINE,"stop");
00099 
00100     // nothing to do
00101     aborted = false;
00102 }
00103 
00104 ExecStreamBufAccessor &DfsTreeExecStreamScheduler::readStream(
00105     ExecStream &stream)
00106 {
00107     FENNEL_TRACE(
00108         TRACE_FINE,
00109         "entering readStream " << stream.getName());
00110 
00111     ExecStreamId current = stream.getStreamId();
00112     ExecStreamQuantum quantum;
00113 
00114     ExecStreamGraphImpl &graphImpl =
00115         dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00116     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00117 
00118     // assert that we're reading from a designated output stream
00119     assert(boost::out_degree(current,graphRep) == 1);
00120     assert(!graphImpl.getStreamFromVertex(
00121                boost::target(
00122                    *(boost::out_edges(current,graphRep).first),
00123                    graphRep)));
00124 
00125     // TODO:  assertions about accessor state/provision
00126 
00127     for (;;) {
00128         ExecStreamGraphImpl::InEdgeIterPair inEdges =
00129             boost::in_edges(current,graphRep);
00130         for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00131             ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00132             ExecStreamBufAccessor &bufAccessor =
00133                 graphImpl.getBufAccessorFromEdge(edge);
00134             if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00135                 // move current upstream
00136                 current = boost::source(edge,graphRep);
00137                 break;
00138             }
00139         }
00140         if (inEdges.first != inEdges.second) {
00141             // hit EXECBUF_UNDERFLOW
00142             continue;
00143         }
00144 
00145         SharedExecStream pStream = graphImpl.getStreamFromVertex(current);
00146         ExecStreamResult rc = executeStream(*pStream, quantum);
00147 
00148         checkAbort();
00149 
00150         ExecStreamGraphImpl::Edge edge;
00151 
00152         switch (rc) {
00153         case EXECRC_EOS:
00154             // find a consumer that is not in EOS state
00155             if (!findNextConsumer(
00156                 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS))
00157             {
00158                 return graphImpl.getBufAccessorFromEdge(edge);
00159             }
00160             // if all were in eos, just use the last consumer
00161             break;
00162         case EXECRC_BUF_OVERFLOW:
00163             // find a consumer that is not in underflow state; i.e., not
00164             // waiting on this producer to continue execution
00165             if (!findNextConsumer(
00166                 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW))
00167             {
00168                 return graphImpl.getBufAccessorFromEdge(edge);
00169             }
00170             break;
00171         case EXECRC_BUF_UNDERFLOW:
00172             // TODO:  assert that at least one input is in state
00173             // EXECBUF_UNDERFLOW
00174             break;
00175         case EXECRC_QUANTUM_EXPIRED:
00176             break;
00177         default:
00178             permAssert(false);
00179         }
00180     }
00181 }
00182 
00183 bool DfsTreeExecStreamScheduler::findNextConsumer(
00184     ExecStreamGraphImpl &graphImpl,
00185     const ExecStreamGraphImpl::GraphRep &graphRep,
00186     const ExecStream &stream,
00187     ExecStreamGraphImpl::Edge &edge,
00188     ExecStreamId &current,
00189     ExecStreamBufState skipState)
00190 {
00191     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00192         boost::out_edges(current,graphRep);
00193 
00194     bool emptyFound = false;
00195     // dummy initializations to avoid compiler error
00196     ExecStreamGraphImpl::Edge emptyEdge = edge;
00197     ExecStreamId emptyStreamId = current;
00198 
00199     for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00200         edge = *(outEdges.first);
00201         current = boost::target(edge,graphRep);
00202         if (boost::out_degree(current,graphRep) == 0) {
00203             // we've hit the output sentinel
00204             assert(!graphImpl.getStreamFromVertex(current));
00205             FENNEL_TRACE(
00206                 TRACE_FINE,
00207                 "leaving readStream " << stream.getName());
00208             return false;
00209         }
00210 
00211         ExecStreamBufAccessor &bufAccessor =
00212             graphImpl.getBufAccessorFromEdge(edge);
00213 
00214         // Save the first edge with an empty state that we find, but don't
00215         // return that as the next consumer.  We want to give priority to
00216         // streams that have explicity requested data.  So, only return the
00217         // empty edge consumer if there are no consumers that have explicitly
00218         // requested data.
00219         if (bufAccessor.getState() == EXECBUF_EMPTY) {
00220             if (!emptyFound) {
00221                 emptyFound = true;
00222                 emptyEdge = edge;
00223                 emptyStreamId = current;
00224             }
00225             continue;
00226         }
00227 
00228         if (bufAccessor.getState() != skipState) {
00229             break;
00230         }
00231         assert(!(skipState == EXECBUF_UNDERFLOW &&
00232                     bufAccessor.getState() == EXECBUF_EOS));
00233     }
00234 
00235     if (outEdges.first == outEdges.second && emptyFound) {
00236         edge = emptyEdge;
00237         current = emptyStreamId;
00238     } else {
00239         assert(!(skipState == EXECBUF_UNDERFLOW &&
00240                     outEdges.first == outEdges.second));
00241     }
00242 
00243     return true;
00244 }
00245 
00246 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $");
00247 
00248 // End DfsTreeExecStreamScheduler.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1