#include <ExecStreamBuilder.h>
Public Member Functions
ExecStreamBuilder (ExecStreamGraphEmbryo &graphEmbryo, ExecStreamFactory &streamFactory)
  Creates a new ExecStreamBuilder.
virtual ~ExecStreamBuilder ()
void buildStreamGraph (ProxyCmdPrepareExecutionStreamGraph &cmd, bool assumeOutputFromSinks)
  Main builder entry point.
Private Member Functions
void buildStream (ProxyExecutionStreamDef &)
  Allocates a stream based on stream definition, adds the stream to a graph and records how to prepare the stream.
void buildStreamInputs (ProxyExecutionStreamDef &streamDef)
  Adds dataflows between a stream and its inputs, in the case where the source input has only one output.
int getExplicitOutputCount (ProxyExecutionStreamDef &streamDef)
void buildStreamOutputs (ProxyExecutionStreamDef &streamDef)
  Adds dataflows between a stream and its outputs, preserving order in the case where a stream has multiple outputs.
Private Attributes
ExecStreamGraphEmbryo & graphEmbryo
  Embryo for graph being built up.
ExecStreamFactory & streamFactory
  Factory for creating ExecStream objects.
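ExecStreamBuilder builds a graph in three phases, as shown in buildStreamGraph(): it creates the streams, adds the dataflows between them, and then sorts and prepares the graph.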
Cache. A new scratch segment is allocated by the builder and is shared between the graph and its streams.
Tracing. All streams are assigned a trace name of: xo.streamName
Depending on the TraceTarget, this typically corresponds to a trace property like org.eigenbase.fennel.xo.streamName
Buffer Provisioning. Provisioning adapters are special streams interposed between two other streams when the producer's result provisioning does not meet the consumer's input requirements. They are interposed during the dataflow phase. They are named: producerName.provisioner
Interposition. When provisioning adapters are appended to a stream, they consume the original stream's output and produce a new output. To make the appended streams work transparently, the chain of streams is treated as a single unit. Subsequent access to the stream's output is available through the graph by finding the "last" stream registered under the original stream's name.
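The naming and interposition conventions above can be illustrated with a small sketch. It is not Fennel's API: `StreamNameRegistry`, `traceName` and `provisionerName` are hypothetical names introduced here, and stream ids are modelled as plain integers. The sketch only shows the idea that adapters are registered under the original stream's name and that lookups return the last stream of the chain.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for the graph's name registry; not Fennel's API.
class StreamNameRegistry {
public:
    // Registers a stream id under a name. Later registrations under the
    // same name model adapters appended to the original stream.
    void registerStream(const std::string &name, int streamId) {
        chains[name].push_back(streamId);
    }

    // Returns the id of the last stream registered under the name,
    // or -1 if nothing was registered under it.
    int findLastStream(const std::string &name) const {
        auto it = chains.find(name);
        if (it == chains.end() || it->second.empty()) {
            return -1;
        }
        return it->second.back();
    }

private:
    std::map<std::string, std::vector<int>> chains;
};

// Trace name convention described above: xo.streamName
inline std::string traceName(const std::string &streamName) {
    return "xo." + streamName;
}

// Adapter name convention described above: producerName.provisioner
inline std::string provisionerName(const std::string &producerName) {
    return producerName + ".provisioner";
}
```

Under these assumptions, a consumer that asks for the output of a stream after an adapter has been appended receives the adapter's id rather than the original stream's id, which is what makes the chain behave as a single unit.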
Definition at line 71 of file ExecStreamBuilder.h.
ExecStreamBuilder::ExecStreamBuilder (ExecStreamGraphEmbryo & graphEmbryo, ExecStreamFactory & streamFactory) [explicit]
Creates a new ExecStreamBuilder.
Parameters:
graphEmbryo: embryo for graph to be built
streamFactory: factory for creating streams
Definition at line 32 of file ExecStreamBuilder.cpp.
        : graphEmbryo(graphEmbryoInit),
          streamFactory(streamFactoryInit)
    {
    }
ExecStreamBuilder::~ExecStreamBuilder () [virtual]
void ExecStreamBuilder::buildStream (ProxyExecutionStreamDef &) [private]
Allocates a stream based on stream definition, adds the stream to a graph and records how to prepare the stream.
Definition at line 85 of file ExecStreamBuilder.cpp.
References ExecStreamGraph::declareDynamicParamReader(), ExecStreamGraph::declareDynamicParamWriter(), ProxyExecutionStreamDef::getDynamicParamUse(), ExecStreamGraphEmbryo::getGraph(), ExecStreamEmbryo::getStream(), graphEmbryo, ExecStreamGraphEmbryo::saveStreamEmbryo(), streamFactory, and ExecStreamFactory::visitStream().
Referenced by buildStreamGraph().
    {
        ExecStreamEmbryo embryo = streamFactory.visitStream(streamDef);
        graphEmbryo.saveStreamEmbryo(embryo);
        SharedProxyDynamicParamUse pParamUse = streamDef.getDynamicParamUse();
        for (; pParamUse; ++pParamUse) {
            DynamicParamId dynamicParamId(pParamUse->getDynamicParamId());
            if (pParamUse->isRead()) {
                if (false)
                    std::cout << "stream " << embryo.getStream()->getStreamId()
                              << " reads param " << dynamicParamId << std::endl;
                graphEmbryo.getGraph().declareDynamicParamReader(
                    embryo.getStream()->getStreamId(),
                    dynamicParamId);
            } else {
                if (false)
                    std::cout << "stream " << embryo.getStream()->getStreamId()
                              << " writes param " << dynamicParamId << std::endl;
                graphEmbryo.getGraph().declareDynamicParamWriter(
                    embryo.getStream()->getStreamId(),
                    dynamicParamId);
            }
        }
    }
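The listing above declares, for each dynamic parameter a stream uses, whether the stream reads or writes it. A minimal stand-alone sketch of that bookkeeping follows. `ParamUse` and `ParamRegistry` are hypothetical types made up for illustration, and stream and parameter ids are plain integers here; the real ExecStreamGraph interface may record this differently.

```cpp
#include <map>
#include <set>
#include <vector>

// Hypothetical description of one dynamic parameter use by a stream.
struct ParamUse {
    int paramId;
    bool isRead;  // true: the stream reads the parameter; false: it writes it
};

// Hypothetical registry standing in for the graph's reader/writer tables.
struct ParamRegistry {
    std::map<int, std::set<int>> readers;  // paramId -> ids of reading streams
    std::map<int, std::set<int>> writers;  // paramId -> ids of writing streams

    // Mirrors the loop in buildStream(): each use is declared as either
    // a reader or a writer of its parameter, never both.
    void declare(int streamId, const std::vector<ParamUse> &uses) {
        for (const ParamUse &use : uses) {
            (use.isRead ? readers : writers)[use.paramId].insert(streamId);
        }
    }
};
```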
void ExecStreamBuilder::buildStreamInputs (ProxyExecutionStreamDef & streamDef) [private]
Adds dataflows between a stream and its inputs, in the case where the source input has only one output.
Interposes provisioning adapters as required.
Parameters:
streamDef: corresponding Java stream definition being converted
Definition at line 111 of file ExecStreamBuilder.cpp.
References ExecStreamGraphEmbryo::addDataflow(), getExplicitOutputCount(), ProxyExecutionStreamDef::getInputFlow(), ProxyExecutionStreamDef::getName(), and graphEmbryo.
Referenced by buildStreamGraph().
    {
        std::string name = streamDef.getName();
        SharedProxyExecStreamDataFlow pInputFlow = streamDef.getInputFlow();
        for (; pInputFlow; ++pInputFlow) {
            SharedProxyExecutionStreamDef pInput = pInputFlow->getProducer();
            // If the source input has multiple outputs, defer adding that flow
            // till later so we can add those flows in the order in which they
            // appear in the output flow list.
            //
            // NOTE zfong 12/4/06 - By deferring adding the input flows in the
            // scenario described above, this means we don't handle the case where
            // a dataflow is an ordered dataflow for both an input and an output.
            // The ordering will only be preserved on the output flows.
            if (getExplicitOutputCount(*pInput) > 1) {
                continue;
            }
            std::string inputName = pInput->getName();
            graphEmbryo.addDataflow(inputName, name, pInputFlow->isImplicit());
        }
    }
int ExecStreamBuilder::getExplicitOutputCount (ProxyExecutionStreamDef & streamDef) [private]
Definition at line 149 of file ExecStreamBuilder.cpp.
References ProxyExecutionStreamDef::getOutputFlow().
Referenced by buildStreamGraph(), buildStreamInputs(), and buildStreamOutputs().
    {
        int nExplicitOutputs = 0;
        SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
        for (; pOutputFlow; ++pOutputFlow) {
            if (!pOutputFlow->isImplicit()) {
                ++nExplicitOutputs;
            }
        }
        return nExplicitOutputs;
    }
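The listing above counts only the output flows that are not implicit. The same rule can be sketched without the proxy classes, assuming a hypothetical `OutputFlow` record with an `implicit` flag; neither the type nor the function name below comes from Fennel.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical stand-in for one output dataflow of a stream definition.
struct OutputFlow {
    std::string consumer;  // name of the consuming stream
    bool implicit;         // implicit flows are excluded from the count
};

// Counts the flows whose implicit flag is false, matching the rule used
// by getExplicitOutputCount().
int countExplicitOutputs(const std::vector<OutputFlow> &flows) {
    return static_cast<int>(std::count_if(
        flows.begin(), flows.end(),
        [](const OutputFlow &flow) { return !flow.implicit; }));
}
```

The builder compares this count against 1 to decide whether a producer's flows are added from the input side or deferred to the output side, and against 0 to detect sinks.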
void ExecStreamBuilder::buildStreamOutputs (ProxyExecutionStreamDef & streamDef) [private]
Adds dataflows between a stream and its outputs, preserving order in the case where a stream has multiple outputs.
Parameters:
streamDef: corresponding Java stream definition being converted
Definition at line 134 of file ExecStreamBuilder.cpp.
References ExecStreamGraphEmbryo::addDataflow(), getExplicitOutputCount(), ProxyExecutionStreamDef::getName(), ProxyExecutionStreamDef::getOutputFlow(), and graphEmbryo.
Referenced by buildStreamGraph().
    {
        std::string name = streamDef.getName();
        SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
        if (!(getExplicitOutputCount(streamDef) > 1)) {
            return;
        }
        for (; pOutputFlow; ++pOutputFlow) {
            SharedProxyExecutionStreamDef pOutput = pOutputFlow->getConsumer();
            std::string outputName = pOutput->getName();
            graphEmbryo.addDataflow(name, outputName, pOutputFlow->isImplicit());
        }
    }
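To see why flows from a multi-output producer are deferred, the following toy model replays the input and output passes on plain data. `Flow`, `StreamDef`, `Edge` and `buildEdges` are hypothetical names, and the edge list stands in for the calls to addDataflow(); it is a sketch of the ordering rule under those assumptions, not the Fennel implementation.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stream definition.
struct Flow {
    std::string peer;  // producer name for inputs, consumer name for outputs
    bool implicit;
};
struct StreamDef {
    std::string name;
    std::vector<Flow> inputs;
    std::vector<Flow> outputs;
};
using Edge = std::pair<std::string, std::string>;  // (producer, consumer)

int explicitOutputCount(const StreamDef &def) {
    int n = 0;
    for (const Flow &flow : def.outputs) {
        if (!flow.implicit) {
            ++n;
        }
    }
    return n;
}

// Replays the two dataflow passes and returns edges in insertion order.
std::vector<Edge> buildEdges(const std::vector<StreamDef> &defs) {
    std::map<std::string, StreamDef> byName;
    for (const StreamDef &def : defs) {
        byName[def.name] = def;
    }
    std::vector<Edge> edges;
    // Input pass: skip producers with more than one explicit output.
    for (const StreamDef &def : defs) {
        for (const Flow &in : def.inputs) {
            if (explicitOutputCount(byName.at(in.peer)) > 1) {
                continue;  // deferred to the output pass
            }
            edges.emplace_back(in.peer, def.name);
        }
    }
    // Output pass: multi-output producers add flows in output-list order.
    for (const StreamDef &def : defs) {
        if (explicitOutputCount(def) <= 1) {
            continue;
        }
        for (const Flow &out : def.outputs) {
            edges.emplace_back(def.name, out.peer);
        }
    }
    return edges;
}
```

In this model, a producer whose output list names consumer B before consumer A yields the edge to B first, even if A's definition appears earlier in the list of stream definitions.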
void ExecStreamBuilder::buildStreamGraph (ProxyCmdPrepareExecutionStreamGraph & cmd, bool assumeOutputFromSinks)
Main builder entry point.
Parameters:
cmd: Java representation for command containing collection of stream definitions
assumeOutputFromSinks: if true, sinks in the graph are assumed to be dataflow output nodes; if false, sinks in the graph are not treated specially
Definition at line 44 of file ExecStreamBuilder.cpp.
References ExecStreamGraphEmbryo::addAdapterFor(), ExecStreamGraph::addOutputDataflow(), BUFPROV_PRODUCER, buildStream(), buildStreamInputs(), buildStreamOutputs(), ExecStreamFactory::getDatabase(), getExplicitOutputCount(), ExecStreamGraphEmbryo::getGraph(), ExecStreamGraphEmbryo::getScratchAccessor(), ProxyCmdPrepareExecutionStreamGraph::getStreamDefs(), graphEmbryo, ExecStreamGraphEmbryo::prepareGraph(), ExecStreamFactory::setScratchAccessor(), and streamFactory.
Referenced by CmdInterpreter::visit().
    {
        streamFactory.setScratchAccessor(graphEmbryo.getScratchAccessor());

        // PASS 1: add streams to graph
        SharedProxyExecutionStreamDef pStreamDef = cmd.getStreamDefs();
        for (; pStreamDef; ++pStreamDef) {
            buildStream(*pStreamDef);
        }

        // PASS 2: add input dataflows (provided the source input has only
        // one output)
        pStreamDef = cmd.getStreamDefs();
        for (; pStreamDef; ++pStreamDef) {
            buildStreamInputs(*pStreamDef);

            if (!getExplicitOutputCount(*pStreamDef) && assumeOutputFromSinks) {
                // Streams with no consumer are read directly by clients. They
                // are expected to support producer provisioned results.
                std::string name = pStreamDef->getName();
                SharedExecStream pAdaptedStream =
                    graphEmbryo.addAdapterFor(name, 0, BUFPROV_PRODUCER);
                graphEmbryo.getGraph().addOutputDataflow(
                    pAdaptedStream->getStreamId());
            }
        }

        // PASS 3: add output dataflows in the cases where a stream has multiple
        // outputs
        pStreamDef = cmd.getStreamDefs();
        for (; pStreamDef; ++pStreamDef) {
            buildStreamOutputs(*pStreamDef);
        }

        // PASS 4: sort and prepare streams
        graphEmbryo.prepareGraph(
            streamFactory.getDatabase()->getSharedTraceTarget(),
            "xo.");
    }
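The effect of assumeOutputFromSinks in the second pass can be sketched in isolation. The example below assumes a hypothetical `StreamSummary` record that already carries each stream's explicit output count, and it returns stream names instead of calling addAdapterFor() and addOutputDataflow(), so it only illustrates which streams would be selected as graph outputs.

```cpp
#include <string>
#include <vector>

// Hypothetical summary of a stream definition; not a Fennel type.
struct StreamSummary {
    std::string name;
    int explicitOutputCount;
};

// Returns the names of streams that would be treated as graph outputs:
// streams with no explicit consumer, and only when the flag is set.
std::vector<std::string> selectGraphOutputs(
    const std::vector<StreamSummary> &streams,
    bool assumeOutputFromSinks)
{
    std::vector<std::string> outputs;
    if (!assumeOutputFromSinks) {
        return outputs;  // sinks are not treated specially
    }
    for (const StreamSummary &stream : streams) {
        if (stream.explicitOutputCount == 0) {
            outputs.push_back(stream.name);
        }
    }
    return outputs;
}
```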
ExecStreamGraphEmbryo & ExecStreamBuilder::graphEmbryo [private]
Embryo for graph being built up.
Definition at line 77 of file ExecStreamBuilder.h.
Referenced by buildStream(), buildStreamGraph(), buildStreamInputs(), and buildStreamOutputs().
ExecStreamFactory & ExecStreamBuilder::streamFactory [private]
Factory for creating ExecStream objects.
Definition at line 82 of file ExecStreamBuilder.h.
Referenced by buildStream(), and buildStreamGraph().