00001 /* 00002 // $Id: //open/dev/fennel/exec/DoubleBufferExecStream.cpp#5 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/DoubleBufferExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/DoubleBufferExecStream.cpp#5 $"); 00029 00030 void DoubleBufferExecStream::prepare( 00031 DoubleBufferExecStreamParams const ¶ms) 00032 { 00033 ConduitExecStream::prepare(params); 00034 scratchAccessor = params.scratchAccessor; 00035 bufferLock1.accessSegment(scratchAccessor); 00036 bufferLock2.accessSegment(scratchAccessor); 00037 } 00038 00039 void DoubleBufferExecStream::getResourceRequirements( 00040 ExecStreamResourceQuantity &minQuantity, 00041 ExecStreamResourceQuantity &optQuantity) 00042 { 00043 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00044 00045 // two scratch pages 00046 minQuantity.nCachePages += 2; 00047 00048 optQuantity = minQuantity; 00049 } 00050 00051 void DoubleBufferExecStream::open(bool restart) 00052 { 00053 ConduitExecStream::open(restart); 00054 00055 assert(pInAccessor); 00056 assert(pInAccessor->getProvision() == BUFPROV_CONSUMER); 00057 00058 assert(pOutAccessor); 00059 assert(pOutAccessor->getProvision() == BUFPROV_PRODUCER); 00060 00061 if (!bufferLock1.isLocked()) { 00062 bufferLock1.allocatePage(); 00063 } 00064 00065 if (!bufferLock2.isLocked()) { 00066 bufferLock2.allocatePage(); 00067 } 00068 00069 // until the first swap, only the back buffer is active 00070 pFrontBuffer = NULL; 00071 pBackBuffer = bufferLock1.getPage().getWritableData(); 00072 00073 pInAccessor->provideBufferForProduction( 00074 pBackBuffer, 00075 pBackBuffer + bufferLock1.getPage().getCache().getPageSize(), 00076 false); 00077 } 00078 00079 ExecStreamResult DoubleBufferExecStream::execute(ExecStreamQuantum const &) 00080 { 00081 if (pFrontBuffer) { 00082 // both front and back buffers are active 00083 switch (pOutAccessor->getState()) { 00084 case EXECBUF_NONEMPTY: 00085 case EXECBUF_OVERFLOW: 00086 // consumer isn't done with front buffer, so we can't swap yet 00087 return EXECRC_BUF_OVERFLOW; 00088 case EXECBUF_UNDERFLOW: 00089 case EXECBUF_EMPTY: 00090 // consumer has finished reading, so fall through to check 00091 // on producer 00092 break; 00093 case EXECBUF_EOS: 00094 assert(pInAccessor->getState() == EXECBUF_EOS); 00095 return EXECRC_EOS; 00096 } 00097 } 00098 switch (pInAccessor->getState()) { 00099 case EXECBUF_NONEMPTY: 00100 case EXECBUF_OVERFLOW: 00101 // producer has given us data, so fall through to swap 00102 break; 00103 case EXECBUF_UNDERFLOW: 00104 // producer hasn't written yet 00105 return EXECRC_BUF_UNDERFLOW; 00106 case EXECBUF_EMPTY: 00107 pInAccessor->requestProduction(); 00108 return EXECRC_BUF_UNDERFLOW; 00109 case EXECBUF_EOS: 00110 if (pOutAccessor->getState() == EXECBUF_EOS) { 00111 return EXECRC_EOS; 00112 } else { 00113 // done with back buffer now, but we're not really EOS until 00114 // front buffer is consumed too 00115 pBackBuffer = NULL; 00116 pOutAccessor->markEOS(); 00117 return EXECRC_BUF_OVERFLOW; 00118 } 00119 default: 00120 permAssert(false); 00121 } 00122 00123 if (!pFrontBuffer) { 00124 // from here on both front buffer and back buffer are active 00125 pFrontBuffer = bufferLock2.getPage().getWritableData(); 00126 } 00127 00128 std::swap(pFrontBuffer, pBackBuffer); 00129 pOutAccessor->provideBufferForConsumption( 00130 pFrontBuffer, 00131 pInAccessor->getConsumptionEnd()); 00132 pInAccessor->consumeData(pInAccessor->getConsumptionEnd()); 00133 if (pInAccessor->getState() == EXECBUF_EOS) { 00134 pBackBuffer = NULL; 00135 pOutAccessor->markEOS(); 00136 return EXECRC_BUF_OVERFLOW; 00137 } 00138 pInAccessor->provideBufferForProduction( 00139 pBackBuffer, 00140 pBackBuffer + bufferLock1.getPage().getCache().getPageSize(), 00141 false); 00142 return EXECRC_BUF_UNDERFLOW; 00143 } 00144 00145 void DoubleBufferExecStream::closeImpl() 00146 { 00147 pFrontBuffer = NULL; 00148 pBackBuffer = NULL; 00149 bufferLock1.unlock(); 00150 bufferLock2.unlock(); 00151 ConduitExecStream::closeImpl(); 00152 } 00153 00154 ExecStreamBufProvision DoubleBufferExecStream::getOutputBufProvision() const 00155 { 00156 return BUFPROV_PRODUCER; 00157 } 00158 00159 ExecStreamBufProvision DoubleBufferExecStream::getInputBufProvision() const 00160 { 00161 return BUFPROV_CONSUMER; 00162 } 00163 00164 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/DoubleBufferExecStream.cpp#5 $"); 00165 00166 // End DoubleBufferExecStream.cpp