BarrierExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/BarrierExecStream.h"
00026 #include "fennel/exec/ExecStreamGraphImpl.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $");
00030 void BarrierExecStream:: prepare(BarrierExecStreamParams const &params)
00031 {
00032     TupleDescriptor outputTupleDesc;
00033 
00034     ConfluenceExecStream::prepare(params);
00035     returnMode = params.returnMode;
00036     parameterIds = params.parameterIds;
00037     outputTupleDesc = inAccessors[0]->getTupleDesc();
00038 
00039     // validate the input and output descriptors
00040     assert(outputTupleDesc == pOutAccessor->getTupleDesc());
00041     if (parameterIds.size() > 0) {
00042         assert(outputTupleDesc.size() == 1);
00043         dynParamVal.compute(outputTupleDesc);
00044     }
00045     if (!returnFirstInput()) {
00046         for (uint i = 1; i < inAccessors.size(); i ++) {
00047             assert(outputTupleDesc == inAccessors[i]->getTupleDesc());
00048         }
00049     }
00050 
00051     if (returnAnyInput()) {
00052         inputTuple.compute(outputTupleDesc);
00053         compareTuple.compute(outputTupleDesc);
00054     }
00055 
00056     outputTupleAccessor = &pOutAccessor->getScratchTupleAccessor();
00057     outputBufSize = outputTupleAccessor->getMaxByteCount();
00058     uint nRows = parameterIds.size();
00059     if (returnAllInputs()) {
00060         nRows += inAccessors.size();
00061     } else {
00062         nRows += 1;
00063     }
00064     outputBufSize *= nRows;
00065 }
00066 
00067 void BarrierExecStream::open(bool restart)
00068 {
00069     ConfluenceExecStream::open(restart);
00070     iInput = 0;
00071 
00072     if (!restart) {
00073         outputTupleBuffer.reset(new FixedBuffer[outputBufSize]);
00074     }
00075     curOutputPos = 0;
00076     isDone = false;
00077 }
00078 
00079 ExecStreamResult BarrierExecStream::execute(ExecStreamQuantum const &quantum)
00080 {
00081     if (isDone) {
00082         // already returned final result
00083         pOutAccessor->markEOS();
00084         return EXECRC_EOS;
00085     }
00086 
00087     switch (pOutAccessor->getState()) {
00088     case EXECBUF_NONEMPTY:
00089     case EXECBUF_OVERFLOW:
00090         return EXECRC_BUF_OVERFLOW;
00091     case EXECBUF_UNDERFLOW:
00092     case EXECBUF_EMPTY:
00093         break;
00094     case EXECBUF_EOS:
00095         return EXECRC_EOS;
00096     }
00097 
00098     while (iInput < inAccessors.size()) {
00099         switch (inAccessors[iInput]->getState()) {
00100         case EXECBUF_OVERFLOW:
00101         case EXECBUF_NONEMPTY:
00102             processInputTuple();
00103             // fall through
00104         case EXECBUF_UNDERFLOW:
00105             return EXECRC_BUF_UNDERFLOW;
00106         case EXECBUF_EMPTY:
00107             inAccessors[iInput]->requestProduction();
00108             return EXECRC_BUF_UNDERFLOW;
00109         case EXECBUF_EOS:
00110             ++iInput;
00111             break;
00112         default:
00113             permAssert(false);
00114         }
00115     }
00116 
00117     // write out the data passed in via dynamic parameters
00118     for (uint i = 0; i < parameterIds.size(); i++) {
00119         DynamicParam const &param =
00120             pDynamicParamManager->getParam(parameterIds[i]);
00121         assert(param.getDesc() == inAccessors[0]->getTupleDesc()[0]);
00122         dynParamVal[0] = param.getDatum();
00123         outputTupleAccessor->marshal(
00124             dynParamVal,
00125             outputTupleBuffer.get() + curOutputPos);
00126         curOutputPos += outputTupleAccessor->getCurrentByteCount();
00127     }
00128 
00129     // Write out the output buffer and indicate OVERFLOW.
00130     pOutAccessor->provideBufferForConsumption(
00131         outputTupleBuffer.get(),
00132         outputTupleBuffer.get() + outputBufSize);
00133 
00134     // close the producers to free up resources
00135     ExecStreamGraphImpl &graphImpl =
00136         dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00137     graphImpl.closeProducers(getStreamId());
00138     isDone = true;
00139 
00140     return EXECRC_BUF_OVERFLOW;
00141 }
00142 
00143 ExecStreamBufProvision
00144     BarrierExecStream::getOutputBufProvision() const
00145 {
00146     return BUFPROV_PRODUCER;
00147 }
00148 
00149 void BarrierExecStream::closeImpl()
00150 {
00151     ConfluenceExecStream::closeImpl();
00152     outputTupleBuffer.reset();
00153 }
00154 
00155 void BarrierExecStream::processInputTuple()
00156 {
00157     switch (returnMode) {
00158     case BARRIER_RET_FIRST_INPUT:
00159     case BARRIER_RET_ANY_INPUT:
00160         // copy input to output if first input
00161         if (iInput == 0) {
00162             curOutputPos +=
00163                 copyInputData(outputTupleBuffer.get(), inAccessors[iInput]);
00164             outputTupleAccessor->setCurrentTupleBuf(outputTupleBuffer.get());
00165             outputTupleAccessor->unmarshal(compareTuple);
00166         } else if (returnAnyInput()) {
00167             // sanity check in the case where all inputs are supposed to
00168             // return the same output -- make sure that is the case
00169             inAccessors[iInput]->unmarshalTuple(inputTuple);
00170             permAssert(
00171                 (inAccessors[iInput]->getTupleDesc()).compareTuples(
00172                     inputTuple, compareTuple) == 0);
00173         } else {
00174             inAccessors[iInput]->accessConsumptionTuple();
00175         }
00176         break;
00177 
00178     case BARRIER_RET_ALL_INPUTS:
00179         // copy the entire input tuple to the apppropriate position in
00180         // the output buffer
00181         curOutputPos +=
00182             copyInputData(
00183                 outputTupleBuffer.get() + curOutputPos,
00184                 inAccessors[iInput]);
00185         break;
00186 
00187     default:
00188         permAssert(false);
00189     }
00190 
00191     inAccessors[iInput]->consumeTuple();
00192 }
00193 
00194 uint BarrierExecStream::copyInputData(
00195     PBuffer destBuffer,
00196     SharedExecStreamBufAccessor &pInAccessor)
00197 {
00198     uint nBytes = pInAccessor->accessConsumptionTuple().getCurrentByteCount();
00199     memcpy(
00200         destBuffer,
00201         pInAccessor->getConsumptionStart(),
00202         nBytes);
00203     return nBytes;
00204 }
00205 
00206 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $");
00207 
00208 // End BarrierExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1