SplitterExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/SplitterExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $");
00029 
00030 void SplitterExecStream::open(bool restart)
00031 {
00032     DiffluenceExecStream::open(restart);
00033     iOutput = 0;
00034     pLastConsumptionEnd = NULL;
00035 }
00036 
00037 ExecStreamResult SplitterExecStream::execute(ExecStreamQuantum const &)
00038 {
00039     if (pLastConsumptionEnd) {
00040         while (iOutput < outAccessors.size()) {
00041             switch (outAccessors[iOutput]->getState()) {
00042             case EXECBUF_NONEMPTY:
00043             case EXECBUF_OVERFLOW:
00044                 return EXECRC_BUF_OVERFLOW;
00045             case EXECBUF_UNDERFLOW:
00046             case EXECBUF_EMPTY:
00047                 ++iOutput;
00048                 break;
00049             case EXECBUF_EOS:
00050                 assert(pInAccessor->getState() == EXECBUF_EOS);
00051                 return EXECRC_EOS;
00052             }
00053         }
00054 
00055         /*
00056          * All the output buf accessors have reached EXECBUF_EMPTY. This
00057          * means the downstream consumers must have consumed everything
00058          * up to the last byte we told them was available; pass that
00059          * information on to our upstream producer.
00060          */
00061         pInAccessor->consumeData(pLastConsumptionEnd);
00062         pLastConsumptionEnd = NULL;
00063         iOutput = 0;
00064     }
00065 
00066     switch (pInAccessor->getState()) {
00067     case EXECBUF_OVERFLOW:
00068     case EXECBUF_NONEMPTY:
00069         if (!pLastConsumptionEnd) {
00070             pLastConsumptionEnd = pInAccessor->getConsumptionEnd();
00071 
00072             /*
00073              * The same buffer is provided for consumption to all the output
00074              * buffer accessors.
00075              */
00076             for (int i = 0; i < outAccessors.size(); i ++) {
00077                 outAccessors[i]->provideBufferForConsumption(
00078                     pInAccessor->getConsumptionStart(),
00079                     pLastConsumptionEnd);
00080             }
00081         }
00082         return EXECRC_BUF_OVERFLOW;
00083     case EXECBUF_UNDERFLOW:
00084         return EXECRC_BUF_UNDERFLOW;
00085     case EXECBUF_EMPTY:
00086         pInAccessor->requestProduction();
00087         return EXECRC_BUF_UNDERFLOW;
00088     case EXECBUF_EOS:
00089         for (int i = 0; i < outAccessors.size(); i ++) {
00090             outAccessors[i]->markEOS();
00091         }
00092         return EXECRC_EOS;
00093     default:
00094         permAssert(false);
00095     }
00096 }
00097 
00098 ExecStreamBufProvision SplitterExecStream::getOutputBufProvision() const
00099 {
00100     /*
00101      * Splitter does not own any buffer; however, it provides its producer's
00102      * buffer directly to its consumer(s).
00103      */
00104     return BUFPROV_PRODUCER;
00105 }
00106 
00107 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SplitterExecStream.cpp#7 $");
00108 
00109 // End SplitterExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1