00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamEmbryo.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/cache/QuotaCacheAccessor.h"
00032 #include "fennel/segment/SegmentAccessor.h"
00033 #include "fennel/segment/SegmentFactory.h"
00034 #include "fennel/cache/Cache.h"
00035 #include <iostream>
00036 
00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00038 
00039 ExecStreamGraphEmbryo::ExecStreamGraphEmbryo(
00040     SharedExecStreamGraph pGraphInit,
00041     SharedExecStreamScheduler pSchedulerInit,
00042     SharedCache pCacheInit,
00043     SharedSegmentFactory pSegmentFactoryInit)
00044 {
00045     pGraph = pGraphInit;
00046     pScheduler = pSchedulerInit;
00047     pCacheAccessor = pCacheInit;
00048     scratchAccessor =
00049         pSegmentFactoryInit->newScratchSegment(pCacheInit);
00050 
00051     pGraph->setScratchSegment(scratchAccessor.pSegment);
00052 }
00053 
00054 ExecStreamGraphEmbryo::~ExecStreamGraphEmbryo()
00055 {
00056 }
00057 
00058 SharedExecStream ExecStreamGraphEmbryo::addAdapterFor(
00059     const std::string &name,
00060     uint iOutput,
00061     ExecStreamBufProvision requiredDataflow)
00062 {
00063     
00064     
00065     
00066 
00067     
00068     SharedExecStream pLastStream = pGraph->findLastStream(name, iOutput);
00069     ExecStreamBufProvision availableDataflow =
00070         pLastStream->getOutputBufProvision();
00071     assert(availableDataflow != BUFPROV_NONE);
00072 
00073     
00074     std::string adapterName;
00075     {
00076         int id = pGraph->getOutputCount(pLastStream->getStreamId());
00077         std::ostringstream oss;
00078         oss << pLastStream->getName() << "#" << id << ".provisioner";
00079         adapterName = oss.str();
00080     }
00081 
00082     
00083     switch (requiredDataflow) {
00084     case BUFPROV_CONSUMER:
00085         if (availableDataflow == BUFPROV_PRODUCER) {
00086             ExecStreamEmbryo embryo;
00087             pScheduler->createCopyProvisionAdapter(embryo);
00088             initializeAdapter(embryo, name, iOutput, adapterName);
00089             return embryo.getStream();
00090         }
00091         break;
00092     case BUFPROV_PRODUCER:
00093         if (availableDataflow == BUFPROV_CONSUMER) {
00094             ExecStreamEmbryo embryo;
00095             pScheduler->createBufferProvisionAdapter(embryo);
00096             initializeAdapter(embryo, name, iOutput, adapterName);
00097             return embryo.getStream();
00098         }
00099         break;
00100     default:
00101         permAssert(false);
00102     }
00103     return pLastStream;
00104 }
00105 
00106 void ExecStreamGraphEmbryo::initializeAdapter(
00107     ExecStreamEmbryo &embryo,
00108     std::string const &streamName,
00109     uint iOutput,
00110     std::string const &adapterName)
00111 {
00112     initStreamParams(*(embryo.getParams()));
00113     embryo.getStream()->setName(adapterName);
00114     saveStreamEmbryo(embryo);
00115     pGraph->interposeStream(
00116         streamName, iOutput, embryo.getStream()->getStreamId());
00117 }
00118 
00119 void ExecStreamGraphEmbryo::saveStreamEmbryo(ExecStreamEmbryo &embryo)
00120 {
00121     allStreamEmbryos[embryo.getStream()->getName()] = embryo;
00122     pGraph->addStream(embryo.getStream());
00123 }
00124 
00125 ExecStreamEmbryo &ExecStreamGraphEmbryo::getStreamEmbryo(
00126     std::string const &name)
00127 {
00128     StreamMapIter pPair = allStreamEmbryos.find(name);
00129     assert(pPair != allStreamEmbryos.end());
00130     return pPair->second;
00131 }
00132 
00133 void ExecStreamGraphEmbryo::addDataflow(
00134     const std::string &source,
00135     const std::string &target,
00136     bool isImplicit)
00137 {
00138     SharedExecStream pSourceStream =
00139         pGraph->findStream(source);
00140     SharedExecStream pTargetStream =
00141         pGraph->findStream(target);
00142     SharedExecStream pInput;
00143     if (isImplicit) {
00144         pInput = pSourceStream;
00145     } else {
00146         uint iOutput = pGraph->getOutputCount(pSourceStream->getStreamId());
00147         ExecStreamBufProvision requiredConversion =
00148             pSourceStream->getOutputBufConversion();
00149         if (requiredConversion != BUFPROV_NONE) {
00150             addAdapterFor(source, iOutput, requiredConversion);
00151         }
00152         ExecStreamBufProvision requiredDataflow =
00153             pTargetStream->getInputBufProvision();
00154         addAdapterFor(source, iOutput, requiredDataflow);
00155         pInput = pGraph->findLastStream(source, iOutput);
00156     }
00157     pGraph->addDataflow(
00158         pInput->getStreamId(),
00159         pTargetStream->getStreamId(),
00160         isImplicit);
00161 }
00162 
00163 void ExecStreamGraphEmbryo::initStreamParams(ExecStreamParams ¶ms)
00164 {
00165     params.pCacheAccessor = pCacheAccessor;
00166     params.scratchAccessor = scratchAccessor;
00167 
00168     
00169     
00170     uint quota = 0;
00171     SharedQuotaCacheAccessor pQuotaAccessor(
00172         new QuotaCacheAccessor(
00173             SharedQuotaCacheAccessor(),
00174             params.pCacheAccessor,
00175             quota));
00176     params.pCacheAccessor = pQuotaAccessor;
00177 
00178     
00179     
00180     params.scratchAccessor.pCacheAccessor.reset(
00181         new QuotaCacheAccessor(
00182             pQuotaAccessor,
00183             params.scratchAccessor.pCacheAccessor,
00184             quota));
00185 }
00186 
00187 ExecStreamGraph &ExecStreamGraphEmbryo::getGraph()
00188 {
00189     return *pGraph;
00190 }
00191 
00192 SegmentAccessor &ExecStreamGraphEmbryo::getScratchAccessor()
00193 {
00194     return scratchAccessor;
00195 }
00196 
00197 void ExecStreamGraphEmbryo::prepareGraph(
00198     SharedTraceTarget pTraceTarget,
00199     std::string const &tracePrefix)
00200 {
00201     pGraph->prepare(*pScheduler);
00202     std::vector<SharedExecStream> sortedStreams =
00203         pGraph->getSortedStreams();
00204     std::vector<SharedExecStream>::iterator pos;
00205     for (pos = sortedStreams.begin(); pos != sortedStreams.end(); pos++) {
00206         std::string name = (*pos)->getName();
00207         ExecStreamEmbryo &embryo = getStreamEmbryo(name);
00208         
00209         
00210         std::string traceName = tracePrefix + name;
00211         ExecStreamId streamId = embryo.getStream()->getStreamId();
00212         embryo.getStream()->initTraceSource(
00213             pTraceTarget,
00214             traceName);
00215         embryo.prepareStream();
00216 
00217         
00218         uint outputCount = pGraph->getOutputCount(streamId);
00219         for (uint i = 0; i < outputCount; ++i) {
00220             SharedExecStreamBufAccessor outAccessor =
00221                 pGraph->getStreamOutputAccessor(streamId, i);
00222             if (outAccessor->getTupleDesc().empty()) {
00223                 permFail("Forgot to initialize output #" << i << "of stream '"
00224                          << traceName << "'");
00225             }
00226         }
00227     }
00228 
00229     pScheduler->addGraph(pGraph);
00230 }
00231 
00232 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00233 
00234