00001 /* 00002 // $Id: //open/dev/fennel/exec/ScratchBufferExecStream.cpp#14 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/ScratchBufferExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ScratchBufferExecStream.cpp#14 $"); 00029 00030 void ScratchBufferExecStream::prepare( 00031 ScratchBufferExecStreamParams const ¶ms) 00032 { 00033 ConduitExecStream::prepare(params); 00034 scratchAccessor = params.scratchAccessor; 00035 bufferLock.accessSegment(scratchAccessor); 00036 } 00037 00038 void ScratchBufferExecStream::getResourceRequirements( 00039 ExecStreamResourceQuantity &minQuantity, 00040 ExecStreamResourceQuantity &optQuantity) 00041 { 00042 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00043 00044 // one scratch page 00045 minQuantity.nCachePages += 1; 00046 00047 optQuantity = minQuantity; 00048 } 00049 00050 void ScratchBufferExecStream::open(bool restart) 00051 { 00052 ConduitExecStream::open(restart); 00053 00054 assert(pInAccessor); 00055 assert(pInAccessor->getProvision() == BUFPROV_CONSUMER); 00056 00057 assert(pOutAccessor); 00058 assert(pOutAccessor->getProvision() == BUFPROV_PRODUCER); 00059 00060 if (!bufferLock.isLocked()) { 00061 bufferLock.allocatePage(); 00062 } 00063 00064 pInAccessor->provideBufferForProduction( 00065 bufferLock.getPage().getWritableData(), 00066 bufferLock.getPage().getWritableData() 00067 + bufferLock.getPage().getCache().getPageSize(), 00068 true); 00069 00070 pLastConsumptionEnd = NULL; 00071 } 00072 00073 ExecStreamResult ScratchBufferExecStream::execute(ExecStreamQuantum const &) 00074 { 00075 switch (pOutAccessor->getState()) { 00076 case EXECBUF_NONEMPTY: 00077 case EXECBUF_OVERFLOW: 00078 return EXECRC_BUF_OVERFLOW; 00079 case EXECBUF_UNDERFLOW: 00080 case EXECBUF_EMPTY: 00081 if (pLastConsumptionEnd) { 00082 // Since our output buf is empty, the downstream consumer 00083 // must have consumed everything up to the last byte we 00084 // told it was available; pass that information on to our 00085 // upstream producer. 00086 pInAccessor->consumeData(pLastConsumptionEnd); 00087 pLastConsumptionEnd = NULL; 00088 } 00089 break; 00090 case EXECBUF_EOS: 00091 assert(pInAccessor->getState() == EXECBUF_EOS); 00092 return EXECRC_EOS; 00093 } 00094 switch (pInAccessor->getState()) { 00095 case EXECBUF_OVERFLOW: 00096 case EXECBUF_NONEMPTY: 00097 if (!pLastConsumptionEnd) { 00098 pLastConsumptionEnd = pInAccessor->getConsumptionEnd(); 00099 pOutAccessor->provideBufferForConsumption( 00100 pInAccessor->getConsumptionStart(), 00101 pLastConsumptionEnd); 00102 } 00103 return EXECRC_BUF_OVERFLOW; 00104 case EXECBUF_UNDERFLOW: 00105 return EXECRC_BUF_UNDERFLOW; 00106 case EXECBUF_EMPTY: 00107 pInAccessor->requestProduction(); 00108 return EXECRC_BUF_UNDERFLOW; 00109 case EXECBUF_EOS: 00110 pOutAccessor->markEOS(); 00111 return EXECRC_EOS; 00112 default: 00113 permAssert(false); 00114 } 00115 } 00116 00117 void ScratchBufferExecStream::closeImpl() 00118 { 00119 bufferLock.unlock(); 00120 ConduitExecStream::closeImpl(); 00121 } 00122 00123 ExecStreamBufProvision ScratchBufferExecStream::getOutputBufProvision() const 00124 { 00125 return BUFPROV_PRODUCER; 00126 } 00127 00128 ExecStreamBufProvision ScratchBufferExecStream::getInputBufProvision() const 00129 { 00130 return BUFPROV_CONSUMER; 00131 } 00132 00133 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ScratchBufferExecStream.cpp#14 $"); 00134 00135 // End ScratchBufferExecStream.cpp