00001 /* 00002 // $Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/MockProducerExecStream.h" 00026 #include "fennel/tuple/TupleAccessor.h" 00027 #include "fennel/tuple/TuplePrinter.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 #include <boost/scoped_array.hpp> 00031 00032 00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $"); 00034 00035 MockProducerExecStream::MockProducerExecStream() 00036 { 00037 cbTuple = 0; 00038 nRowsProduced = nRowsMax = 0; 00039 saveTuples = false; 00040 echoTuples = 0; 00041 } 00042 00043 void MockProducerExecStream::prepare(MockProducerExecStreamParams const ¶ms) 00044 { 00045 SingleOutputExecStream::prepare(params); 00046 pGenerator = params.pGenerator; 00047 pBatchGenerator = params.pBatchGenerator; 00048 for (uint i = 0; i < params.outputTupleDesc.size(); i++) { 00049 assert(!params.outputTupleDesc[i].isNullable); 00050 StandardTypeDescriptorOrdinal ordinal = 00051 StandardTypeDescriptorOrdinal( 00052 params.outputTupleDesc[i].pTypeDescriptor->getOrdinal()); 00053 assert(StandardTypeDescriptor::isIntegralNative(ordinal)); 00054 if (pGenerator) { 00055 assert(ordinal == STANDARD_TYPE_INT_64); 00056 } 00057 } 00058 outputData.compute(params.outputTupleDesc); 00059 TupleAccessor &tupleAccessor = pOutAccessor->getScratchTupleAccessor(); 00060 assert(tupleAccessor.isFixedWidth()); 00061 cbTuple = tupleAccessor.getMaxByteCount(); 00062 nRowsMax = params.nRows; 00063 saveTuples = params.saveTuples; 00064 echoTuples = params.echoTuples; 00065 if (saveTuples || echoTuples) { 00066 assert(pGenerator); 00067 } 00068 } 00069 00070 void MockProducerExecStream::open(bool restart) 00071 { 00072 SingleOutputExecStream::open(restart); 00073 nRowsProduced = 0; 00074 savedTuples.clear(); 00075 if (saveTuples) { 00076 // assume it's not too big 00077 savedTuples.reserve(nRowsMax); 00078 } 00079 } 00080 00081 ExecStreamResult MockProducerExecStream::execute( 00082 ExecStreamQuantum const &quantum) 00083 { 00084 if (pGenerator) { 00085 TuplePrinter tuplePrinter; 00086 uint nTuples = 0; 00087 boost::scoped_array<int64_t> values(new int64_t[outputData.size()]); 00088 for (int col = 0; col < outputData.size(); ++col) { 00089 outputData[col].pData = reinterpret_cast<PConstBuffer>( 00090 &(values.get()[col])); 00091 } 00092 while (nRowsProduced < nRowsMax) { 00093 if (pOutAccessor->getProductionAvailable() < cbTuple) { 00094 return EXECRC_BUF_OVERFLOW; 00095 } 00096 00097 if (pBatchGenerator) { 00098 int64_t newBatch = pBatchGenerator->next(); 00099 if (newBatch == 0) { 00100 return EXECRC_QUANTUM_EXPIRED; 00101 } 00102 } 00103 00104 for (int col = 0; col < outputData.size(); ++col) { 00105 values.get()[col] = 00106 pGenerator->generateValue(nRowsProduced, col); 00107 } 00108 00109 bool rc = pOutAccessor->produceTuple(outputData); 00110 assert(rc); 00111 ++nTuples; 00112 ++nRowsProduced; 00113 if (echoTuples) { 00114 tuplePrinter.print( 00115 *echoTuples, 00116 pOutAccessor->getTupleDesc(), outputData); 00117 } 00118 if (saveTuples) { 00119 std::ostringstream oss; 00120 tuplePrinter.print( 00121 oss, pOutAccessor->getTupleDesc(), outputData); 00122 savedTuples.push_back(oss.str()); 00123 } 00124 if (nTuples >= quantum.nTuplesMax) { 00125 return EXECRC_QUANTUM_EXPIRED; 00126 } 00127 } 00128 pOutAccessor->markEOS(); 00129 return EXECRC_EOS; 00130 } 00131 00132 // NOTE: implementation below is kept lean and mean 00133 // intentionally so that it can be used to drive other streams with minimal 00134 // overhead during profiling 00135 00136 uint cb = pOutAccessor->getProductionAvailable(); 00137 uint nRows = std::min<uint64_t>(nRowsMax - nRowsProduced, cb / cbTuple); 00138 uint cbBatch = nRows * cbTuple; 00139 00140 // TODO: pOutAccessor->validateTupleSize(?); 00141 if (cbBatch) { 00142 cb -= cbBatch; 00143 nRowsProduced += nRows; 00144 PBuffer pBuffer = pOutAccessor->getProductionStart(); 00145 memset(pBuffer,0,cbBatch); 00146 pOutAccessor->produceData(pBuffer + cbBatch); 00147 pOutAccessor->requestConsumption(); 00148 } 00149 if (nRowsProduced == nRowsMax) { 00150 pOutAccessor->markEOS(); 00151 return EXECRC_EOS; 00152 } else { 00153 return EXECRC_BUF_OVERFLOW; 00154 } 00155 } 00156 00157 uint64_t MockProducerExecStream::getProducedRowCount() 00158 { 00159 uint waitingRowCount = pOutAccessor->getConsumptionTuplesAvailable(); 00160 return nRowsProduced - waitingRowCount; 00161 } 00162 00163 MockProducerExecStreamGenerator::~MockProducerExecStreamGenerator() 00164 { 00165 } 00166 00167 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MockProducerExecStream.cpp#15 $"); 00168 00169 // End MockProducerExecStream.cpp