MockConsumerExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/MockConsumerExecStream.cpp#13 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/MockConsumerExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MockConsumerExecStream.cpp#13 $");
00030 
00031 ExecStreamResult MockConsumerExecStream::execute(
00032     ExecStreamQuantum const &quantum)
00033 {
00034     ExecStreamBufAccessor &inAccessor = *pInAccessor;
00035     switch (inAccessor.getState()) {
00036     case EXECBUF_EMPTY:
00037         inAccessor.requestProduction();
00038         return EXECRC_BUF_UNDERFLOW;
00039     case EXECBUF_UNDERFLOW:
00040         return EXECRC_BUF_UNDERFLOW;
00041     case EXECBUF_EOS:
00042         recvEOS = true;
00043         return EXECRC_EOS;
00044     case EXECBUF_NONEMPTY:
00045     case EXECBUF_OVERFLOW:
00046         break;
00047     default:
00048         permFail("Bad state " << inAccessor.getState());
00049     }
00050     assert(inAccessor.isConsumptionPossible());
00051 
00052     // Read rows from the input buffer until we exceed the quantum or read all
00053     // of the rows. Convert each row to a string, and append to the rows
00054     // vector.
00055     for (uint iRow = 0; iRow < quantum.nTuplesMax; ++iRow) {
00056         if (!inAccessor.demandData()) {
00057             // Convert buf return code into stream return code.
00058             switch (inAccessor.getState()) {
00059             case EXECBUF_UNDERFLOW:
00060                 return EXECRC_BUF_UNDERFLOW;
00061             case EXECBUF_EOS:
00062                 return EXECRC_EOS;
00063             default:
00064                 permAssert(false);
00065             }
00066         }
00067         inAccessor.unmarshalTuple(inputTuple);
00068         rowCount++;
00069         if (echoData) {
00070             tuplePrinter.print(
00071                 *echoData, inAccessor.getTupleDesc(), inputTuple);
00072         }
00073         if (saveData) {
00074             std::ostringstream oss;
00075             tuplePrinter.print(oss, inAccessor.getTupleDesc(), inputTuple);
00076             const string &s = oss.str();
00077             rowStrings.push_back(s);
00078         }
00079         inAccessor.consumeTuple();
00080     }
00081     return EXECRC_QUANTUM_EXPIRED;
00082 }
00083 
00084 void MockConsumerExecStream::prepare(
00085     MockConsumerExecStreamParams const &params)
00086 {
00087     SingleInputExecStream::prepare(params);
00088     saveData = params.saveData;
00089     echoData = params.echoData;
00090     recvEOS = false;
00091 }
00092 
00093 void MockConsumerExecStream::open(bool restart)
00094 {
00095     SingleInputExecStream::open(restart);
00096     rowCount = 0;
00097     rowStrings.clear();
00098     inputTuple.compute(pInAccessor->getTupleDesc());
00099     recvEOS = false;
00100 }
00101 
00102 
00103 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MockConsumerExecStream.cpp#13 $");
00104 
00105 // End MockConsumerExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1