ExternalSortRunLoader.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2004-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortRunLoader.h"
00026 #include "fennel/sorter/ExternalSortInfo.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $");
00030 
00031 ExternalSortRunLoader::ExternalSortRunLoader(ExternalSortInfo &sortInfoIn)
00032     : sortInfo(sortInfoIn)
00033 {
00034     nMemPagesMax = 0;
00035     pDataBuffer = NULL;
00036     pIndexBuffer = NULL;
00037 
00038     nTuplesLoaded = nTuplesFetched = 0;
00039 
00040     runningParallelTask = false;
00041 
00042     bufferLock.accessSegment(sortInfo.memSegmentAccessor);
00043 
00044     tupleAccessor.compute(sortInfo.tupleDesc);
00045     tupleAccessor2.compute(sortInfo.tupleDesc);
00046 
00047     keyData.compute(sortInfo.keyDesc);
00048     keyData2 = keyData;
00049 
00050     keyAccessor.bind(tupleAccessor,sortInfo.keyProj);
00051     keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj);
00052 
00053     // TODO:  utility methods for calculations below, and assert block
00054     // size is power of 2
00055     uint nKeysPerPage = sortInfo.cbPage / sizeof(PBuffer);
00056     assert(nKeysPerPage > 1);
00057     nKeysPerPage >>= 1;
00058     indexToPageShift = 1;
00059     indexPageMask = 1;
00060     while ((nKeysPerPage & indexPageMask) == 0) {
00061         ++indexToPageShift;
00062         indexPageMask <<= 1;
00063         ++indexPageMask;
00064     }
00065 }
00066 
00067 ExternalSortRunLoader::~ExternalSortRunLoader()
00068 {
00069     releaseResources();
00070 }
00071 
00072 inline PBuffer &ExternalSortRunLoader::getPointerArrayEntry(uint iTuple)
00073 {
00074     // REVIEW jvs 12-June-2004:  This is the price we pay for not using a big
00075     // linear array.  Is it too expensive?
00076     uint iPage = iTuple >> indexToPageShift;
00077     uint iSubKey = iTuple & indexPageMask;
00078     PBuffer *pPage = reinterpret_cast<PBuffer *>(indexBuffers[iPage]);
00079     return pPage[iSubKey];
00080 }
00081 
00082 void ExternalSortRunLoader::startRun()
00083 {
00084     if (!nMemPagesMax) {
00085         nMemPagesMax = sortInfo.nSortMemPagesPerRun;
00086     }
00087 
00088     freeBuffers.insert(
00089         freeBuffers.end(),indexBuffers.begin(),indexBuffers.end());
00090     freeBuffers.insert(
00091         freeBuffers.end(),dataBuffers.begin(),dataBuffers.end());
00092     indexBuffers.clear();
00093     dataBuffers.clear();
00094     pDataBuffer = NULL;
00095     pIndexBuffer = NULL;
00096 
00097     if (!allocateDataBuffer()) {
00098         permAssert(false);
00099     }
00100 
00101     if (!allocateIndexBuffer()) {
00102         permAssert(false);
00103     }
00104 
00105     nTuplesLoaded = nTuplesFetched = 0;
00106 
00107     fetchArray.nTuples = 0;
00108 }
00109 
00110 PBuffer ExternalSortRunLoader::allocateBuffer()
00111 {
00112     PBuffer pBuffer;
00113 
00114     if (!freeBuffers.empty()) {
00115         pBuffer = freeBuffers.back();
00116         freeBuffers.pop_back();
00117         return pBuffer;
00118     }
00119 
00120     if ((indexBuffers.size() + dataBuffers.size()) >= nMemPagesMax) {
00121         return NULL;
00122     }
00123 
00124     bufferLock.allocatePage();
00125     pBuffer = bufferLock.getPage().getWritableData();
00126 
00127     // REVIEW jvs 12-June-2004:  we rely on the fact that the underlying
00128     // ScratchSegment keeps the page pinned for us; need to make this
00129     // official.
00130     bufferLock.unlock();
00131 
00132     return pBuffer;
00133 }
00134 
00135 bool ExternalSortRunLoader::allocateDataBuffer()
00136 {
00137     pDataBuffer = allocateBuffer();
00138     if (pDataBuffer) {
00139         dataBuffers.push_back(pDataBuffer);
00140         pDataBufferEnd = pDataBuffer + sortInfo.cbPage;
00141         return true;
00142     } else {
00143         return false;
00144     }
00145 }
00146 
00147 bool ExternalSortRunLoader::allocateIndexBuffer()
00148 {
00149     pIndexBuffer = allocateBuffer();
00150     if (pIndexBuffer) {
00151         indexBuffers.push_back(pIndexBuffer);
00152         pIndexBufferEnd = pIndexBuffer + sortInfo.cbPage;
00153         return true;
00154     } else {
00155         return false;
00156     }
00157 }
00158 
00159 void ExternalSortRunLoader::releaseResources()
00160 {
00161     freeBuffers.clear();
00162     indexBuffers.clear();
00163     dataBuffers.clear();
00164     nMemPagesMax = 0;
00165 
00166     // REVIEW jvs 12-June-2004:  see corresponding comment above in
00167     // allocateBuffer()
00168 
00169     sortInfo.memSegmentAccessor.pSegment->deallocatePageRange(
00170         NULL_PAGE_ID,NULL_PAGE_ID);
00171 }
00172 
00173 ExternalSortRC ExternalSortRunLoader::loadRun(
00174     ExecStreamBufAccessor &bufAccessor)
00175 {
00176     for (;;) {
00177         if (!bufAccessor.demandData()) {
00178             break;
00179         }
00180         uint cbAvailable = bufAccessor.getConsumptionAvailable();
00181         assert(cbAvailable);
00182         PConstBuffer pSrc = bufAccessor.getConsumptionStart();
00183         bool overflow = false;
00184         uint cbCopy = 0;
00185         while (cbCopy < cbAvailable) {
00186             PConstBuffer pSrcTuple = pSrc + cbCopy;
00187             uint cbTuple = tupleAccessor.getBufferByteCount(pSrcTuple);
00188             assert(cbTuple);
00189             assert(cbTuple <= tupleAccessor.getMaxByteCount());
00190 
00191             // first make sure we have room for the key pointer
00192             if (pIndexBuffer >= pIndexBufferEnd) {
00193                 if (!allocateIndexBuffer()) {
00194                     overflow = true;
00195                     break;
00196                 }
00197             }
00198 
00199             // now make sure we have enough room for the data
00200             if (pDataBuffer + cbCopy + cbTuple > pDataBufferEnd) {
00201                 if (!cbCopy) {
00202                     // first tuple:  try to allocate a new buffer
00203                     if (!allocateDataBuffer()) {
00204                         // since cbCopy is zero, we can return right now
00205                         return EXTSORT_OVERFLOW;
00206                     }
00207                 }
00208                 // copy whatever we've calculated so far;
00209                 // next time through the for loop, we'll allocate a fresh buffer
00210                 break;
00211             }
00212 
00213             *((PBuffer *) pIndexBuffer) = pDataBuffer + cbCopy;
00214             pIndexBuffer += sizeof(PBuffer);
00215             nTuplesLoaded++;
00216             cbCopy += cbTuple;
00217         }
00218         if (cbCopy) {
00219             memcpy(pDataBuffer,pSrc,cbCopy);
00220             pDataBuffer += cbCopy;
00221             bufAccessor.consumeData(pSrc + cbCopy);
00222         }
00223         if (overflow) {
00224             return EXTSORT_OVERFLOW;
00225         }
00226     }
00227 
00228     assert(nTuplesLoaded);
00229     return EXTSORT_SUCCESS;
00230 }
00231 
00232 void ExternalSortRunLoader::sort()
00233 {
00234     assert (nTuplesLoaded);
00235 
00236     quickSort(0,nTuplesLoaded - 1);
00237 }
00238 
00239 ExternalSortFetchArray &ExternalSortRunLoader::bindFetchArray()
00240 {
00241     return fetchArray;
00242 }
00243 
00244 ExternalSortRC ExternalSortRunLoader::fetch(uint nTuplesRequested)
00245 {
00246     if (nTuplesFetched >= nTuplesLoaded) {
00247         return EXTSORT_ENDOFDATA;
00248     }
00249 
00250     fetchArray.ppTupleBuffers = (PBuffer *)
00251         &(getPointerArrayEntry(nTuplesFetched));
00252     uint pageEnd = (nTuplesFetched | indexPageMask) + 1;
00253     pageEnd = std::min(pageEnd,nTuplesLoaded);
00254     fetchArray.nTuples = std::min(nTuplesRequested,pageEnd - nTuplesFetched);
00255     nTuplesFetched += fetchArray.nTuples;
00256 
00257     return EXTSORT_SUCCESS;
00258 }
00259 
00260 inline void ExternalSortRunLoader::quickSortSwap(uint l,uint r)
00261 {
00262     std::swap(getPointerArrayEntry(l),getPointerArrayEntry(r));
00263 }
00264 
00265 // TODO:  move this
00266 const uint step_factor = 7;
00267 
00268 PBuffer ExternalSortRunLoader::quickSortFindPivot(uint l, uint r)
00269 {
00270     uint i, j, cnt, step;
00271     PBuffer vals[step_factor];
00272 
00273     if (r <= l) {
00274         return NULL;
00275     }
00276 
00277     cnt = 0;
00278     step = ((r - l) / step_factor) + 1;
00279     for (i = l; i <= r; i += step) {
00280         vals[cnt] = getPointerArrayEntry(i);
00281         j = cnt++;
00282         while (j > 0) {
00283             tupleAccessor.setCurrentTupleBuf(vals[j]);
00284             tupleAccessor2.setCurrentTupleBuf(vals[j - 1]);
00285             keyAccessor.unmarshal(keyData);
00286             keyAccessor2.unmarshal(keyData2);
00287             if (sortInfo.compareKeys(keyData,keyData2) >= 0) {
00288                 break;
00289             }
00290             std::swap(vals[j],vals[j - 1]);
00291             j--;
00292         }
00293     }
00294     if (step == 1) {
00295         for (i = 0; i < cnt; ++i) {
00296             getPointerArrayEntry(l + i) = vals[i];
00297         }
00298         return NULL;
00299     }
00300     return vals[(cnt >> 1)];
00301 }
00302 
00303 uint ExternalSortRunLoader::quickSortPartition(uint l,uint r,PBuffer pivot)
00304 {
00305     l--;
00306     r++;
00307 
00308     tupleAccessor.setCurrentTupleBuf(pivot);
00309     keyAccessor.unmarshal(keyData);
00310     for (;;) {
00311         for (;;) {
00312             ++l;
00313             tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(l));
00314             keyAccessor2.unmarshal(keyData2);
00315             if (sortInfo.compareKeys(keyData2,keyData) >= 0) {
00316                 break;
00317             }
00318         }
00319         for (;;) {
00320             --r;
00321             tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(r));
00322             keyAccessor2.unmarshal(keyData2);
00323             if (sortInfo.compareKeys(keyData2,keyData) <= 0) {
00324                 break;
00325             }
00326         }
00327         if (l < r) {
00328             quickSortSwap(l, r);
00329         } else {
00330             return l;
00331         }
00332     }
00333 }
00334 
00335 void ExternalSortRunLoader::quickSort(uint l,uint r)
00336 {
00337     PBuffer pPivotTuple;
00338     uint x;
00339 
00340     pPivotTuple = quickSortFindPivot(l,r);
00341     if (pPivotTuple) {
00342         x = quickSortPartition(l,r,pPivotTuple);
00343         if (x == l) {
00344             // pPivotTuple was lowest value in partition and was at position l
00345             // - move off of it and keep going
00346             x++;
00347         }
00348         quickSort(l,x - 1);
00349         quickSort(x,r);
00350     }
00351 }
00352 
00353 uint ExternalSortRunLoader::getLoadedTupleCount()
00354 {
00355     return nTuplesLoaded;
00356 }
00357 
00358 bool ExternalSortRunLoader::isStarted()
00359 {
00360     return nTuplesLoaded > nTuplesFetched;
00361 }
00362 
00363 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $");
00364 
00365 // End ExternalSortRunLoader.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1