ExecStreamScheduler.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamScheduler.h"
00026 #include "fennel/exec/ExecStreamGraphImpl.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/CopyExecStream.h"
00029 #include "fennel/exec/ScratchBufferExecStream.h"
00030 #include "fennel/exec/ExecStreamEmbryo.h"
00031 #include "fennel/tuple/TupleData.h"
00032 #include "fennel/tuple/TuplePrinter.h"
00033 
00034 #include <fstream>
00035 
00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $");
00037 
00038 ExecStreamScheduler::ExecStreamScheduler(
00039     SharedTraceTarget pTraceTargetInit,
00040     std::string nameInit)
00041     : TraceSource(pTraceTargetInit, nameInit)
00042 {
00043     tracingFine = isTracingLevel(TRACE_FINE);
00044 }
00045 
00046 ExecStreamScheduler::~ExecStreamScheduler()
00047 {
00048 }
00049 
00050 void ExecStreamScheduler::addGraph(SharedExecStreamGraph pGraph)
00051 {
00052     assert(!pGraph->pScheduler);
00053     pGraph->pScheduler = this;
00054 
00055     if (tracingFine) {
00056         std::string dotFileName;
00057         const char *fennelHome = getenv("FENNEL_HOME");
00058         if (fennelHome) {
00059             dotFileName += fennelHome;
00060             dotFileName += "/trace/";
00061         }
00062         dotFileName += "ExecStreamGraph.dot";
00063         std::ofstream dotStream(dotFileName.c_str());
00064         pGraph->renderGraphviz(dotStream);
00065     }
00066 
00067     // if any of the streams in the new graph require tracing, then
00068     // disable our tracing short-circuit
00069     std::vector<SharedExecStream> streams = pGraph->getSortedStreams();
00070     for (uint i = 0; i < streams.size(); ++i) {
00071         if (streams[i]->isTracingLevel(TRACE_FINE)) {
00072             tracingFine = true;
00073             return;
00074         }
00075     }
00076 }
00077 
00078 SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor()
00079 {
00080     return SharedExecStreamBufAccessor(new ExecStreamBufAccessor());
00081 }
00082 
00083 void ExecStreamScheduler::createBufferProvisionAdapter(
00084     ExecStreamEmbryo &embryo)
00085 {
00086     ScratchBufferExecStreamParams adapterParams;
00087     embryo.init(
00088         new ScratchBufferExecStream(),
00089         adapterParams);
00090 }
00091 
00092 void ExecStreamScheduler::createCopyProvisionAdapter(
00093     ExecStreamEmbryo &embryo)
00094 {
00095     CopyExecStreamParams adapterParams;
00096     embryo.init(
00097         new CopyExecStream(),
00098         adapterParams);
00099 }
00100 
00101 void ExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraph)
00102 {
00103     assert(pGraph->pScheduler == this);
00104     pGraph->pScheduler = NULL;
00105 }
00106 
00107 // Summary of per-stream trace levels:
00108 // TRACE_FINE: result of execution
00109 // TRACE_FINER: buffer states before and after, output after execution.
00110 // TRACE_FINEST: both input and output before and after each execution
00111 
00112 void ExecStreamScheduler::tracePreExecution(
00113     ExecStream &stream,
00114     ExecStreamQuantum const &quantum)
00115 {
00116     FENNEL_TRACE(
00117         TRACE_FINE,
00118         "executing " << stream.getStreamId() << ' ' << stream.getName());
00119     if (!isMAXU(quantum.nTuplesMax)) {
00120         FENNEL_TRACE(
00121             TRACE_FINE,
00122             "nTuplesMax = " << quantum.nTuplesMax);
00123     }
00124 
00125     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST);
00126 }
00127 
00128 void ExecStreamScheduler::tracePostExecution(
00129     ExecStream &stream,
00130     ExecStreamResult rc)
00131 {
00132     FENNEL_TRACE(
00133         TRACE_FINE,
00134         "executed " << stream.getStreamId() << ' ' << stream.getName()
00135         << " with result " << ExecStreamResult_names[rc]);
00136 
00137     traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER);
00138 }
00139 
00140 void ExecStreamScheduler::traceStreamBuffers(
00141     ExecStream &stream,
00142     TraceLevel inputTupleTraceLevel,
00143     TraceLevel outputTupleTraceLevel)
00144 {
00145     ExecStreamGraphImpl &graphImpl =
00146         dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph());
00147     ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00148 
00149     ExecStreamGraphImpl::InEdgeIterPair inEdges =
00150         boost::in_edges(stream.getStreamId(),graphRep);
00151     for (uint i = 0; inEdges.first != inEdges.second;
00152          ++(inEdges.first),  ++i)
00153     {
00154         ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00155         ExecStreamBufAccessor &bufAccessor =
00156             graphImpl.getBufAccessorFromEdge(edge);
00157         FENNEL_TRACE(
00158             TRACE_FINER,
00159             "input buffer " << i << ":  "
00160             << ExecStreamBufState_names[bufAccessor.getState()]
00161             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00162             << ",  consumption available = "
00163             << bufAccessor.getConsumptionAvailable());
00164         if (stream.isTracingLevel(inputTupleTraceLevel)) {
00165             traceStreamBufferContents(
00166                 stream, bufAccessor, inputTupleTraceLevel);
00167         }
00168     }
00169 
00170     ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00171         boost::out_edges(stream.getStreamId(),graphRep);
00172     for (uint i = 0; outEdges.first != outEdges.second;
00173          ++(outEdges.first),  ++i) {
00174         ExecStreamGraphImpl::Edge edge = *(outEdges.first);
00175         ExecStreamBufAccessor &bufAccessor =
00176             graphImpl.getBufAccessorFromEdge(edge);
00177         FENNEL_TRACE(
00178             TRACE_FINER,
00179             "output buffer " << i << ":  "
00180             << ExecStreamBufState_names[bufAccessor.getState()]
00181             << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "")
00182             << ",  consumption available = "
00183             << bufAccessor.getConsumptionAvailable()
00184             << ",  production available = "
00185             << bufAccessor.getProductionAvailable());
00186         if (stream.isTracingLevel(outputTupleTraceLevel)) {
00187             traceStreamBufferContents(
00188                 stream, bufAccessor, outputTupleTraceLevel);
00189         }
00190     }
00191 }
00192 
00193 void ExecStreamScheduler::traceStreamBufferContents(
00194     ExecStream &stream,
00195     ExecStreamBufAccessor &bufAccessor,
00196     TraceLevel traceLevel)
00197 {
00198     TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc();
00199     TupleData tupleData(tupleDesc);
00200     TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor();
00201 
00202     for (PConstBuffer pTuple = bufAccessor.getConsumptionStart();
00203          pTuple != bufAccessor.getConsumptionEnd();
00204          pTuple += tupleAccessor.getCurrentByteCount())
00205     {
00206         tupleAccessor.setCurrentTupleBuf(pTuple);
00207         // while we're here, we might as well sanity-check the content
00208         assert(pTuple + tupleAccessor.getCurrentByteCount()
00209             <= bufAccessor.getConsumptionEnd());
00210         tupleAccessor.unmarshal(tupleData);
00211         // TODO:  sanity-check individual data values?
00212         std::ostringstream oss;
00213         TuplePrinter tuplePrinter;
00214         tuplePrinter.print(oss,tupleDesc,tupleData);
00215         stream.trace(traceLevel,oss.str());
00216     }
00217 }
00218 
00219 void ExecStreamScheduler::checkAbort() const
00220 {
00221 }
00222 
00223 uint ExecStreamScheduler::getDegreeOfParallelism()
00224 {
00225     return 1;
00226 }
00227 
00228 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $");
00229 
00230 // End ExecStreamScheduler.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1