#include <ExecStreamGraphEmbryo.h>
Public Member Functions | |
ExecStreamGraphEmbryo (SharedExecStreamGraph pGraph, SharedExecStreamScheduler pScheduler, SharedCache pCache, SharedSegmentFactory pSegmentFactory) | |
virtual | ~ExecStreamGraphEmbryo () |
void | initStreamParams (ExecStreamParams &params) |
Initializes a stream's generic parameters. | |
ExecStreamGraph & | getGraph () |
| |
SegmentAccessor & | getScratchAccessor () |
| |
SharedExecStream | addAdapterFor (const std::string &name, uint iOutput, ExecStreamBufProvision requiredDataFlow) |
Ensures that a producer is capable of the specified buffer provisioning requirements. | |
void | saveStreamEmbryo (ExecStreamEmbryo &embryo) |
Registers a newly created, unprepared stream and adds it to the graph. | |
ExecStreamEmbryo & | getStreamEmbryo (const std::string &name) |
Looks up a registered stream. | |
void | addDataflow (const std::string &source, const std::string &target, bool isImplicit=false) |
Adds dataflow to graph, from one stream's output, after adapters, to another stream. | |
void | prepareGraph (SharedTraceTarget pTraceTarget, std::string const &tracePrefix) |
Prepares graph and all of its streams. | |
Private Types | |
typedef std::map< std::string, ExecStreamEmbryo > | StreamMap |
typedef StreamMap::const_iterator | StreamMapConstIter |
typedef StreamMap::iterator | StreamMapIter |
Private Member Functions | |
void | initializeAdapter (ExecStreamEmbryo &embryo, std::string const &streamName, uint iOutput, std::string const &adapterName) |
Initializes a new adapter stream. | |
Private Attributes | |
SharedExecStreamGraph | pGraph |
Unprepared graph. | |
SharedExecStreamScheduler | pScheduler |
Scheduler which will execute streams. | |
SharedCacheAccessor | pCacheAccessor |
Default cache accessor to be used by streams. | |
SegmentAccessor | scratchAccessor |
Default scratch segment accessor to be used by streams. | |
StreamMap | allStreamEmbryos |
Streams to be linked and prepared, mapped by name. |
Definition at line 41 of file ExecStreamGraphEmbryo.h.
typedef std::map<std::string,ExecStreamEmbryo> ExecStreamGraphEmbryo::StreamMap [private] |
Definition at line 44 of file ExecStreamGraphEmbryo.h.
typedef StreamMap::const_iterator ExecStreamGraphEmbryo::StreamMapConstIter [private] |
Definition at line 45 of file ExecStreamGraphEmbryo.h.
typedef StreamMap::iterator ExecStreamGraphEmbryo::StreamMapIter [private] |
Definition at line 46 of file ExecStreamGraphEmbryo.h.
ExecStreamGraphEmbryo::ExecStreamGraphEmbryo | ( | SharedExecStreamGraph | pGraph, | |
SharedExecStreamScheduler | pScheduler, | |||
SharedCache | pCache, | |||
SharedSegmentFactory | pSegmentFactory | |||
) | [explicit] |
Definition at line 39 of file ExecStreamGraphEmbryo.cpp.
References pCacheAccessor, pGraph, pScheduler, SegmentAccessor::pSegment, and scratchAccessor.
00044 { 00045 pGraph = pGraphInit; 00046 pScheduler = pSchedulerInit; 00047 pCacheAccessor = pCacheInit; 00048 scratchAccessor = 00049 pSegmentFactoryInit->newScratchSegment(pCacheInit); 00050 00051 pGraph->setScratchSegment(scratchAccessor.pSegment); 00052 }
ExecStreamGraphEmbryo::~ExecStreamGraphEmbryo | ( | ) | [virtual] |
void ExecStreamGraphEmbryo::initializeAdapter | ( | ExecStreamEmbryo & | embryo, | |
std::string const & | streamName, | |||
uint | iOutput, | |||
std::string const & | adapterName | |||
) | [private] |
Initializes a new adapter stream.
embryo | embryo for new adapter | |
streamName | name of stream being adapted | |
iOutput | ordinal of the output within the stream being adapted | |
adapterName | name of adapter stream |
Definition at line 106 of file ExecStreamGraphEmbryo.cpp.
References ExecStreamEmbryo::getParams(), ExecStreamEmbryo::getStream(), initStreamParams(), pGraph, and saveStreamEmbryo().
Referenced by addAdapterFor().
00111 { 00112 initStreamParams(*(embryo.getParams())); 00113 embryo.getStream()->setName(adapterName); 00114 saveStreamEmbryo(embryo); 00115 pGraph->interposeStream( 00116 streamName, iOutput, embryo.getStream()->getStreamId()); 00117 }
void ExecStreamGraphEmbryo::initStreamParams | ( | ExecStreamParams & | params | ) |
Initializes a stream's generic parameters.
params | parameters to be initialized |
Definition at line 163 of file ExecStreamGraphEmbryo.cpp.
References SegmentAccessor::pCacheAccessor, pCacheAccessor, ExecStreamParams::pCacheAccessor, scratchAccessor, and ExecStreamParams::scratchAccessor.
Referenced by ExecStreamFactory::createQuotaAccessors(), and initializeAdapter().
00164 { 00165 params.pCacheAccessor = pCacheAccessor; 00166 params.scratchAccessor = scratchAccessor; 00167 00168 // All cache access should be wrapped by quota checks. Actual 00169 // quotas and TxnIds will be set per-execution. 00170 uint quota = 0; 00171 SharedQuotaCacheAccessor pQuotaAccessor( 00172 new QuotaCacheAccessor( 00173 SharedQuotaCacheAccessor(), 00174 params.pCacheAccessor, 00175 quota)); 00176 params.pCacheAccessor = pQuotaAccessor; 00177 00178 // scratch access has to go through a separate CacheAccessor, but 00179 // delegates quota checking to pQuotaAccessor 00180 params.scratchAccessor.pCacheAccessor.reset( 00181 new QuotaCacheAccessor( 00182 pQuotaAccessor, 00183 params.scratchAccessor.pCacheAccessor, 00184 quota)); 00185 }
ExecStreamGraph & ExecStreamGraphEmbryo::getGraph | ( | ) |
Definition at line 187 of file ExecStreamGraphEmbryo.cpp.
References pGraph.
Referenced by ExecStreamBuilder::buildStream(), and ExecStreamBuilder::buildStreamGraph().
00188 { 00189 return *pGraph; 00190 }
SegmentAccessor & ExecStreamGraphEmbryo::getScratchAccessor | ( | ) |
Definition at line 192 of file ExecStreamGraphEmbryo.cpp.
References scratchAccessor.
Referenced by ExecStreamBuilder::buildStreamGraph().
00193 { 00194 return scratchAccessor; 00195 }
SharedExecStream ExecStreamGraphEmbryo::addAdapterFor | ( | const std::string & | name, | |
uint | iOutput, | |||
ExecStreamBufProvision | requiredDataFlow | |||
) |
Ensures that a producer is capable of the specified buffer provisioning requirements.
If the producer is not capable, an adapter stream is appended to supply the required buffer provisioning.
The "producer" may be a single stream or may be a chain of streams. In either case, the adapter is appended to the end of the group under the name of the original stream. It is named according to the last stream: lastName#iOutput.provisioner
name | name of original stream | |
iOutput | ordinal of the output within the producer | |
requiredDataFlow | buffer provisioning requirement |
Definition at line 58 of file ExecStreamGraphEmbryo.cpp.
References BUFPROV_CONSUMER, BUFPROV_NONE, BUFPROV_PRODUCER, ExecStreamEmbryo::getStream(), initializeAdapter(), pGraph, and pScheduler.
Referenced by addDataflow(), and ExecStreamBuilder::buildStreamGraph().
00062 { 00063 // REVIEW jvs 18-Nov-2004: in the case of multiple outputs from one 00064 // stream, with consumers having different provisioning, this 00065 // could result in chains of adapters, which would be less than optimal 00066 00067 // Get available dataflow from last stream of group 00068 SharedExecStream pLastStream = pGraph->findLastStream(name, iOutput); 00069 ExecStreamBufProvision availableDataflow = 00070 pLastStream->getOutputBufProvision(); 00071 assert(availableDataflow != BUFPROV_NONE); 00072 00073 // Generate a name. 00074 std::string adapterName; 00075 { 00076 int id = pGraph->getOutputCount(pLastStream->getStreamId()); 00077 std::ostringstream oss; 00078 oss << pLastStream->getName() << "#" << id << ".provisioner"; 00079 adapterName = oss.str(); 00080 } 00081 00082 // If necessary, create an adapter based on the last stream 00083 switch (requiredDataflow) { 00084 case BUFPROV_CONSUMER: 00085 if (availableDataflow == BUFPROV_PRODUCER) { 00086 ExecStreamEmbryo embryo; 00087 pScheduler->createCopyProvisionAdapter(embryo); 00088 initializeAdapter(embryo, name, iOutput, adapterName); 00089 return embryo.getStream(); 00090 } 00091 break; 00092 case BUFPROV_PRODUCER: 00093 if (availableDataflow == BUFPROV_CONSUMER) { 00094 ExecStreamEmbryo embryo; 00095 pScheduler->createBufferProvisionAdapter(embryo); 00096 initializeAdapter(embryo, name, iOutput, adapterName); 00097 return embryo.getStream(); 00098 } 00099 break; 00100 default: 00101 permAssert(false); 00102 } 00103 return pLastStream; 00104 }
void ExecStreamGraphEmbryo::saveStreamEmbryo | ( | ExecStreamEmbryo & | embryo | ) |
Registers a newly created, unprepared stream and adds it to the graph.
embryo | stream embryo |
Definition at line 119 of file ExecStreamGraphEmbryo.cpp.
References allStreamEmbryos, ExecStreamEmbryo::getStream(), and pGraph.
Referenced by ExecStreamBuilder::buildStream(), and initializeAdapter().
00120 { 00121 allStreamEmbryos[embryo.getStream()->getName()] = embryo; 00122 pGraph->addStream(embryo.getStream()); 00123 }
ExecStreamEmbryo & ExecStreamGraphEmbryo::getStreamEmbryo | ( | const std::string & | name | ) |
Looks up a registered stream.
The stream *must* already be registered.
name | name of stream to find |
Definition at line 125 of file ExecStreamGraphEmbryo.cpp.
References allStreamEmbryos.
Referenced by prepareGraph().
00127 { 00128 StreamMapIter pPair = allStreamEmbryos.find(name); 00129 assert(pPair != allStreamEmbryos.end()); 00130 return pPair->second; 00131 }
void ExecStreamGraphEmbryo::addDataflow | ( | const std::string & | source, | |
const std::string & | target, | |||
bool | isImplicit = false | |||
) |
Adds dataflow to graph, from one stream's output, after adapters, to another stream.
source | name of source stream | |
target | name of target stream | |
isImplicit | false (the default) if the edge represents direct dataflow; true if the edge represents an implicit dataflow dependency |
Definition at line 133 of file ExecStreamGraphEmbryo.cpp.
References addAdapterFor(), BUFPROV_NONE, and pGraph.
Referenced by ExecStreamBuilder::buildStreamInputs(), and ExecStreamBuilder::buildStreamOutputs().
00137 { 00138 SharedExecStream pSourceStream = 00139 pGraph->findStream(source); 00140 SharedExecStream pTargetStream = 00141 pGraph->findStream(target); 00142 SharedExecStream pInput; 00143 if (isImplicit) { 00144 pInput = pSourceStream; 00145 } else { 00146 uint iOutput = pGraph->getOutputCount(pSourceStream->getStreamId()); 00147 ExecStreamBufProvision requiredConversion = 00148 pSourceStream->getOutputBufConversion(); 00149 if (requiredConversion != BUFPROV_NONE) { 00150 addAdapterFor(source, iOutput, requiredConversion); 00151 } 00152 ExecStreamBufProvision requiredDataflow = 00153 pTargetStream->getInputBufProvision(); 00154 addAdapterFor(source, iOutput, requiredDataflow); 00155 pInput = pGraph->findLastStream(source, iOutput); 00156 } 00157 pGraph->addDataflow( 00158 pInput->getStreamId(), 00159 pTargetStream->getStreamId(), 00160 isImplicit); 00161 }
void ExecStreamGraphEmbryo::prepareGraph | ( | SharedTraceTarget | pTraceTarget, | |
std::string const & | tracePrefix | |||
) |
Prepares graph and all of its streams.
pTraceTarget | trace target for stream execution | |
tracePrefix | common prefix for stream trace names |
Definition at line 197 of file ExecStreamGraphEmbryo.cpp.
References ExecStreamId, ExecStreamEmbryo::getStream(), getStreamEmbryo(), TraceSource::name, pGraph, ExecStreamEmbryo::prepareStream(), pScheduler, and traceName.
Referenced by ExecStreamBuilder::buildStreamGraph().
00200 { 00201 pGraph->prepare(*pScheduler); 00202 std::vector<SharedExecStream> sortedStreams = 00203 pGraph->getSortedStreams(); 00204 std::vector<SharedExecStream>::iterator pos; 00205 for (pos = sortedStreams.begin(); pos != sortedStreams.end(); pos++) { 00206 std::string name = (*pos)->getName(); 00207 ExecStreamEmbryo &embryo = getStreamEmbryo(name); 00208 // Give streams a source name with an XO prefix so that users can 00209 // choose to trace XOs as a group 00210 std::string traceName = tracePrefix + name; 00211 ExecStreamId streamId = embryo.getStream()->getStreamId(); 00212 embryo.getStream()->initTraceSource( 00213 pTraceTarget, 00214 traceName); 00215 embryo.prepareStream(); 00216 00217 // Check that stream remembered to initialize its outputs. 00218 uint outputCount = pGraph->getOutputCount(streamId); 00219 for (uint i = 0; i < outputCount; ++i) { 00220 SharedExecStreamBufAccessor outAccessor = 00221 pGraph->getStreamOutputAccessor(streamId, i); 00222 if (outAccessor->getTupleDesc().empty()) { 00223 permFail("Forgot to initialize output #" << i << "of stream '" 00224 << traceName << "'"); 00225 } 00226 } 00227 } 00228 00229 pScheduler->addGraph(pGraph); 00230 }
SharedExecStreamGraph ExecStreamGraphEmbryo::pGraph [private] |
Unprepared graph.
Definition at line 51 of file ExecStreamGraphEmbryo.h.
Referenced by addAdapterFor(), addDataflow(), ExecStreamGraphEmbryo(), getGraph(), initializeAdapter(), prepareGraph(), and saveStreamEmbryo().
SharedExecStreamScheduler ExecStreamGraphEmbryo::pScheduler [private] |
Scheduler which will execute streams.
Definition at line 56 of file ExecStreamGraphEmbryo.h.
Referenced by addAdapterFor(), ExecStreamGraphEmbryo(), and prepareGraph().
SharedCacheAccessor ExecStreamGraphEmbryo::pCacheAccessor [private] |
Default cache accessor to be used by streams.
Definition at line 61 of file ExecStreamGraphEmbryo.h.
Referenced by ExecStreamGraphEmbryo(), and initStreamParams().
SegmentAccessor ExecStreamGraphEmbryo::scratchAccessor [private] |
Default scratch segment accessor to be used by streams.
Definition at line 66 of file ExecStreamGraphEmbryo.h.
Referenced by ExecStreamGraphEmbryo(), getScratchAccessor(), and initStreamParams().
StreamMap ExecStreamGraphEmbryo::allStreamEmbryos [private] |
Streams to be linked and prepared, mapped by name.
Definition at line 71 of file ExecStreamGraphEmbryo.h.
Referenced by getStreamEmbryo(), and saveStreamEmbryo().