00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/ExecStreamBuilder.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/db/Database.h"
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $");
00031
00032 ExecStreamBuilder::ExecStreamBuilder(
00033 ExecStreamGraphEmbryo &graphEmbryoInit,
00034 ExecStreamFactory &streamFactoryInit)
00035 : graphEmbryo(graphEmbryoInit),
00036 streamFactory(streamFactoryInit)
00037 {
00038 }
00039
00040 ExecStreamBuilder::~ExecStreamBuilder()
00041 {
00042 }
00043
00044 void ExecStreamBuilder::buildStreamGraph(
00045 ProxyCmdPrepareExecutionStreamGraph &cmd,
00046 bool assumeOutputFromSinks)
00047 {
00048 streamFactory.setScratchAccessor(graphEmbryo.getScratchAccessor());
00049
00050
00051 SharedProxyExecutionStreamDef pStreamDef = cmd.getStreamDefs();
00052 for (; pStreamDef; ++pStreamDef) {
00053 buildStream(*pStreamDef);
00054 }
00055
00056
00057 pStreamDef = cmd.getStreamDefs();
00058 for (; pStreamDef; ++pStreamDef) {
00059 buildStreamInputs(*pStreamDef);
00060
00061 if (!getExplicitOutputCount(*pStreamDef) && assumeOutputFromSinks) {
00062
00063
00064 std::string name = pStreamDef->getName();
00065 SharedExecStream pAdaptedStream =
00066 graphEmbryo.addAdapterFor(name, 0, BUFPROV_PRODUCER);
00067 graphEmbryo.getGraph().addOutputDataflow(
00068 pAdaptedStream->getStreamId());
00069 }
00070 }
00071
00072
00073
00074 pStreamDef = cmd.getStreamDefs();
00075 for (; pStreamDef; ++pStreamDef) {
00076 buildStreamOutputs(*pStreamDef);
00077 }
00078
00079
00080 graphEmbryo.prepareGraph(
00081 streamFactory.getDatabase()->getSharedTraceTarget(),
00082 "xo.");
00083 }
00084
00085 void ExecStreamBuilder::buildStream(
00086 ProxyExecutionStreamDef &streamDef)
00087 {
00088 ExecStreamEmbryo embryo = streamFactory.visitStream(streamDef);
00089 graphEmbryo.saveStreamEmbryo(embryo);
00090 SharedProxyDynamicParamUse pParamUse = streamDef.getDynamicParamUse();
00091 for (; pParamUse; ++pParamUse) {
00092 DynamicParamId dynamicParamId(pParamUse->getDynamicParamId());
00093 if (pParamUse->isRead()) {
00094 if (false)
00095 std::cout << "stream " << embryo.getStream()->getStreamId()
00096 << " reads param " << dynamicParamId << std::endl;
00097 graphEmbryo.getGraph().declareDynamicParamReader(
00098 embryo.getStream()->getStreamId(),
00099 dynamicParamId);
00100 } else {
00101 if (false)
00102 std::cout << "stream " << embryo.getStream()->getStreamId()
00103 << " writes param " << dynamicParamId << std::endl;
00104 graphEmbryo.getGraph().declareDynamicParamWriter(
00105 embryo.getStream()->getStreamId(),
00106 dynamicParamId);
00107 }
00108 }
00109 }
00110
00111 void ExecStreamBuilder::buildStreamInputs(
00112 ProxyExecutionStreamDef &streamDef)
00113 {
00114 std::string name = streamDef.getName();
00115 SharedProxyExecStreamDataFlow pInputFlow = streamDef.getInputFlow();
00116 for (; pInputFlow; ++pInputFlow) {
00117 SharedProxyExecutionStreamDef pInput = pInputFlow->getProducer();
00118
00119
00120
00121
00122
00123
00124
00125
00126 if (getExplicitOutputCount(*pInput) > 1) {
00127 continue;
00128 }
00129 std::string inputName = pInput->getName();
00130 graphEmbryo.addDataflow(inputName, name, pInputFlow->isImplicit());
00131 }
00132 }
00133
00134 void ExecStreamBuilder::buildStreamOutputs(
00135 ProxyExecutionStreamDef &streamDef)
00136 {
00137 std::string name = streamDef.getName();
00138 SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
00139 if (!(getExplicitOutputCount(streamDef) > 1)) {
00140 return;
00141 }
00142 for (; pOutputFlow; ++pOutputFlow) {
00143 SharedProxyExecutionStreamDef pOutput = pOutputFlow->getConsumer();
00144 std::string outputName = pOutput->getName();
00145 graphEmbryo.addDataflow(name, outputName, pOutputFlow->isImplicit());
00146 }
00147 }
00148
00149 int ExecStreamBuilder::getExplicitOutputCount(
00150 ProxyExecutionStreamDef &streamDef)
00151 {
00152 int nExplicitOutputs = 0;
00153 SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow();
00154 for (; pOutputFlow; ++pOutputFlow) {
00155 if (!pOutputFlow->isImplicit()) {
00156 ++nExplicitOutputs;
00157 }
00158 }
00159 return nExplicitOutputs;
00160 }
00161
00162 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $");
00163
00164