00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/calculator/CalcExcn.h"
00026 #include "fennel/calculator/CalcExecStream.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 #include "fennel/exec/ExecStreamBufAccessor.h"
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $");
00031
00032 void CalcExecStream::prepare(CalcExecStreamParams const ¶ms)
00033 {
00034 ConduitExecStream::prepare(params);
00035 stopOnCalcError = params.stopOnCalcError;
00036
00037 try {
00038
00039 (void) CalcInit::instance();
00040
00041 pCalc.reset(new Calculator(pDynamicParamManager.get()));
00042 if (isTracing()) {
00043 pCalc->initTraceSource(getSharedTraceTarget(), "calc");
00044 }
00045
00046 pCalc->assemble(params.program.c_str());
00047
00048 if (params.isFilter) {
00049 pFilterDatum = &((*(pCalc->getStatusRegister()))[0]);
00050 } else {
00051 pFilterDatum = NULL;
00052 }
00053
00054 FENNEL_TRACE(
00055 TRACE_FINER,
00056 "calc program = "
00057 << std::endl << params.program);
00058
00059 FENNEL_TRACE(
00060 TRACE_FINER,
00061 "calc input TupleDescriptor = "
00062 << pCalc->getInputRegisterDescriptor());
00063
00064 inputDesc = pInAccessor->getTupleDesc();
00065 FENNEL_TRACE(
00066 TRACE_FINER,
00067 "xo input TupleDescriptor = "
00068 << inputDesc);
00069
00070 FENNEL_TRACE(
00071 TRACE_FINER,
00072 "calc output TupleDescriptor = "
00073 << pCalc->getOutputRegisterDescriptor());
00074
00075 FENNEL_TRACE(
00076 TRACE_FINER,
00077 "xo output TupleDescriptor = "
00078 << params.outputTupleDesc);
00079
00080 assert(inputDesc.storageEqual(pCalc->getInputRegisterDescriptor()));
00081
00082 TupleDescriptor outputDesc = pCalc->getOutputRegisterDescriptor();
00083
00084 if (!params.outputTupleDesc.empty()) {
00085 assert(outputDesc.storageEqual(params.outputTupleDesc));
00086
00087
00088
00089 outputDesc = params.outputTupleDesc;
00090 }
00091 pOutAccessor->setTupleShape(
00092 outputDesc,
00093 pInAccessor->getTupleFormat());
00094
00095 inputData.compute(inputDesc);
00096
00097 outputData.compute(outputDesc);
00098
00099
00100 pCalc->bind(&inputData,&outputData);
00101
00102
00103
00104
00105
00106 pCalc->continueOnException(false);
00107 } catch (FennelExcn e) {
00108 FENNEL_TRACE(
00109 TRACE_SEVERE,
00110 "error preparing calculator: " << e.getMessage());
00111 throw e;
00112 }
00113 }
00114
00115 void CalcExecStream::open(bool restart)
00116 {
00117 ConduitExecStream::open(restart);
00118
00119
00120 if (pCalc != NULL) {
00121 pCalc->zeroStatusRegister();
00122 }
00123 }
00124
00125 ExecStreamResult CalcExecStream::execute(ExecStreamQuantum const &quantum)
00126 {
00127 ExecStreamResult rc = precheckConduitBuffers();
00128 if (rc != EXECRC_YIELD) {
00129 return rc;
00130 }
00131
00132 #define TRACE_RETURN \
00133 FENNEL_TRACE(TRACE_FINE, "read " << nRead << " rows, wrote " << nWritten)
00134
00135 FENNEL_TRACE(TRACE_FINER, "start execute loop");
00136 uint nRead = 0;
00137 uint nWritten = 0;
00138 while (nRead < quantum.nTuplesMax) {
00139 while (!pInAccessor->isTupleConsumptionPending()) {
00140 if (!pInAccessor->demandData()) {
00141 TRACE_RETURN;
00142 return EXECRC_BUF_UNDERFLOW;
00143 }
00144
00145 FENNEL_TRACE(TRACE_FINER, "input row " << nRead);
00146 pInAccessor->unmarshalTuple(inputData);
00147 try {
00148 pCalc->exec();
00149 } catch (FennelExcn e) {
00150 FENNEL_TRACE(
00151 TRACE_SEVERE,
00152 "error executing calculator: " << e.getMessage());
00153 throw e;
00154 }
00155 bool skip = false;
00156 if (! pCalc->mWarnings.empty()) {
00157
00158
00159
00160
00161 FENNEL_TRACE(
00162 TRACE_WARNING, "calculator error " << pCalc->warnings());
00163 if (stopOnCalcError) {
00164 throw CalcExcn(pCalc->warnings(), inputDesc, inputData);
00165 }
00166 skip = true;
00167 } else if (pFilterDatum) {
00168 bool filterDiscard =
00169 *reinterpret_cast<bool const *>(pFilterDatum->pData);
00170 if (filterDiscard) {
00171 skip = true;
00172 }
00173 }
00174 if (skip) {
00175 FENNEL_TRACE(TRACE_FINER, "skip row " << nRead);
00176 pInAccessor->consumeTuple();
00177 ++nRead;
00178 }
00179 }
00180
00181 FENNEL_TRACE(TRACE_FINER, "output row " << nWritten);
00182 if (!pOutAccessor->produceTuple(outputData)) {
00183 TRACE_RETURN;
00184 return EXECRC_BUF_OVERFLOW;
00185 }
00186 ++nWritten;
00187 pInAccessor->consumeTuple();
00188 ++nRead;
00189 }
00190 TRACE_RETURN;
00191 return EXECRC_QUANTUM_EXPIRED;
00192
00193 #undef TRACE_RETURN
00194 }
00195
00196 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $");
00197
00198