ExternalSortMerger.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2004-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortMerger.h"
00026 #include "fennel/sorter/ExternalSortInfo.h"
00027 #include "fennel/sorter/ExternalSortRunAccessor.h"
00028 #include "fennel/exec/ExecStream.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $");
00031 
00032 ExternalSortMerger::ExternalSortMerger(ExternalSortInfo &sortInfoIn)
00033     : sortInfo(sortInfoIn)
00034 {
00035     nRuns = 0;
00036 
00037     tupleAccessor.compute(sortInfo.tupleDesc);
00038     tupleAccessor2.compute(sortInfo.tupleDesc);
00039 
00040     keyData.compute(sortInfo.keyDesc);
00041     keyData2 = keyData;
00042 
00043     keyAccessor.bind(tupleAccessor,sortInfo.keyProj);
00044     keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj);
00045 
00046     nMergeMemPages = sortInfo.nSortMemPages + 1;
00047 
00048     ppRunAccessors.reset(new SharedExternalSortRunAccessor[nMergeMemPages]);
00049     ppFetchArrays.reset(new ExternalSortFetchArray *[nMergeMemPages]);
00050 
00051     pOrds.reset(new uint[nMergeMemPages]);
00052     memset(pOrds.get(),0,sizeof(uint) * nMergeMemPages);
00053 
00054     mergeInfo.reset(new ExternalSortMergeInfo[nMergeMemPages]);
00055 
00056     fetchArray.ppTupleBuffers = &(ppTupleBuffers[0]);
00057 }
00058 
00059 ExternalSortMerger::~ExternalSortMerger()
00060 {
00061     releaseResources();
00062 }
00063 
00064 void ExternalSortMerger::initRunAccess()
00065 {
00066     for (uint i = 1; i < nMergeMemPages; i++) {
00067         ppRunAccessors[i].reset(new ExternalSortRunAccessor(sortInfo));
00068         ppRunAccessors[i]->initRead();
00069         ppFetchArrays[i] = &(ppRunAccessors[i]->bindFetchArray());
00070     }
00071 }
00072 
00073 void ExternalSortMerger::releaseResources()
00074 {
00075     ppRunAccessors.reset();
00076     ppFetchArrays.reset();
00077     pOrds.reset();
00078     mergeInfo.reset();
00079 }
00080 
00081 void ExternalSortMerger::startMerge(
00082     std::vector<SharedSegStreamAllocation>::iterator pStoredRun,
00083     uint nRunsToMerge)
00084 {
00085     assert (nRunsToMerge < nMergeMemPages);
00086 
00087     nRuns = nRunsToMerge;
00088 
00089     for (uint i = 1; i < nMergeMemPages; i++) {
00090         ppRunAccessors[i]->resetRead();
00091         pOrds[i] = 0;
00092     }
00093 
00094     for (uint i = 1; i <= nRuns; i++) {
00095         ppRunAccessors[i]->startRead(pStoredRun[i - 1]);
00096         ppRunAccessors[i]->fetch(EXTSORT_FETCH_ARRAY_SIZE);
00097         mergeInfo[i].val = ppFetchArrays[i]->ppTupleBuffers[0];
00098         mergeInfo[i].runOrd = i;
00099     }
00100     fetchArray.nTuples = 0;
00101 
00102     heapBuild();
00103 }
00104 
00105 ExternalSortFetchArray &ExternalSortMerger::bindFetchArray()
00106 {
00107     return fetchArray;
00108 }
00109 
00110 inline uint ExternalSortMerger::heapParent(uint i)
00111 {
00112     return (i >> 1);
00113 }
00114 
00115 inline uint ExternalSortMerger::heapLeft(uint i)
00116 {
00117     return (i << 1);
00118 }
00119 
00120 inline uint ExternalSortMerger::heapRight(uint i)
00121 {
00122     return (i << 1) + 1;
00123 }
00124 
00125 inline void ExternalSortMerger::heapExchange(uint i,uint j)
00126 {
00127     std::swap(mergeInfo[i],mergeInfo[j]);
00128 }
00129 
00130 inline ExternalSortMergeInfo &ExternalSortMerger::getMergeHigh()
00131 {
00132     return mergeInfo[1];
00133 }
00134 
00135 void ExternalSortMerger::heapify(uint i)
00136 {
00137     uint l, r, highest = i;
00138 
00139     l = heapLeft(i);
00140     r = heapRight(i);
00141 
00142     if (l <= nRuns) {
00143         tupleAccessor.setCurrentTupleBuf(mergeInfo[l].val);
00144         tupleAccessor2.setCurrentTupleBuf(mergeInfo[i].val);
00145         keyAccessor.unmarshal(keyData);
00146         keyAccessor2.unmarshal(keyData2);
00147         if (sortInfo.compareKeys(keyData,keyData2) < 0) {
00148             highest = l;
00149         }
00150     }
00151 
00152     if (r <= nRuns) {
00153         tupleAccessor.setCurrentTupleBuf(mergeInfo[r].val);
00154         tupleAccessor2.setCurrentTupleBuf(mergeInfo[highest].val);
00155         keyAccessor.unmarshal(keyData);
00156         keyAccessor2.unmarshal(keyData2);
00157         if (sortInfo.compareKeys(keyData,keyData2) < 0) {
00158             highest = r;
00159         }
00160     }
00161 
00162     if (highest != i) {
00163         heapExchange(highest,i);
00164         heapify(highest);
00165     }
00166 }
00167 
00168 void ExternalSortMerger::heapBuild()
00169 {
00170     for (uint i = (nRuns / 2); i > 0; i--) {
00171         heapify(i);
00172     }
00173 }
00174 
00175 ExternalSortRC ExternalSortMerger::checkFetch()
00176 {
00177     if (nRuns == 0) {
00178         return EXTSORT_ENDOFDATA;
00179     }
00180     if (pOrds[getMergeHigh().runOrd]
00181         >= ppFetchArrays[getMergeHigh().runOrd]->nTuples)
00182     {
00183         ExternalSortRC rc = ppRunAccessors[getMergeHigh().runOrd]->fetch(
00184             EXTSORT_FETCH_ARRAY_SIZE);
00185         if (rc != EXTSORT_SUCCESS) {
00186             assert(rc == EXTSORT_ENDOFDATA);
00187             heapExchange(1, nRuns);
00188             if (--nRuns == 0) {
00189                 return EXTSORT_ENDOFDATA;
00190             }
00191         } else {
00192             pOrds[getMergeHigh().runOrd] = 0;
00193         }
00194     }
00195 
00196     return EXTSORT_SUCCESS;
00197 }
00198 
00199 ExternalSortRC ExternalSortMerger::fetch(uint nTuplesRequested)
00200 {
00201     sortInfo.stream.checkAbort();
00202 
00203     if (nTuplesRequested > EXTSORT_FETCH_ARRAY_SIZE) {
00204         nTuplesRequested = EXTSORT_FETCH_ARRAY_SIZE;
00205     }
00206 
00207     ExternalSortRC rc = checkFetch();
00208     if (rc != EXTSORT_SUCCESS) {
00209         // error handling will be done by checkFetch here
00210         return rc;
00211     }
00212 
00213     fetchArray.nTuples = 0;
00214     do {
00215         getMergeHigh().val =
00216             ppFetchArrays[getMergeHigh().runOrd]->ppTupleBuffers[
00217                 pOrds[getMergeHigh().runOrd]];
00218 
00219         heapify(1);
00220 
00221         fetchArray.ppTupleBuffers[fetchArray.nTuples] = getMergeHigh().val;
00222         fetchArray.nTuples++;
00223         nTuplesRequested--;
00224 
00225         pOrds[getMergeHigh().runOrd]++;
00226     } while (nTuplesRequested
00227               && (pOrds[getMergeHigh().runOrd]
00228                   < ppFetchArrays[getMergeHigh().runOrd]->nTuples));
00229 
00230     return EXTSORT_SUCCESS;
00231 }
00232 
00233 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $");
00234 
00235 // End ExternalSortMerger.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1