00001 /* 00002 // $Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/SplitterExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $"); 00029 00030 void SplitterExecStream::open(bool restart) 00031 { 00032 DiffluenceExecStream::open(restart); 00033 iOutput = 0; 00034 pLastConsumptionEnd = NULL; 00035 } 00036 00037 ExecStreamResult SplitterExecStream::execute(ExecStreamQuantum const &) 00038 { 00039 if (pLastConsumptionEnd) { 00040 while (iOutput < outAccessors.size()) { 00041 switch (outAccessors[iOutput]->getState()) { 00042 case EXECBUF_NONEMPTY: 00043 case EXECBUF_OVERFLOW: 00044 return EXECRC_BUF_OVERFLOW; 00045 case EXECBUF_UNDERFLOW: 00046 case EXECBUF_EMPTY: 00047 ++iOutput; 00048 break; 00049 case EXECBUF_EOS: 00050 assert(pInAccessor->getState() == EXECBUF_EOS); 00051 return EXECRC_EOS; 00052 } 00053 } 00054 00055 /* 00056 * All the output buf accessors have reached EXECBUF_EMPTY. This 00057 * means the downstream consumers must have consumed everything 00058 * up to the last byte we told them was available; pass that 00059 * information on to our upstream producer. 00060 */ 00061 pInAccessor->consumeData(pLastConsumptionEnd); 00062 pLastConsumptionEnd = NULL; 00063 iOutput = 0; 00064 } 00065 00066 switch (pInAccessor->getState()) { 00067 case EXECBUF_OVERFLOW: 00068 case EXECBUF_NONEMPTY: 00069 if (!pLastConsumptionEnd) { 00070 pLastConsumptionEnd = pInAccessor->getConsumptionEnd(); 00071 00072 /* 00073 * The same buffer is provided for consumption to all the output 00074 * buffer accessors. 00075 */ 00076 for (int i = 0; i < outAccessors.size(); i ++) { 00077 outAccessors[i]->provideBufferForConsumption( 00078 pInAccessor->getConsumptionStart(), 00079 pLastConsumptionEnd); 00080 } 00081 } 00082 return EXECRC_BUF_OVERFLOW; 00083 case EXECBUF_UNDERFLOW: 00084 return EXECRC_BUF_UNDERFLOW; 00085 case EXECBUF_EMPTY: 00086 pInAccessor->requestProduction(); 00087 return EXECRC_BUF_UNDERFLOW; 00088 case EXECBUF_EOS: 00089 for (int i = 0; i < outAccessors.size(); i ++) { 00090 outAccessors[i]->markEOS(); 00091 } 00092 return EXECRC_EOS; 00093 default: 00094 permAssert(false); 00095 } 00096 } 00097 00098 ExecStreamBufProvision SplitterExecStream::getOutputBufProvision() const 00099 { 00100 /* 00101 * Splitter does not own any buffer; however, it provides its producer's 00102 * buffer directly to its consumer(s). 00103 */ 00104 return BUFPROV_PRODUCER; 00105 } 00106 00107 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $"); 00108 00109 // End SplitterExecStream.cpp