MockProducerExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/MockProducerExecStream.h"
00026 #include "fennel/tuple/TupleAccessor.h"
00027 #include "fennel/tuple/TuplePrinter.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include <boost/scoped_array.hpp>
00031 
00032 
00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $");
00034 
00035 MockProducerExecStream::MockProducerExecStream()
00036 {
00037     cbTuple = 0;
00038     nRowsProduced = nRowsMax = 0;
00039     saveTuples = false;
00040     echoTuples = 0;
00041 }
00042 
00043 void MockProducerExecStream::prepare(MockProducerExecStreamParams const &params)
00044 {
00045     SingleOutputExecStream::prepare(params);
00046     pGenerator = params.pGenerator;
00047     pBatchGenerator = params.pBatchGenerator;
00048     for (uint i = 0; i < params.outputTupleDesc.size(); i++) {
00049         assert(!params.outputTupleDesc[i].isNullable);
00050         StandardTypeDescriptorOrdinal ordinal =
00051             StandardTypeDescriptorOrdinal(
00052                 params.outputTupleDesc[i].pTypeDescriptor->getOrdinal());
00053         assert(StandardTypeDescriptor::isIntegralNative(ordinal));
00054         if (pGenerator) {
00055             assert(ordinal == STANDARD_TYPE_INT_64);
00056         }
00057     }
00058     outputData.compute(params.outputTupleDesc);
00059     TupleAccessor &tupleAccessor = pOutAccessor->getScratchTupleAccessor();
00060     assert(tupleAccessor.isFixedWidth());
00061     cbTuple = tupleAccessor.getMaxByteCount();
00062     nRowsMax = params.nRows;
00063     saveTuples = params.saveTuples;
00064     echoTuples = params.echoTuples;
00065     if (saveTuples || echoTuples) {
00066         assert(pGenerator);
00067     }
00068 }
00069 
00070 void MockProducerExecStream::open(bool restart)
00071 {
00072     SingleOutputExecStream::open(restart);
00073     nRowsProduced = 0;
00074     savedTuples.clear();
00075     if (saveTuples) {
00076         // assume it's not too big
00077         savedTuples.reserve(nRowsMax);
00078     }
00079 }
00080 
00081 ExecStreamResult MockProducerExecStream::execute(
00082     ExecStreamQuantum const &quantum)
00083 {
00084     if (pGenerator) {
00085         TuplePrinter tuplePrinter;
00086         uint nTuples = 0;
00087         boost::scoped_array<int64_t> values(new int64_t[outputData.size()]);
00088         for (int col = 0; col < outputData.size(); ++col) {
00089             outputData[col].pData = reinterpret_cast<PConstBuffer>(
00090                 &(values.get()[col]));
00091         }
00092         while (nRowsProduced < nRowsMax) {
00093             if (pOutAccessor->getProductionAvailable() < cbTuple) {
00094                 return EXECRC_BUF_OVERFLOW;
00095             }
00096 
00097             if (pBatchGenerator) {
00098                 int64_t newBatch = pBatchGenerator->next();
00099                 if (newBatch == 0) {
00100                     return EXECRC_QUANTUM_EXPIRED;
00101                 }
00102             }
00103 
00104             for (int col = 0; col < outputData.size(); ++col) {
00105                 values.get()[col] =
00106                     pGenerator->generateValue(nRowsProduced, col);
00107             }
00108 
00109             bool rc = pOutAccessor->produceTuple(outputData);
00110             assert(rc);
00111             ++nTuples;
00112             ++nRowsProduced;
00113             if (echoTuples) {
00114                 tuplePrinter.print(
00115                     *echoTuples,
00116                     pOutAccessor->getTupleDesc(), outputData);
00117             }
00118             if (saveTuples) {
00119                 std::ostringstream oss;
00120                 tuplePrinter.print(
00121                     oss, pOutAccessor->getTupleDesc(), outputData);
00122                 savedTuples.push_back(oss.str());
00123             }
00124             if (nTuples >= quantum.nTuplesMax) {
00125                 return EXECRC_QUANTUM_EXPIRED;
00126             }
00127         }
00128         pOutAccessor->markEOS();
00129         return EXECRC_EOS;
00130     }
00131 
00132     // NOTE: implementation below is kept lean and mean
00133     // intentionally so that it can be used to drive other streams with minimal
00134     // overhead during profiling
00135 
00136     uint cb = pOutAccessor->getProductionAvailable();
00137     uint nRows = std::min<uint64_t>(nRowsMax - nRowsProduced, cb / cbTuple);
00138     uint cbBatch = nRows * cbTuple;
00139 
00140     // TODO:  pOutAccessor->validateTupleSize(?);
00141     if (cbBatch) {
00142         cb -= cbBatch;
00143         nRowsProduced += nRows;
00144         PBuffer pBuffer = pOutAccessor->getProductionStart();
00145         memset(pBuffer,0,cbBatch);
00146         pOutAccessor->produceData(pBuffer + cbBatch);
00147         pOutAccessor->requestConsumption();
00148     }
00149     if (nRowsProduced == nRowsMax) {
00150         pOutAccessor->markEOS();
00151         return EXECRC_EOS;
00152     } else {
00153         return EXECRC_BUF_OVERFLOW;
00154     }
00155 }
00156 
00157 uint64_t MockProducerExecStream::getProducedRowCount()
00158 {
00159     uint waitingRowCount = pOutAccessor->getConsumptionTuplesAvailable();
00160     return nRowsProduced - waitingRowCount;
00161 }
00162 
00163 MockProducerExecStreamGenerator::~MockProducerExecStreamGenerator()
00164 {
00165 }
00166 
00167 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $");
00168 
00169 // End MockProducerExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1