ExecStreamBuilder.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2003-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/ExecStreamBuilder.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/db/Database.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $");
00031 
00032 ExecStreamBuilder::ExecStreamBuilder(
00033     ExecStreamGraphEmbryo &graphEmbryoInit,
00034     ExecStreamFactory &streamFactoryInit)
00035     : graphEmbryo(graphEmbryoInit),
00036       streamFactory(streamFactoryInit)
00037 {
00038 }
00039 
00040 ExecStreamBuilder::~ExecStreamBuilder()
00041 {
00042 }
00043 
00044 void ExecStreamBuilder::buildStreamGraph(
00045     ProxyCmdPrepareExecutionStreamGraph &cmd,
00046     bool assumeOutputFromSinks)
00047 {
00048     streamFactory.setScratchAccessor(graphEmbryo.getScratchAccessor());
00049 
00050     // PASS 1: add streams to graph
00051     SharedProxyExecutionStreamDef pStreamDef = cmd.getStreamDefs();
00052     for (; pStreamDef; ++pStreamDef) {
00053         buildStream(*pStreamDef);
00054     }
00055 
00056     // PASS 2: add input dataflows (provided the source input has only output)
00057     pStreamDef = cmd.getStreamDefs();
00058     for (; pStreamDef; ++pStreamDef) {
00059         buildStreamInputs(*pStreamDef);
00060 
00061         if (!getExplicitOutputCount(*pStreamDef) && assumeOutputFromSinks) {
00062             // Streams with no consumer are read directly by clients.  They
00063             // are expected to support producer provisioned results.
00064             std::string name = pStreamDef->getName();
00065             SharedExecStream pAdaptedStream =
00066                 graphEmbryo.addAdapterFor(name, 0, BUFPROV_PRODUCER);
00067             graphEmbryo.getGraph().addOutputDataflow(
00068                 pAdaptedStream->getStreamId());
00069         }
00070     }
00071 
00072     // PASS 3: add output dataflows in the cases where a stream has multiple
00073     // outputs
00074     pStreamDef = cmd.getStreamDefs();
00075     for (; pStreamDef; ++pStreamDef) {
00076         buildStreamOutputs(*pStreamDef);
00077     }
00078 
00079     // PASS 4: sort and prepare streams
00080     graphEmbryo.prepareGraph(
00081         streamFactory.getDatabase()->getSharedTraceTarget(),
00082         "xo.");
00083 }
00084 
00085 void ExecStreamBuilder::buildStream(
00086     ProxyExecutionStreamDef &streamDef)
00087 {
00088     ExecStreamEmbryo embryo = streamFactory.visitStream(streamDef);
00089     graphEmbryo.saveStreamEmbryo(embryo);
00090     SharedProxyDynamicParamUse pParamUse = streamDef.getDynamicParamUse();
00091     for (; pParamUse; ++pParamUse) {
00092         DynamicParamId dynamicParamId(pParamUse->getDynamicParamId());
00093         if (pParamUse->isRead()) {
00094             if (false)
00095                 std::cout << "stream " << embryo.getStream()->getStreamId()
00096                           << " reads param " << dynamicParamId << std::endl;
00097             graphEmbryo.getGraph().declareDynamicParamReader(
00098                 embryo.getStream()->getStreamId(),
00099                 dynamicParamId);
00100         } else {
00101             if (false)
00102                 std::cout << "stream " << embryo.getStream()->getStreamId()
00103                           << " writes param " << dynamicParamId << std::endl;
00104             graphEmbryo.getGraph().declareDynamicParamWriter(
00105                 embryo.getStream()->getStreamId(),
00106                 dynamicParamId);
00107         }
00108     }
00109 }
00110 
00111 void ExecStreamBuilder::buildStreamInputs(
00112     ProxyExecutionStreamDef &streamDef)
00113 {
00114     std::string name = streamDef.getName();
00115     SharedProxyExecStreamDataFlow pInputFlow = streamDef.getInputFlow();
00116     for (; pInputFlow; ++pInputFlow) {
00117         SharedProxyExecutionStreamDef pInput = pInputFlow->getProducer();
00118         // If the source input has multiple outputs, defer adding that flow
00119         // till later so we can add those flows in the order in which they
00120         // appear in the output flow list.
00121         //
00122         // NOTE zfong 12/4/06 - By deferring adding the input flows in the
00123         // scenario described above, this means we don't handle the case where
00124         // a dataflow is an ordered dataflow for both an input and an output.
00125         // The ordering will only be preserved on the output flows.
00126         if (getExplicitOutputCount(*pInput) > 1) {
00127             continue;
00128         }
00129         std::string inputName = pInput->getName();
00130         graphEmbryo.addDataflow(inputName, name, pInputFlow->isImplicit());
00131     }
00132 }
00133 
00134 void ExecStreamBuilder::buildStreamOutputs(
00135     ProxyExecutionStreamDef &streamDef)
00136 {
00137     std::string name = streamDef.getName();
00138     SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
00139     if (!(getExplicitOutputCount(streamDef) > 1)) {
00140         return;
00141     }
00142     for (; pOutputFlow; ++pOutputFlow) {
00143         SharedProxyExecutionStreamDef pOutput = pOutputFlow->getConsumer();
00144         std::string outputName = pOutput->getName();
00145         graphEmbryo.addDataflow(name, outputName, pOutputFlow->isImplicit());
00146     }
00147 }
00148 
00149 int ExecStreamBuilder::getExplicitOutputCount(
00150     ProxyExecutionStreamDef &streamDef)
00151 {
00152     int nExplicitOutputs = 0;
00153     SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
00154     for (; pOutputFlow; ++pOutputFlow) {
00155         if (!pOutputFlow->isImplicit()) {
00156             ++nExplicitOutputs;
00157         }
00158     }
00159     return nExplicitOutputs;
00160 }
00161 
00162 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $");
00163 
00164 // End ExecStreamBuilder.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1