00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamEmbryo.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/cache/QuotaCacheAccessor.h"
00032 #include "fennel/segment/SegmentAccessor.h"
00033 #include "fennel/segment/SegmentFactory.h"
00034 #include "fennel/cache/Cache.h"
00035 #include <iostream>
00036
00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00038
00039 ExecStreamGraphEmbryo::ExecStreamGraphEmbryo(
00040 SharedExecStreamGraph pGraphInit,
00041 SharedExecStreamScheduler pSchedulerInit,
00042 SharedCache pCacheInit,
00043 SharedSegmentFactory pSegmentFactoryInit)
00044 {
00045 pGraph = pGraphInit;
00046 pScheduler = pSchedulerInit;
00047 pCacheAccessor = pCacheInit;
00048 scratchAccessor =
00049 pSegmentFactoryInit->newScratchSegment(pCacheInit);
00050
00051 pGraph->setScratchSegment(scratchAccessor.pSegment);
00052 }
00053
00054 ExecStreamGraphEmbryo::~ExecStreamGraphEmbryo()
00055 {
00056 }
00057
00058 SharedExecStream ExecStreamGraphEmbryo::addAdapterFor(
00059 const std::string &name,
00060 uint iOutput,
00061 ExecStreamBufProvision requiredDataflow)
00062 {
00063
00064
00065
00066
00067
00068 SharedExecStream pLastStream = pGraph->findLastStream(name, iOutput);
00069 ExecStreamBufProvision availableDataflow =
00070 pLastStream->getOutputBufProvision();
00071 assert(availableDataflow != BUFPROV_NONE);
00072
00073
00074 std::string adapterName;
00075 {
00076 int id = pGraph->getOutputCount(pLastStream->getStreamId());
00077 std::ostringstream oss;
00078 oss << pLastStream->getName() << "#" << id << ".provisioner";
00079 adapterName = oss.str();
00080 }
00081
00082
00083 switch (requiredDataflow) {
00084 case BUFPROV_CONSUMER:
00085 if (availableDataflow == BUFPROV_PRODUCER) {
00086 ExecStreamEmbryo embryo;
00087 pScheduler->createCopyProvisionAdapter(embryo);
00088 initializeAdapter(embryo, name, iOutput, adapterName);
00089 return embryo.getStream();
00090 }
00091 break;
00092 case BUFPROV_PRODUCER:
00093 if (availableDataflow == BUFPROV_CONSUMER) {
00094 ExecStreamEmbryo embryo;
00095 pScheduler->createBufferProvisionAdapter(embryo);
00096 initializeAdapter(embryo, name, iOutput, adapterName);
00097 return embryo.getStream();
00098 }
00099 break;
00100 default:
00101 permAssert(false);
00102 }
00103 return pLastStream;
00104 }
00105
00106 void ExecStreamGraphEmbryo::initializeAdapter(
00107 ExecStreamEmbryo &embryo,
00108 std::string const &streamName,
00109 uint iOutput,
00110 std::string const &adapterName)
00111 {
00112 initStreamParams(*(embryo.getParams()));
00113 embryo.getStream()->setName(adapterName);
00114 saveStreamEmbryo(embryo);
00115 pGraph->interposeStream(
00116 streamName, iOutput, embryo.getStream()->getStreamId());
00117 }
00118
00119 void ExecStreamGraphEmbryo::saveStreamEmbryo(ExecStreamEmbryo &embryo)
00120 {
00121 allStreamEmbryos[embryo.getStream()->getName()] = embryo;
00122 pGraph->addStream(embryo.getStream());
00123 }
00124
00125 ExecStreamEmbryo &ExecStreamGraphEmbryo::getStreamEmbryo(
00126 std::string const &name)
00127 {
00128 StreamMapIter pPair = allStreamEmbryos.find(name);
00129 assert(pPair != allStreamEmbryos.end());
00130 return pPair->second;
00131 }
00132
00133 void ExecStreamGraphEmbryo::addDataflow(
00134 const std::string &source,
00135 const std::string &target,
00136 bool isImplicit)
00137 {
00138 SharedExecStream pSourceStream =
00139 pGraph->findStream(source);
00140 SharedExecStream pTargetStream =
00141 pGraph->findStream(target);
00142 SharedExecStream pInput;
00143 if (isImplicit) {
00144 pInput = pSourceStream;
00145 } else {
00146 uint iOutput = pGraph->getOutputCount(pSourceStream->getStreamId());
00147 ExecStreamBufProvision requiredConversion =
00148 pSourceStream->getOutputBufConversion();
00149 if (requiredConversion != BUFPROV_NONE) {
00150 addAdapterFor(source, iOutput, requiredConversion);
00151 }
00152 ExecStreamBufProvision requiredDataflow =
00153 pTargetStream->getInputBufProvision();
00154 addAdapterFor(source, iOutput, requiredDataflow);
00155 pInput = pGraph->findLastStream(source, iOutput);
00156 }
00157 pGraph->addDataflow(
00158 pInput->getStreamId(),
00159 pTargetStream->getStreamId(),
00160 isImplicit);
00161 }
00162
00163 void ExecStreamGraphEmbryo::initStreamParams(ExecStreamParams ¶ms)
00164 {
00165 params.pCacheAccessor = pCacheAccessor;
00166 params.scratchAccessor = scratchAccessor;
00167
00168
00169
00170 uint quota = 0;
00171 SharedQuotaCacheAccessor pQuotaAccessor(
00172 new QuotaCacheAccessor(
00173 SharedQuotaCacheAccessor(),
00174 params.pCacheAccessor,
00175 quota));
00176 params.pCacheAccessor = pQuotaAccessor;
00177
00178
00179
00180 params.scratchAccessor.pCacheAccessor.reset(
00181 new QuotaCacheAccessor(
00182 pQuotaAccessor,
00183 params.scratchAccessor.pCacheAccessor,
00184 quota));
00185 }
00186
00187 ExecStreamGraph &ExecStreamGraphEmbryo::getGraph()
00188 {
00189 return *pGraph;
00190 }
00191
00192 SegmentAccessor &ExecStreamGraphEmbryo::getScratchAccessor()
00193 {
00194 return scratchAccessor;
00195 }
00196
00197 void ExecStreamGraphEmbryo::prepareGraph(
00198 SharedTraceTarget pTraceTarget,
00199 std::string const &tracePrefix)
00200 {
00201 pGraph->prepare(*pScheduler);
00202 std::vector<SharedExecStream> sortedStreams =
00203 pGraph->getSortedStreams();
00204 std::vector<SharedExecStream>::iterator pos;
00205 for (pos = sortedStreams.begin(); pos != sortedStreams.end(); pos++) {
00206 std::string name = (*pos)->getName();
00207 ExecStreamEmbryo &embryo = getStreamEmbryo(name);
00208
00209
00210 std::string traceName = tracePrefix + name;
00211 ExecStreamId streamId = embryo.getStream()->getStreamId();
00212 embryo.getStream()->initTraceSource(
00213 pTraceTarget,
00214 traceName);
00215 embryo.prepareStream();
00216
00217
00218 uint outputCount = pGraph->getOutputCount(streamId);
00219 for (uint i = 0; i < outputCount; ++i) {
00220 SharedExecStreamBufAccessor outAccessor =
00221 pGraph->getStreamOutputAccessor(streamId, i);
00222 if (outAccessor->getTupleDesc().empty()) {
00223 permFail("Forgot to initialize output #" << i << "of stream '"
00224 << traceName << "'");
00225 }
00226 }
00227 }
00228
00229 pScheduler->addGraph(pGraph);
00230 }
00231
00232 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $");
00233
00234