MergeExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/MergeExecStream.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStreamScheduler.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $");
00030 
00031 MergeExecStreamParams::MergeExecStreamParams()
00032 {
00033     isParallel = false;
00034 }
00035 
00036 void MergeExecStream::prepare(
00037     MergeExecStreamParams const &params)
00038 {
00039     ConfluenceExecStream::prepare(params);
00040     isParallel = params.isParallel;
00041     assert(!inAccessors.empty());
00042     for (uint i = 0; i < inAccessors.size(); ++i) {
00043         assert(
00044             inAccessors[i]->getTupleDesc() == pOutAccessor->getTupleDesc());
00045         assert(
00046             inAccessors[i]->getTupleFormat() == pOutAccessor->getTupleFormat());
00047     }
00048     inputEOS.resize(inAccessors.size());
00049 }
00050 
00051 void MergeExecStream::open(
00052     bool restart)
00053 {
00054     ConfluenceExecStream::open(restart);
00055     iInput = 0;
00056     pLastConsumptionEnd = NULL;
00057     nInputsEOS = 0;
00058     std::fill(inputEOS.begin(), inputEOS.end(), false);
00059     // Ignore the isParallel parameter unless we are actually running
00060     // in a parallel scheduler.
00061     if (pGraph->getScheduler()->getDegreeOfParallelism() == 1) {
00062         isParallel = false;
00063     }
00064 }
00065 
00066 ExecStreamResult MergeExecStream::execute(
00067     ExecStreamQuantum const &)
00068 {
00069     switch (pOutAccessor->getState()) {
00070     case EXECBUF_NONEMPTY:
00071     case EXECBUF_OVERFLOW:
00072         return EXECRC_BUF_OVERFLOW;
00073     case EXECBUF_UNDERFLOW:
00074     case EXECBUF_EMPTY:
00075         if (pLastConsumptionEnd) {
00076             // Since our output buf is empty, the downstream consumer
00077             // must have consumed everything up to the last byte we
00078             // told it was available; pass that information on to our
00079             // upstream producer.
00080             inAccessors[iInput]->consumeData(pLastConsumptionEnd);
00081             pLastConsumptionEnd = NULL;
00082         }
00083         break;
00084     case EXECBUF_EOS:
00085         return EXECRC_EOS;
00086     }
00087 
00088     int iInputStart = iInput;
00089     for (;;) {
00090         switch (inAccessors[iInput]->getState()) {
00091         case EXECBUF_OVERFLOW:
00092         case EXECBUF_NONEMPTY:
00093             // Pass through current input buf to our downstream consumer.
00094             pLastConsumptionEnd = inAccessors[iInput]->getConsumptionEnd();
00095             pOutAccessor->provideBufferForConsumption(
00096                 inAccessors[iInput]->getConsumptionStart(),
00097                 pLastConsumptionEnd);
00098             return EXECRC_BUF_OVERFLOW;
00099         case EXECBUF_UNDERFLOW:
00100             if (!isParallel) {
00101                 return EXECRC_BUF_UNDERFLOW;
00102             }
00103             ++iInput;
00104             break;
00105         case EXECBUF_EMPTY:
00106             inAccessors[iInput]->requestProduction();
00107             if (!isParallel) {
00108                 return EXECRC_BUF_UNDERFLOW;
00109             }
00110             ++iInput;
00111             break;
00112         case EXECBUF_EOS:
00113             if (!inputEOS[iInput]) {
00114                 inputEOS[iInput] = true;
00115                 nInputsEOS++;
00116             }
00117             // Current input is exhausted; move on to the next one.
00118             ++iInput;
00119             break;
00120         default:
00121             permAssert(false);
00122         }
00123         if (isParallel) {
00124             if (iInput == inAccessors.size()) {
00125                 iInput = 0;
00126             }
00127             if (iInput == iInputStart) {
00128                 // We've made one full loop without making any progress;
00129                 // time to give up for this quantum.
00130                 break;
00131             }
00132         } else {
00133             if (iInput == inAccessors.size()) {
00134                 break;
00135             }
00136         }
00137     }
00138     if (nInputsEOS == inAccessors.size()) {
00139         pOutAccessor->markEOS();
00140         return EXECRC_EOS;
00141     } else {
00142         assert(isParallel);
00143         return EXECRC_BUF_UNDERFLOW;
00144     }
00145 }
00146 
00147 ExecStreamBufProvision MergeExecStream::getOutputBufProvision() const
00148 {
00149     return BUFPROV_PRODUCER;
00150 }
00151 
00152 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MergeExecStream.cpp#8 $");
00153 
00154 // End MergeExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1