00001 /* 00002 // $Id: //open/dev/fennel/exec/MockResourceExecStream.cpp#7 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/common/FennelExcn.h" 00026 #include "fennel/tuple/StandardTypeDescriptor.h" 00027 #include "fennel/exec/MockResourceExecStream.h" 00028 #include "fennel/exec/ExecStreamBufAccessor.h" 00029 00030 #include <sstream> 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/MockResourceExecStream.cpp#7 $"); 00033 00034 void MockResourceExecStream::prepare( 00035 MockResourceExecStreamParams const ¶ms) 00036 { 00037 SingleOutputExecStream::prepare(params); 00038 minReqt = params.minReqt; 00039 optReqt = params.optReqt; 00040 expected = params.expected; 00041 00042 optTypeInput = params.optTypeInput; 00043 00044 scratchAccessor = params.scratchAccessor; 00045 scratchLock.accessSegment(scratchAccessor); 00046 00047 // setup output tuple 00048 assert(pOutAccessor->getTupleDesc().size() == 1); 00049 StandardTypeDescriptorFactory stdTypeFactory; 00050 TupleAttributeDescriptor expectedOutputDesc( 00051 stdTypeFactory.newDataType(STANDARD_TYPE_INT_8)); 00052 assert(pOutAccessor->getTupleDesc()[0] == expectedOutputDesc); 00053 outputTuple.compute(pOutAccessor->getTupleDesc()); 00054 outputTupleAccessor = &pOutAccessor->getScratchTupleAccessor(); 00055 } 00056 00057 void MockResourceExecStream::getResourceRequirements( 00058 ExecStreamResourceQuantity &minQuantity, 00059 ExecStreamResourceQuantity &optQuantity, 00060 ExecStreamResourceSettingType &optType) 00061 { 00062 minQuantity = minReqt; 00063 optQuantity = optReqt; 00064 optType = optTypeInput; 00065 } 00066 00067 void MockResourceExecStream::setResourceAllocation( 00068 ExecStreamResourceQuantity &quantity) 00069 { 00070 numToAllocate = quantity.nCachePages; 00071 } 00072 00073 void MockResourceExecStream::open(bool restart) 00074 { 00075 SingleOutputExecStream::open(restart); 00076 isDone = false; 00077 if (!restart) { 00078 outputTupleBuffer.reset( 00079 new FixedBuffer[outputTupleAccessor->getMaxByteCount()]); 00080 } 00081 } 00082 00083 ExecStreamResult MockResourceExecStream::execute( 00084 ExecStreamQuantum const &quantum) 00085 { 00086 if (isDone) { 00087 pOutAccessor->markEOS(); 00088 return EXECRC_EOS; 00089 } 00090 00091 uint numAllocated = 0; 00092 00093 if (numToAllocate == expected.nCachePages) { 00094 for (uint i = 0; i < numToAllocate; i++) { 00095 // REVIEW jvs 8-Sept--2006: The NULL_PAGE_ID case will never 00096 // happen, because allocatePage asserts; you probably meant to use 00097 // tryAllocatePage instead. But the case must never actually be 00098 // tested anyway, because on lu/dev after I changed the cache back 00099 // to retry forever on lockScratchPage, no test hangs. 00100 00101 PageId page = scratchLock.allocatePage(); 00102 // if we can't allocate a page, break out of the loop; the stream 00103 // will return 0 instead of 1 00104 if (page == NULL_PAGE_ID) { 00105 break; 00106 } 00107 numAllocated++; 00108 } 00109 } 00110 00111 int8_t outputIndicator = (numAllocated == numToAllocate) ? 1 : 0; 00112 outputTuple[0].pData = (PConstBuffer) &outputIndicator; 00113 outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get()); 00114 pOutAccessor->provideBufferForConsumption( 00115 outputTupleBuffer.get(), 00116 outputTupleBuffer.get() + outputTupleAccessor->getCurrentByteCount()); 00117 isDone = true; 00118 return EXECRC_BUF_OVERFLOW; 00119 } 00120 00121 void MockResourceExecStream::closeImpl() 00122 { 00123 SingleOutputExecStream::closeImpl(); 00124 if (scratchAccessor.pSegment) { 00125 scratchAccessor.pSegment->deallocatePageRange( 00126 NULL_PAGE_ID, NULL_PAGE_ID); 00127 } 00128 outputTupleBuffer.reset(); 00129 } 00130 00131 ExecStreamBufProvision MockResourceExecStream::getOutputBufProvision() const 00132 { 00133 return BUFPROV_PRODUCER; 00134 } 00135 00136 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/MockResourceExecStream.cpp#7 $"); 00137 00138 // End MockResourceExecStream.cpp