00001 /* 00002 // $Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/BarrierExecStream.h" 00026 #include "fennel/exec/ExecStreamGraphImpl.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $"); 00030 void BarrierExecStream:: prepare(BarrierExecStreamParams const ¶ms) 00031 { 00032 TupleDescriptor outputTupleDesc; 00033 00034 ConfluenceExecStream::prepare(params); 00035 returnMode = params.returnMode; 00036 parameterIds = params.parameterIds; 00037 outputTupleDesc = inAccessors[0]->getTupleDesc(); 00038 00039 // validate the input and output descriptors 00040 assert(outputTupleDesc == pOutAccessor->getTupleDesc()); 00041 if (parameterIds.size() > 0) { 00042 assert(outputTupleDesc.size() == 1); 00043 dynParamVal.compute(outputTupleDesc); 00044 } 00045 if (!returnFirstInput()) { 00046 for (uint i = 1; i < inAccessors.size(); i ++) { 00047 assert(outputTupleDesc == inAccessors[i]->getTupleDesc()); 00048 } 00049 } 00050 00051 if (returnAnyInput()) { 00052 inputTuple.compute(outputTupleDesc); 00053 compareTuple.compute(outputTupleDesc); 00054 } 00055 00056 outputTupleAccessor = &pOutAccessor->getScratchTupleAccessor(); 00057 outputBufSize = outputTupleAccessor->getMaxByteCount(); 00058 uint nRows = parameterIds.size(); 00059 if (returnAllInputs()) { 00060 nRows += inAccessors.size(); 00061 } else { 00062 nRows += 1; 00063 } 00064 outputBufSize *= nRows; 00065 } 00066 00067 void BarrierExecStream::open(bool restart) 00068 { 00069 ConfluenceExecStream::open(restart); 00070 iInput = 0; 00071 00072 if (!restart) { 00073 outputTupleBuffer.reset(new FixedBuffer[outputBufSize]); 00074 } 00075 curOutputPos = 0; 00076 isDone = false; 00077 } 00078 00079 ExecStreamResult BarrierExecStream::execute(ExecStreamQuantum const &quantum) 00080 { 00081 if (isDone) { 00082 // already returned final result 00083 pOutAccessor->markEOS(); 00084 return EXECRC_EOS; 00085 } 00086 00087 switch (pOutAccessor->getState()) { 00088 case EXECBUF_NONEMPTY: 00089 case EXECBUF_OVERFLOW: 00090 return EXECRC_BUF_OVERFLOW; 00091 case EXECBUF_UNDERFLOW: 00092 case EXECBUF_EMPTY: 00093 break; 00094 case EXECBUF_EOS: 00095 return EXECRC_EOS; 00096 } 00097 00098 while (iInput < inAccessors.size()) { 00099 switch (inAccessors[iInput]->getState()) { 00100 case EXECBUF_OVERFLOW: 00101 case EXECBUF_NONEMPTY: 00102 processInputTuple(); 00103 // fall through 00104 case EXECBUF_UNDERFLOW: 00105 return EXECRC_BUF_UNDERFLOW; 00106 case EXECBUF_EMPTY: 00107 inAccessors[iInput]->requestProduction(); 00108 return EXECRC_BUF_UNDERFLOW; 00109 case EXECBUF_EOS: 00110 ++iInput; 00111 break; 00112 default: 00113 permAssert(false); 00114 } 00115 } 00116 00117 // write out the data passed in via dynamic parameters 00118 for (uint i = 0; i < parameterIds.size(); i++) { 00119 DynamicParam const ¶m = 00120 pDynamicParamManager->getParam(parameterIds[i]); 00121 assert(param.getDesc() == inAccessors[0]->getTupleDesc()[0]); 00122 dynParamVal[0] = param.getDatum(); 00123 outputTupleAccessor->marshal( 00124 dynParamVal, 00125 outputTupleBuffer.get() + curOutputPos); 00126 curOutputPos += outputTupleAccessor->getCurrentByteCount(); 00127 } 00128 00129 // Write out the output buffer and indicate OVERFLOW. 00130 pOutAccessor->provideBufferForConsumption( 00131 outputTupleBuffer.get(), 00132 outputTupleBuffer.get() + outputBufSize); 00133 00134 // close the producers to free up resources 00135 ExecStreamGraphImpl &graphImpl = 00136 dynamic_cast<ExecStreamGraphImpl&>(getGraph()); 00137 graphImpl.closeProducers(getStreamId()); 00138 isDone = true; 00139 00140 return EXECRC_BUF_OVERFLOW; 00141 } 00142 00143 ExecStreamBufProvision 00144 BarrierExecStream::getOutputBufProvision() const 00145 { 00146 return BUFPROV_PRODUCER; 00147 } 00148 00149 void BarrierExecStream::closeImpl() 00150 { 00151 ConfluenceExecStream::closeImpl(); 00152 outputTupleBuffer.reset(); 00153 } 00154 00155 void BarrierExecStream::processInputTuple() 00156 { 00157 switch (returnMode) { 00158 case BARRIER_RET_FIRST_INPUT: 00159 case BARRIER_RET_ANY_INPUT: 00160 // copy input to output if first input 00161 if (iInput == 0) { 00162 curOutputPos += 00163 copyInputData(outputTupleBuffer.get(), inAccessors[iInput]); 00164 outputTupleAccessor->setCurrentTupleBuf(outputTupleBuffer.get()); 00165 outputTupleAccessor->unmarshal(compareTuple); 00166 } else if (returnAnyInput()) { 00167 // sanity check in the case where all inputs are supposed to 00168 // return the same output -- make sure that is the case 00169 inAccessors[iInput]->unmarshalTuple(inputTuple); 00170 permAssert( 00171 (inAccessors[iInput]->getTupleDesc()).compareTuples( 00172 inputTuple, compareTuple) == 0); 00173 } else { 00174 inAccessors[iInput]->accessConsumptionTuple(); 00175 } 00176 break; 00177 00178 case BARRIER_RET_ALL_INPUTS: 00179 // copy the entire input tuple to the apppropriate position in 00180 // the output buffer 00181 curOutputPos += 00182 copyInputData( 00183 outputTupleBuffer.get() + curOutputPos, 00184 inAccessors[iInput]); 00185 break; 00186 00187 default: 00188 permAssert(false); 00189 } 00190 00191 inAccessors[iInput]->consumeTuple(); 00192 } 00193 00194 uint BarrierExecStream::copyInputData( 00195 PBuffer destBuffer, 00196 SharedExecStreamBufAccessor &pInAccessor) 00197 { 00198 uint nBytes = pInAccessor->accessConsumptionTuple().getCurrentByteCount(); 00199 memcpy( 00200 destBuffer, 00201 pInAccessor->getConsumptionStart(), 00202 nBytes); 00203 return nBytes; 00204 } 00205 00206 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/BarrierExecStream.cpp#12 $"); 00207 00208 // End BarrierExecStream.cpp