00001 /* 00002 // $Id: //open/dev/fennel/ftrs/BTreeSortExecStream.cpp#12 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/ftrs/BTreeSortExecStream.h" 00026 #include "fennel/btree/BTreeWriter.h" 00027 #include "fennel/tuple/TupleDescriptor.h" 00028 #include "fennel/exec/ExecStreamBufAccessor.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSortExecStream.cpp#12 $"); 00031 00032 void BTreeSortExecStream::prepare(BTreeSortExecStreamParams const ¶ms) 00033 { 00034 assert(params.rootPageId == NULL_PAGE_ID); 00035 assert(!params.pRootMap); 00036 00037 BTreeInsertExecStream::prepare(params); 00038 dynamicBTree = true; 00039 truncateOnRestart = true; 00040 } 00041 00042 // REVIEW: do we ever want to save results on restart? 00043 void BTreeSortExecStream::open(bool restart) 00044 { 00045 sorted = false; 00046 BTreeInsertExecStream::open(restart); 00047 } 00048 00049 ExecStreamResult BTreeSortExecStream::execute( 00050 ExecStreamQuantum const &quantum) 00051 { 00052 if (!sorted) { 00053 if (pInAccessor->getState() == EXECBUF_EOS) { 00054 sorted = true; 00055 bool found = pWriter->searchFirst(); 00056 if (!found) { 00057 pWriter->endSearch(); 00058 } 00059 } else { 00060 return BTreeInsertExecStream::execute(quantum); 00061 } 00062 } 00063 00064 if (!pWriter->isPositioned()) { 00065 pOutAccessor->markEOS(); 00066 return EXECRC_EOS; 00067 } 00068 00069 if (pOutAccessor->getState() == EXECBUF_OVERFLOW) { 00070 return EXECRC_BUF_OVERFLOW; 00071 } 00072 00073 uint nTuples = 0; 00074 TupleAccessor const &readAccessor = pWriter->getTupleAccessorForRead(); 00075 00076 do { 00077 uint cbBuffer = pOutAccessor->getProductionAvailable(); 00078 PBuffer pBuffer = pOutAccessor->getProductionStart(); 00079 uint cbTuple = readAccessor.getCurrentByteCount(); 00080 if (cbBuffer < cbTuple) { 00081 pOutAccessor->requestConsumption(); 00082 return EXECRC_BUF_OVERFLOW; 00083 } 00084 memcpy( 00085 pBuffer, 00086 readAccessor.getCurrentTupleBuf(), 00087 cbTuple); 00088 pOutAccessor->produceData(pBuffer + cbTuple); 00089 ++nTuples; 00090 if (!pWriter->searchNext()) { 00091 pWriter->endSearch(); 00092 } 00093 if (nTuples >= quantum.nTuplesMax) { 00094 return EXECRC_QUANTUM_EXPIRED; 00095 } 00096 } while (pWriter->isPositioned()); 00097 return EXECRC_EOS; 00098 } 00099 00100 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSortExecStream.cpp#12 $"); 00101 00102 // End BTreeSortExecStream.cpp