CalcExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2004-2009 SQLstream, Inc.
00006 // Copyright (C) 2009-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/calculator/CalcExcn.h"
00026 #include "fennel/calculator/CalcExecStream.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 #include "fennel/exec/ExecStreamBufAccessor.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $");
00031 
00032 void CalcExecStream::prepare(CalcExecStreamParams const &params)
00033 {
00034     ConduitExecStream::prepare(params);
00035     stopOnCalcError = params.stopOnCalcError;
00036 
00037     try {
00038         // Force instantiation of the calculator's instruction tables.
00039         (void) CalcInit::instance();
00040 
00041         pCalc.reset(new Calculator(pDynamicParamManager.get()));
00042         if (isTracing()) {
00043             pCalc->initTraceSource(getSharedTraceTarget(), "calc");
00044         }
00045 
00046         pCalc->assemble(params.program.c_str());
00047 
00048         if (params.isFilter) {
00049             pFilterDatum = &((*(pCalc->getStatusRegister()))[0]);
00050         } else {
00051             pFilterDatum = NULL;
00052         }
00053 
00054         FENNEL_TRACE(
00055             TRACE_FINER,
00056             "calc program = "
00057             << std::endl << params.program);
00058 
00059         FENNEL_TRACE(
00060             TRACE_FINER,
00061             "calc input TupleDescriptor = "
00062             << pCalc->getInputRegisterDescriptor());
00063 
00064         inputDesc = pInAccessor->getTupleDesc();
00065         FENNEL_TRACE(
00066             TRACE_FINER,
00067             "xo input TupleDescriptor = "
00068             << inputDesc);
00069 
00070         FENNEL_TRACE(
00071             TRACE_FINER,
00072             "calc output TupleDescriptor = "
00073             << pCalc->getOutputRegisterDescriptor());
00074 
00075         FENNEL_TRACE(
00076             TRACE_FINER,
00077             "xo output TupleDescriptor = "
00078             << params.outputTupleDesc);
00079 
00080         assert(inputDesc.storageEqual(pCalc->getInputRegisterDescriptor()));
00081 
00082         TupleDescriptor outputDesc = pCalc->getOutputRegisterDescriptor();
00083 
00084         if (!params.outputTupleDesc.empty()) {
00085             assert(outputDesc.storageEqual(params.outputTupleDesc));
00086 
00087             // if the plan specifies an output descriptor with different
00088             // nullability, use that instead
00089             outputDesc = params.outputTupleDesc;
00090         }
00091         pOutAccessor->setTupleShape(
00092             outputDesc,
00093             pInAccessor->getTupleFormat());
00094 
00095         inputData.compute(inputDesc);
00096 
00097         outputData.compute(outputDesc);
00098 
00099         // bind calculator to tuple data (tuple data may later change)
00100         pCalc->bind(&inputData,&outputData);
00101 
00102         // Set calculator to return immediately on exception as a
00103         // workaround.  Prevents indeterminate results from an instruction
00104         // that throws an exception from causing non-deterministic
00105         // behavior later in program execution.
00106         pCalc->continueOnException(false);
00107     } catch (FennelExcn e) {
00108         FENNEL_TRACE(
00109             TRACE_SEVERE,
00110             "error preparing calculator: " << e.getMessage());
00111         throw e;
00112     }
00113 }
00114 
00115 void CalcExecStream::open(bool restart)
00116 {
00117     ConduitExecStream::open(restart);
00118 
00119     // Zero out status registers.
00120     if (pCalc != NULL) {
00121         pCalc->zeroStatusRegister();
00122     }
00123 }
00124 
00125 ExecStreamResult CalcExecStream::execute(ExecStreamQuantum const &quantum)
00126 {
00127     ExecStreamResult rc = precheckConduitBuffers();
00128     if (rc != EXECRC_YIELD) {
00129         return rc;
00130     }
00131 
00132 #define TRACE_RETURN \
00133     FENNEL_TRACE(TRACE_FINE, "read " << nRead << " rows, wrote " << nWritten)
00134 
00135     FENNEL_TRACE(TRACE_FINER, "start execute loop");
00136     uint nRead = 0;
00137     uint nWritten = 0;
00138     while (nRead < quantum.nTuplesMax) {
00139         while (!pInAccessor->isTupleConsumptionPending()) {
00140             if (!pInAccessor->demandData()) {
00141                 TRACE_RETURN;
00142                 return EXECRC_BUF_UNDERFLOW;
00143             }
00144 
00145             FENNEL_TRACE(TRACE_FINER, "input row " << nRead);
00146             pInAccessor->unmarshalTuple(inputData);
00147             try {
00148                 pCalc->exec();
00149             } catch (FennelExcn e) {
00150                 FENNEL_TRACE(
00151                     TRACE_SEVERE,
00152                     "error executing calculator: " << e.getMessage());
00153                 throw e;
00154             }
00155             bool skip = false;
00156             if (! pCalc->mWarnings.empty()) {
00157                 // calculator failed to produce a row
00158                 // REVIEW: Do we need to distinguish errors from warnings here?
00159                 // TODO: notify scheduler (interface TBD)
00160                 //  which can warn user or produce other side effects.
00161                 FENNEL_TRACE(
00162                     TRACE_WARNING, "calculator error " << pCalc->warnings());
00163                 if (stopOnCalcError) {
00164                     throw CalcExcn(pCalc->warnings(), inputDesc, inputData);
00165                 }
00166                 skip = true;
00167             } else if (pFilterDatum) {
00168                 bool filterDiscard =
00169                     *reinterpret_cast<bool const *>(pFilterDatum->pData);
00170                 if (filterDiscard) {
00171                     skip = true;
00172                 }
00173             }
00174             if (skip) {
00175                 FENNEL_TRACE(TRACE_FINER, "skip row " << nRead);
00176                 pInAccessor->consumeTuple();
00177                 ++nRead;
00178             }
00179         }
00180 
00181         FENNEL_TRACE(TRACE_FINER, "output row " << nWritten);
00182         if (!pOutAccessor->produceTuple(outputData)) {
00183             TRACE_RETURN;
00184             return EXECRC_BUF_OVERFLOW;
00185         }
00186         ++nWritten;
00187         pInAccessor->consumeTuple();
00188         ++nRead;
00189     }
00190     TRACE_RETURN;
00191     return EXECRC_QUANTUM_EXPIRED;
00192 
00193 #undef TRACE_RETURN
00194 }
00195 
00196 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $");
00197 
00198 // End CalcExecStream.cpp

Generated on Mon Jun 22 04:00:17 2009 for Fennel by  doxygen 1.5.1