00001 /* 00002 // $Id: //open/dev/fennel/sorter/ExternalSortRunAccessor.cpp#1 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2009-2009 SQLstream, Inc. 00006 // Copyright (C) 2004-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/sorter/ExternalSortRunAccessor.h" 00026 #include "fennel/sorter/ExternalSortInfo.h" 00027 #include "fennel/segment/SegInputStream.h" 00028 #include "fennel/segment/SegOutputStream.h" 00029 #include "fennel/segment/SegStreamAllocation.h" 00030 #include "fennel/exec/ExecStream.h" 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunAccessor.cpp#1 $"); 00033 00034 ExternalSortRunAccessor::ExternalSortRunAccessor(ExternalSortInfo &sortInfoIn) 00035 : sortInfo(sortInfoIn) 00036 { 00037 releaseResources(); 00038 00039 tupleAccessor.compute(sortInfo.tupleDesc); 00040 } 00041 00042 ExternalSortRunAccessor::~ExternalSortRunAccessor() 00043 { 00044 releaseResources(); 00045 } 00046 00047 SharedSegStreamAllocation ExternalSortRunAccessor::getStoredRun() 00048 { 00049 return pStoredRun; 00050 } 00051 00052 void ExternalSortRunAccessor::startRead( 00053 SharedSegStreamAllocation pStoredRunInit) 00054 { 00055 pStoredRun = pStoredRunInit; 00056 pStoredRun->getInputStream()->startPrefetch(); 00057 } 00058 00059 void ExternalSortRunAccessor::resetRead() 00060 { 00061 fetchArray.nTuples = 0; 00062 } 00063 00064 void ExternalSortRunAccessor::initRead() 00065 { 00066 releaseResources(); 00067 tupleAccessor.compute(sortInfo.tupleDesc); 00068 } 00069 00070 void ExternalSortRunAccessor::releaseResources() 00071 { 00072 pStoredRun.reset(); 00073 clearFetch(); 00074 } 00075 00076 void ExternalSortRunAccessor::storeRun( 00077 ExternalSortSubStream &pObjLoad) 00078 { 00079 pStoredRun = SegStreamAllocation::newSegStreamAllocation(); 00080 00081 SharedSegOutputStream pSegOutputStream = 00082 SegOutputStream::newSegOutputStream( 00083 sortInfo.externalSegmentAccessor); 00084 pStoredRun->beginWrite(pSegOutputStream); 00085 00086 ExternalSortFetchArray &fetchArray = pObjLoad.bindFetchArray(); 00087 00088 ExternalSortRC rc; 00089 uint iTuple = 0; 00090 do { 00091 for (; iTuple < fetchArray.nTuples; iTuple++) { 00092 PBuffer pSrcBuf = fetchArray.ppTupleBuffers[iTuple]; 00093 uint cbTuple = tupleAccessor.getBufferByteCount(pSrcBuf); 00094 PBuffer pTarget = pSegOutputStream->getWritePointer(cbTuple); 00095 memcpy(pTarget,pSrcBuf,cbTuple); 00096 pSegOutputStream->consumeWritePointer(cbTuple); 00097 } 00098 iTuple = 0; 00099 00100 rc = pObjLoad.fetch(EXTSORT_FETCH_ARRAY_SIZE); 00101 } while (rc == EXTSORT_SUCCESS); 00102 00103 assert(rc == EXTSORT_ENDOFDATA); 00104 00105 pStoredRun->endWrite(); 00106 } 00107 00108 ExternalSortFetchArray &ExternalSortRunAccessor::bindFetchArray() 00109 { 00110 return fetchArray; 00111 } 00112 00113 ExternalSortRC ExternalSortRunAccessor::fetch(uint nTuplesRequested) 00114 { 00115 sortInfo.stream.checkAbort(); 00116 00117 if (nTuplesRequested > EXTSORT_FETCH_ARRAY_SIZE) { 00118 nTuplesRequested = EXTSORT_FETCH_ARRAY_SIZE; 00119 } 00120 00121 uint cb; 00122 SharedSegInputStream const &pSegInputStream = pStoredRun->getInputStream(); 00123 PConstBuffer pStart = pSegInputStream->getReadPointer(1,&cb); 00124 PConstBuffer pBuf = pStart; 00125 if (!pBuf) { 00126 return EXTSORT_ENDOFDATA; 00127 } 00128 PConstBuffer pStopMark = pBuf + cb; 00129 uint cbTuple; 00130 00131 fetchArray.nTuples = 0; 00132 while (nTuplesRequested-- && (pBuf < pStopMark)) { 00133 ppTupleBuffers[fetchArray.nTuples] = const_cast<PBuffer>(pBuf); 00134 cbTuple = tupleAccessor.getBufferByteCount(pBuf); 00135 pBuf += cbTuple; 00136 fetchArray.nTuples++; 00137 } 00138 pSegInputStream->consumeReadPointer(pBuf - pStart); 00139 00140 return EXTSORT_SUCCESS; 00141 } 00142 00143 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunAccessor.cpp#1 $"); 00144 00145 // End ExternalSortRunAccessor.cpp