00001 /* 00002 // $Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // 00008 // This program is free software; you can redistribute it and/or modify it 00009 // under the terms of the GNU General Public License as published by the Free 00010 // Software Foundation; either version 2 of the License, or (at your option) 00011 // any later version approved by The Eigenbase Project. 00012 // 00013 // This program is distributed in the hope that it will be useful, 00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 // GNU General Public License for more details. 00017 // 00018 // You should have received a copy of the GNU General Public License 00019 // along with this program; if not, write to the Free Software 00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00021 */ 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/exec/MergeExecStream.h" 00025 #include "fennel/exec/ExecStreamBufAccessor.h" 00026 #include "fennel/exec/ExecStreamGraph.h" 00027 #include "fennel/exec/ExecStreamScheduler.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $"); 00030 00031 MergeExecStreamParams::MergeExecStreamParams() 00032 { 00033 isParallel = false; 00034 } 00035 00036 void MergeExecStream::prepare( 00037 MergeExecStreamParams const ¶ms) 00038 { 00039 ConfluenceExecStream::prepare(params); 00040 isParallel = params.isParallel; 00041 assert(!inAccessors.empty()); 00042 for (uint i = 0; i < inAccessors.size(); ++i) { 00043 assert( 00044 inAccessors[i]->getTupleDesc() == pOutAccessor->getTupleDesc()); 00045 assert( 00046 inAccessors[i]->getTupleFormat() == pOutAccessor->getTupleFormat()); 00047 } 00048 inputEOS.resize(inAccessors.size()); 00049 } 00050 00051 void MergeExecStream::open( 00052 bool restart) 00053 { 00054 ConfluenceExecStream::open(restart); 00055 iInput = 0; 00056 pLastConsumptionEnd = NULL; 00057 nInputsEOS = 0; 00058 std::fill(inputEOS.begin(), inputEOS.end(), false); 00059 // Ignore the isParallel parameter unless we are actually running 00060 // in a parallel scheduler. 00061 if (pGraph->getScheduler()->getDegreeOfParallelism() == 1) { 00062 isParallel = false; 00063 } 00064 } 00065 00066 ExecStreamResult MergeExecStream::execute( 00067 ExecStreamQuantum const &) 00068 { 00069 switch (pOutAccessor->getState()) { 00070 case EXECBUF_NONEMPTY: 00071 case EXECBUF_OVERFLOW: 00072 return EXECRC_BUF_OVERFLOW; 00073 case EXECBUF_UNDERFLOW: 00074 case EXECBUF_EMPTY: 00075 if (pLastConsumptionEnd) { 00076 // Since our output buf is empty, the downstream consumer 00077 // must have consumed everything up to the last byte we 00078 // told it was available; pass that information on to our 00079 // upstream producer. 00080 inAccessors[iInput]->consumeData(pLastConsumptionEnd); 00081 pLastConsumptionEnd = NULL; 00082 } 00083 break; 00084 case EXECBUF_EOS: 00085 return EXECRC_EOS; 00086 } 00087 00088 int iInputStart = iInput; 00089 for (;;) { 00090 switch (inAccessors[iInput]->getState()) { 00091 case EXECBUF_OVERFLOW: 00092 case EXECBUF_NONEMPTY: 00093 // Pass through current input buf to our downstream consumer. 00094 pLastConsumptionEnd = inAccessors[iInput]->getConsumptionEnd(); 00095 pOutAccessor->provideBufferForConsumption( 00096 inAccessors[iInput]->getConsumptionStart(), 00097 pLastConsumptionEnd); 00098 return EXECRC_BUF_OVERFLOW; 00099 case EXECBUF_UNDERFLOW: 00100 if (!isParallel) { 00101 return EXECRC_BUF_UNDERFLOW; 00102 } 00103 ++iInput; 00104 break; 00105 case EXECBUF_EMPTY: 00106 inAccessors[iInput]->requestProduction(); 00107 if (!isParallel) { 00108 return EXECRC_BUF_UNDERFLOW; 00109 } 00110 ++iInput; 00111 break; 00112 case EXECBUF_EOS: 00113 if (!inputEOS[iInput]) { 00114 inputEOS[iInput] = true; 00115 nInputsEOS++; 00116 } 00117 // Current input is exhausted; move on to the next one. 00118 ++iInput; 00119 break; 00120 default: 00121 permAssert(false); 00122 } 00123 if (isParallel) { 00124 if (iInput == inAccessors.size()) { 00125 iInput = 0; 00126 } 00127 if (iInput == iInputStart) { 00128 // We've made one full loop without making any progress; 00129 // time to give up for this quantum. 00130 break; 00131 } 00132 } else { 00133 if (iInput == inAccessors.size()) { 00134 break; 00135 } 00136 } 00137 } 00138 if (nInputsEOS == inAccessors.size()) { 00139 pOutAccessor->markEOS(); 00140 return EXECRC_EOS; 00141 } else { 00142 assert(isParallel); 00143 return EXECRC_BUF_UNDERFLOW; 00144 } 00145 } 00146 00147 ExecStreamBufProvision MergeExecStream::getOutputBufProvision() const 00148 { 00149 return BUFPROV_PRODUCER; 00150 } 00151 00152 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $"); 00153 00154 // End MergeExecStream.cpp