00001 /* 00002 // $Id: //open/dev/fennel/exec/BernoulliSamplingExecStream.cpp#4 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2007-2009 The Eigenbase Project 00005 // Copyright (C) 2007-2009 SQLstream, Inc. 00006 // Copyright (C) 2007-2009 LucidEra, Inc. 00007 // 00008 // This program is free software; you can redistribute it and/or modify it 00009 // under the terms of the GNU General Public License as published by the Free 00010 // Software Foundation; either version 2 of the License, or (at your option) 00011 // any later version approved by The Eigenbase Project. 00012 // 00013 // This program is distributed in the hope that it will be useful, 00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 // GNU General Public License for more details. 00017 // 00018 // You should have received a copy of the GNU General Public License 00019 // along with this program; if not, write to the Free Software 00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00021 */ 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/exec/BernoulliSamplingExecStream.h" 00025 #include "fennel/tuple/TupleDescriptor.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/BernoulliSamplingExecStream.cpp#4 $"); 00029 00030 void BernoulliSamplingExecStream::prepare( 00031 BernoulliSamplingExecStreamParams const ¶ms) 00032 { 00033 ConduitExecStream::prepare(params); 00034 00035 samplingRate = params.samplingRate; 00036 isRepeatable = params.isRepeatable; 00037 repeatableSeed = params.repeatableSeed; 00038 00039 samplingRng.reset(new BernoulliRng(samplingRate)); 00040 00041 assert(pInAccessor->getTupleDesc() == pOutAccessor->getTupleDesc()); 00042 00043 data.compute(pOutAccessor->getTupleDesc()); 00044 } 00045 00046 void BernoulliSamplingExecStream::open(bool restart) 00047 { 00048 ConduitExecStream::open(restart); 00049 00050 if (isRepeatable) { 00051 samplingRng->reseed(repeatableSeed); 00052 } else if (!restart) { 00053 samplingRng->reseed(static_cast<uint32_t>(time(0))); 00054 } 00055 00056 producePending = false; 00057 } 00058 00059 ExecStreamResult BernoulliSamplingExecStream::execute( 00060 ExecStreamQuantum const &quantum) 00061 { 00062 ExecStreamResult rc = precheckConduitBuffers(); 00063 if (rc != EXECRC_YIELD) { 00064 return rc; 00065 } 00066 00067 if (producePending) { 00068 if (!pOutAccessor->produceTuple(data)) { 00069 return EXECRC_BUF_OVERFLOW; 00070 } 00071 pInAccessor->consumeTuple(); 00072 producePending = false; 00073 } 00074 00075 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00076 if (!pInAccessor->demandData()) { 00077 return EXECRC_BUF_UNDERFLOW; 00078 } 00079 00080 pInAccessor->accessConsumptionTuple(); 00081 00082 if (!samplingRng->nextValue()) { 00083 pInAccessor->consumeTuple(); 00084 continue; 00085 } 00086 00087 pInAccessor->getConsumptionTupleAccessor().unmarshal(data); 00088 00089 producePending = true; 00090 if (!pOutAccessor->produceTuple(data)) { 00091 return EXECRC_BUF_OVERFLOW; 00092 } 00093 producePending = false; 00094 pInAccessor->consumeTuple(); 00095 } 00096 00097 return EXECRC_QUANTUM_EXPIRED; 00098 } 00099 00100 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/BernoulliSamplingExecStream.cpp#4 $"); 00101 00102 // End BernoulliSamplingExecStream.cpp