ExecStreamGraphEmbryo.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamEmbryo.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/cache/QuotaCacheAccessor.h"
00032 #include "fennel/segment/SegmentAccessor.h"
00033 #include "fennel/segment/SegmentFactory.h"
00034 #include "fennel/cache/Cache.h"
00035 #include <iostream>
00036 
00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00038 
00039 ExecStreamGraphEmbryo::ExecStreamGraphEmbryo(
00040     SharedExecStreamGraph pGraphInit,
00041     SharedExecStreamScheduler pSchedulerInit,
00042     SharedCache pCacheInit,
00043     SharedSegmentFactory pSegmentFactoryInit)
00044 {
00045     pGraph = pGraphInit;
00046     pScheduler = pSchedulerInit;
00047     pCacheAccessor = pCacheInit;
00048     scratchAccessor =
00049         pSegmentFactoryInit->newScratchSegment(pCacheInit);
00050 
00051     pGraph->setScratchSegment(scratchAccessor.pSegment);
00052 }
00053 
00054 ExecStreamGraphEmbryo::~ExecStreamGraphEmbryo()
00055 {
00056 }
00057 
00058 SharedExecStream ExecStreamGraphEmbryo::addAdapterFor(
00059     const std::string &name,
00060     uint iOutput,
00061     ExecStreamBufProvision requiredDataflow)
00062 {
00063     // REVIEW jvs 18-Nov-2004:  in the case of multiple outputs from one
00064     // stream, with consumers having different provisioning, this
00065     // could result in chains of adapters, which would be less than optimal
00066 
00067     // Get available dataflow from last stream of group
00068     SharedExecStream pLastStream = pGraph->findLastStream(name, iOutput);
00069     ExecStreamBufProvision availableDataflow =
00070         pLastStream->getOutputBufProvision();
00071     assert(availableDataflow != BUFPROV_NONE);
00072 
00073     // Generate a name.
00074     std::string adapterName;
00075     {
00076         int id = pGraph->getOutputCount(pLastStream->getStreamId());
00077         std::ostringstream oss;
00078         oss << pLastStream->getName() << "#" << id << ".provisioner";
00079         adapterName = oss.str();
00080     }
00081 
00082     // If necessary, create an adapter based on the last stream
00083     switch (requiredDataflow) {
00084     case BUFPROV_CONSUMER:
00085         if (availableDataflow == BUFPROV_PRODUCER) {
00086             ExecStreamEmbryo embryo;
00087             pScheduler->createCopyProvisionAdapter(embryo);
00088             initializeAdapter(embryo, name, iOutput, adapterName);
00089             return embryo.getStream();
00090         }
00091         break;
00092     case BUFPROV_PRODUCER:
00093         if (availableDataflow == BUFPROV_CONSUMER) {
00094             ExecStreamEmbryo embryo;
00095             pScheduler->createBufferProvisionAdapter(embryo);
00096             initializeAdapter(embryo, name, iOutput, adapterName);
00097             return embryo.getStream();
00098         }
00099         break;
00100     default:
00101         permAssert(false);
00102     }
00103     return pLastStream;
00104 }
00105 
00106 void ExecStreamGraphEmbryo::initializeAdapter(
00107     ExecStreamEmbryo &embryo,
00108     std::string const &streamName,
00109     uint iOutput,
00110     std::string const &adapterName)
00111 {
00112     initStreamParams(*(embryo.getParams()));
00113     embryo.getStream()->setName(adapterName);
00114     saveStreamEmbryo(embryo);
00115     pGraph->interposeStream(
00116         streamName, iOutput, embryo.getStream()->getStreamId());
00117 }
00118 
00119 void ExecStreamGraphEmbryo::saveStreamEmbryo(ExecStreamEmbryo &embryo)
00120 {
00121     allStreamEmbryos[embryo.getStream()->getName()] = embryo;
00122     pGraph->addStream(embryo.getStream());
00123 }
00124 
00125 ExecStreamEmbryo &ExecStreamGraphEmbryo::getStreamEmbryo(
00126     std::string const &name)
00127 {
00128     StreamMapIter pPair = allStreamEmbryos.find(name);
00129     assert(pPair != allStreamEmbryos.end());
00130     return pPair->second;
00131 }
00132 
00133 void ExecStreamGraphEmbryo::addDataflow(
00134     const std::string &source,
00135     const std::string &target,
00136     bool isImplicit)
00137 {
00138     SharedExecStream pSourceStream =
00139         pGraph->findStream(source);
00140     SharedExecStream pTargetStream =
00141         pGraph->findStream(target);
00142     SharedExecStream pInput;
00143     if (isImplicit) {
00144         pInput = pSourceStream;
00145     } else {
00146         uint iOutput = pGraph->getOutputCount(pSourceStream->getStreamId());
00147         ExecStreamBufProvision requiredConversion =
00148             pSourceStream->getOutputBufConversion();
00149         if (requiredConversion != BUFPROV_NONE) {
00150             addAdapterFor(source, iOutput, requiredConversion);
00151         }
00152         ExecStreamBufProvision requiredDataflow =
00153             pTargetStream->getInputBufProvision();
00154         addAdapterFor(source, iOutput, requiredDataflow);
00155         pInput = pGraph->findLastStream(source, iOutput);
00156     }
00157     pGraph->addDataflow(
00158         pInput->getStreamId(),
00159         pTargetStream->getStreamId(),
00160         isImplicit);
00161 }
00162 
00163 void ExecStreamGraphEmbryo::initStreamParams(ExecStreamParams &params)
00164 {
00165     params.pCacheAccessor = pCacheAccessor;
00166     params.scratchAccessor = scratchAccessor;
00167 
00168     // All cache access should be wrapped by quota checks.  Actual
00169     // quotas and TxnIds will be set per-execution.
00170     uint quota = 0;
00171     SharedQuotaCacheAccessor pQuotaAccessor(
00172         new QuotaCacheAccessor(
00173             SharedQuotaCacheAccessor(),
00174             params.pCacheAccessor,
00175             quota));
00176     params.pCacheAccessor = pQuotaAccessor;
00177 
00178     // scratch access has to go through a separate CacheAccessor, but
00179     // delegates quota checking to pQuotaAccessor
00180     params.scratchAccessor.pCacheAccessor.reset(
00181         new QuotaCacheAccessor(
00182             pQuotaAccessor,
00183             params.scratchAccessor.pCacheAccessor,
00184             quota));
00185 }
00186 
00187 ExecStreamGraph &ExecStreamGraphEmbryo::getGraph()
00188 {
00189     return *pGraph;
00190 }
00191 
00192 SegmentAccessor &ExecStreamGraphEmbryo::getScratchAccessor()
00193 {
00194     return scratchAccessor;
00195 }
00196 
00197 void ExecStreamGraphEmbryo::prepareGraph(
00198     SharedTraceTarget pTraceTarget,
00199     std::string const &tracePrefix)
00200 {
00201     pGraph->prepare(*pScheduler);
00202     std::vector<SharedExecStream> sortedStreams =
00203         pGraph->getSortedStreams();
00204     std::vector<SharedExecStream>::iterator pos;
00205     for (pos = sortedStreams.begin(); pos != sortedStreams.end(); pos++) {
00206         std::string name = (*pos)->getName();
00207         ExecStreamEmbryo &embryo = getStreamEmbryo(name);
00208         // Give streams a source name with an XO prefix so that users can
00209         // choose to trace XOs as a group
00210         std::string traceName = tracePrefix + name;
00211         ExecStreamId streamId = embryo.getStream()->getStreamId();
00212         embryo.getStream()->initTraceSource(
00213             pTraceTarget,
00214             traceName);
00215         embryo.prepareStream();
00216 
00217         // Check that stream remembered to initialize its outputs.
00218         uint outputCount = pGraph->getOutputCount(streamId);
00219         for (uint i = 0; i < outputCount; ++i) {
00220             SharedExecStreamBufAccessor outAccessor =
00221                 pGraph->getStreamOutputAccessor(streamId, i);
00222             if (outAccessor->getTupleDesc().empty()) {
00223                 permFail("Forgot to initialize output #" << i << "of stream '"
00224                          << traceName << "'");
00225             }
00226         }
00227     }
00228 
00229     pScheduler->addGraph(pGraph);
00230 }
00231 
00232 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00233 
00234 // End ExecStreamGraphEmbryo.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1