ExternalSortExecStreamImpl.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2004-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortExecStreamImpl.h"
00026 #include "fennel/segment/Segment.h"
00027 #include "fennel/segment/SegStreamAllocation.h"
00028 #include "fennel/exec/ExecStreamGraphImpl.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $");
00032 
00033 ExternalSortExecStream *ExternalSortExecStream::newExternalSortExecStream()
00034 {
00035     return new ExternalSortExecStreamImpl();
00036 }
00037 
00038 ExternalSortInfo::ExternalSortInfo(ExecStream &streamInit)
00039     : stream(streamInit)
00040 {
00041     nSortMemPages = 0;
00042     nSortMemPagesPerRun = 0;
00043     cbPage = 0;
00044 }
00045 
00046 int ExternalSortInfo::compareKeys(TupleData const &key1, TupleData const &key2)
00047 {
00048     int c = keyDesc.compareTuples(key1, key2);
00049     if (!c) {
00050         return 0;
00051     }
00052     // abs(c) is 1-based column ordinal
00053     int i = (c > 0) ? c : -c;
00054     // shift to 0-based
00055     --i;
00056     if (descendingKeyColumns[i]) {
00057         // flip comparison result for DESC
00058         return -c;
00059     } else {
00060         return c;
00061     }
00062 }
00063 
// Constructs the stream with sortInfo bound back to this stream.  The sort
// configuration itself (key projection, page size, segments) is filled in
// later by prepare().
ExternalSortExecStreamImpl::ExternalSortExecStreamImpl()
    : sortInfo(*this)
{
}
00068 
00069 void ExternalSortExecStreamImpl::prepare(
00070     ExternalSortExecStreamParams const &params)
00071 {
00072     ConduitExecStream::prepare(params);
00073 
00074     pTempSegment = params.pTempSegment;
00075     resultsReady = false;
00076     nParallel = 1;
00077     storeFinalRun = params.storeFinalRun;
00078     estimatedNumRows = params.estimatedNumRows;
00079     earlyClose = params.earlyClose;
00080 
00081     switch (params.distinctness) {
00082     case DUP_ALLOW:
00083         break;
00084     case DUP_DISCARD:
00085         // TODO
00086         permAssert(false);
00087     case DUP_FAIL:
00088         // TODO
00089         permAssert(false);
00090     }
00091 
00092     TupleDescriptor const &srcRecDef = pInAccessor->getTupleDesc();
00093     sortInfo.keyProj = params.keyProj;
00094     assert(params.outputTupleDesc == srcRecDef);
00095     sortInfo.tupleDesc = srcRecDef;
00096     sortInfo.keyDesc.projectFrom(sortInfo.tupleDesc,params.keyProj);
00097     sortInfo.descendingKeyColumns = params.descendingKeyColumns;
00098     if (sortInfo.descendingKeyColumns.empty()) {
00099         // default is all ascending
00100         sortInfo.descendingKeyColumns.resize(sortInfo.keyProj.size(), false);
00101     }
00102     sortInfo.cbPage = params.pTempSegment->getFullPageSize();
00103     sortInfo.memSegmentAccessor = params.scratchAccessor;
00104     sortInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
00105     sortInfo.externalSegmentAccessor.pSegment = params.pTempSegment;
00106     sortInfo.nSortMemPages = 0;
00107 }
00108 
00109 void ExternalSortExecStreamImpl::getResourceRequirements(
00110     ExecStreamResourceQuantity &minQuantity,
00111     ExecStreamResourceQuantity &optQuantity,
00112     ExecStreamResourceSettingType &optType)
00113 {
00114     ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00115 
00116     // REVIEW
00117     uint minPages = 3;
00118     minQuantity.nCachePages += minPages;
00119 
00120     // if no estimated row count is available, request an unbounded amount
00121     // from the resource governor; otherwise, estimate the number of pages
00122     // for an in-memory sort
00123     if (isMAXU(estimatedNumRows)) {
00124         optType = EXEC_RESOURCE_UNBOUNDED;
00125     } else {
00126         // use the average of the min and max rowsizes
00127         // TODO - use stats to come up with a more accurate average
00128         RecordNum nPages =
00129             estimatedNumRows *
00130             ((pOutAccessor->getScratchTupleAccessor().getMaxByteCount() +
00131             pOutAccessor->getScratchTupleAccessor().getMinByteCount()) / 2) /
00132             sortInfo.memSegmentAccessor.pSegment->getUsablePageSize();
00133         uint numPages;
00134         if (nPages >= uint(MAXU)) {
00135             numPages = uint(MAXU) - 1;
00136         } else {
00137             numPages = uint(nPages);
00138         }
00139         // make sure the opt is bigger than the min; otherwise, the
00140         // resource governor won't try to give it extra
00141         optQuantity.nCachePages += std::max(minPages + 1, numPages);
00142         optType = EXEC_RESOURCE_ESTIMATE;
00143     }
00144 }
00145 
00146 void ExternalSortExecStreamImpl::setResourceAllocation(
00147     ExecStreamResourceQuantity &quantity)
00148 {
00149     // REVIEW
00150     ConduitExecStream::setResourceAllocation(quantity);
00151     sortInfo.nSortMemPages = quantity.nCachePages;
00152     nParallel = quantity.nThreads + 1;
00153 
00154     // NOTE jvs 10-Nov-2004:  parallel sort is currently disabled
00155     // as an effect of the scheduler-revamp.  We may resurrect it, or
00156     // we may decide to handle parallelism up at the scheduler level.
00157     assert(nParallel == 1);
00158 }
00159 
00160 void ExternalSortExecStreamImpl::open(bool restart)
00161 {
00162     if (restart) {
00163         releaseResources();
00164     }
00165 
00166     ConduitExecStream::open(restart);
00167 
00168     // divvy up available memory by degree of parallelism
00169     sortInfo.nSortMemPagesPerRun = (sortInfo.nSortMemPages / nParallel);
00170 
00171     // subtract off one page per run for I/O buffering
00172     assert(sortInfo.nSortMemPagesPerRun > 0);
00173     sortInfo.nSortMemPagesPerRun--;
00174 
00175     // need at least two non-I/O pages per run: one for keys and one for data
00176     assert(sortInfo.nSortMemPagesPerRun > 1);
00177 
00178     runLoaders.reset(new SharedExternalSortRunLoader[nParallel]);
00179     for (uint i = 0; i < nParallel; ++i) {
00180         runLoaders[i].reset(new ExternalSortRunLoader(sortInfo));
00181     }
00182 
00183     pOutputWriter.reset(new ExternalSortOutput(sortInfo));
00184 
00185     for (uint i = 0; i < nParallel; ++i) {
00186         runLoaders[i]->startRun();
00187     }
00188 
00189     // default to local sort as output obj
00190     pOutputWriter->setSubStream(*(runLoaders[0]));
00191 
00192     resultsReady = false;
00193 }
00194 
00195 ExecStreamResult ExternalSortExecStreamImpl::execute(
00196     ExecStreamQuantum const &quantum)
00197 {
00198     if (!resultsReady) {
00199         if (pInAccessor->getState() != EXECBUF_EOS) {
00200             ExecStreamResult rc = precheckConduitBuffers();
00201             if (rc != EXECRC_YIELD) {
00202                 return rc;
00203             }
00204             if (nParallel > 1) {
00205                 // FIXME
00206                 computeFirstResultParallel();
00207             } else {
00208                 computeFirstResult();
00209                 return EXECRC_BUF_UNDERFLOW;
00210             }
00211         } else {
00212             ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00213             if (runLoader.isStarted()) {
00214                 sortRun(runLoader);
00215                 if (storedRuns.size() || storeFinalRun) {
00216                     // store last run
00217                     storeRun(runLoader);
00218                 }
00219             }
00220             mergeFirstResult();
00221 
00222             // close the producers now that we've read all input
00223             if (earlyClose) {
00224                 ExecStreamGraphImpl &graphImpl =
00225                     dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00226                 graphImpl.closeProducers(getStreamId());
00227             }
00228 
00229             resultsReady = true;
00230         }
00231     }
00232 
00233     return pOutputWriter->fetch(*pOutAccessor);
00234 }
00235 
00236 void ExternalSortExecStreamImpl::closeImpl()
00237 {
00238     releaseResources();
00239     ConduitExecStream::closeImpl();
00240 }
00241 
00242 void ExternalSortExecStreamImpl::releaseResources()
00243 {
00244     if (pFinalRunAccessor) {
00245         pFinalRunAccessor->releaseResources();
00246     }
00247 
00248     runLoaders.reset();
00249     pMerger.reset();
00250     pOutputWriter.reset();
00251     pFinalRunAccessor.reset();
00252     storedRuns.clear();
00253 }
00254 
00255 void ExternalSortExecStreamImpl::computeFirstResult()
00256 {
00257     ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00258     for (;;) {
00259         if (!runLoader.isStarted()) {
00260             runLoader.startRun();
00261         }
00262         ExternalSortRC rc = runLoader.loadRun(*pInAccessor);
00263         if (rc == EXTSORT_OVERFLOW) {
00264             sortRun(runLoader);
00265             storeRun(runLoader);
00266         } else {
00267             return;
00268         }
00269     }
00270 }
00271 
00272 void ExternalSortExecStreamImpl::storeRun(ExternalSortSubStream &subStream)
00273 {
00274     FENNEL_TRACE(
00275         TRACE_FINE,
00276         "storing run " << storedRuns.size());
00277 
00278     boost::scoped_ptr<ExternalSortRunAccessor> pRunAccessor;
00279     pRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
00280     pRunAccessor->storeRun(subStream);
00281 
00282     StrictMutexGuard mutexGuard(storedRunMutex);
00283     storedRuns.push_back(pRunAccessor->getStoredRun());
00284 }
00285 
// Reduces the stored runs to a single source of sorted output.
//
// If no runs were stored, nothing changes and output keeps coming from the
// run loader wired up in open().  Otherwise the run loaders are released and
// runs are merged from the tail of storedRuns in passes whose fan-in is
// limited by sort memory.  Each pass that is not the final one (or every
// pass, when storeFinalRun is set) is written back as a new stored run and
// its inputs are removed.  Output is then read either from the single
// remaining stored run or directly from the last, unstored merge.
void ExternalSortExecStreamImpl::mergeFirstResult()
{
    if (storedRuns.size()) {
        // from here on, data is read back from stored runs, not the loaders
        for (uint i = 0; i < nParallel; i++) {
            runLoaders[i]->releaseResources();
        }

        if (!pMerger) {
            pMerger.reset(new ExternalSortMerger(sortInfo));
            pMerger->initRunAccess();
        }

        // a single stored run skips the loop (iFirstRun starts at 0)
        uint iFirstRun = storedRuns.size() - 1;
        while (iFirstRun > 0) {
            uint nRunsToMerge;

            // REVIEW jvs 13-June-2004:  I had to change this to account for
            // the output buffer needed during merge.  Not sure why it worked
            // in BB?
            uint nMergePages = sortInfo.nSortMemPages - 1;
            if (storedRuns.size() <= nMergePages) {
                // everything fits in one final pass
                nRunsToMerge = storedRuns.size();
            } else {
                // merge just enough runs that the remaining count shrinks
                // toward nMergePages, so later passes can use full fan-in;
                // never exceed the available fan-in
                nRunsToMerge = std::min<uint>(
                    storedRuns.size() - nMergePages + 1,
                    nMergePages);
            }

            // move the newest run ahead of smaller predecessors so that the
            // tail (merged next) holds the smaller runs
            optimizeRunOrder();
            iFirstRun = storedRuns.size() - nRunsToMerge;

            FENNEL_TRACE(
                TRACE_FINE,
                "merging from run " << iFirstRun
                << " with run count = " << nRunsToMerge);

            pMerger->startMerge(
                storedRuns.begin() + iFirstRun, nRunsToMerge);
            // intermediate passes (and a requested stored final run) are
            // written out as a new run that replaces their inputs; a final
            // unstored pass is left running to serve as the output
            if ((iFirstRun > 0) || storeFinalRun) {
                storeRun(*pMerger);
                deleteStoredRunInfo(iFirstRun,nRunsToMerge);
            }
        }

        if (storedRuns.size() == 1) {
            // read results back from the one remaining stored run
            if (!pFinalRunAccessor) {
                pFinalRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
            }

            FENNEL_TRACE(
                TRACE_FINE,
                "fetching from final run");

            pFinalRunAccessor->initRead();
            pFinalRunAccessor->startRead(storedRuns[0]);
            pMerger->releaseResources();
            pOutputWriter->setSubStream(*pFinalRunAccessor);
        } else {
            // the last merge pass was not stored; stream its output directly
            FENNEL_TRACE(
                TRACE_FINE,
                "fetching from final merge with run count = "
                << storedRuns.size());

            pOutputWriter->setSubStream(*pMerger);
        }
    }
}
00353 
00354 void ExternalSortExecStreamImpl::optimizeRunOrder()
00355 {
00356     uint i = storedRuns.size() - 1;
00357     while ((i > 0)
00358            && (storedRuns[i]->getWrittenPageCount()
00359                > storedRuns[i - 1]->getWrittenPageCount()))
00360     {
00361         std::swap(storedRuns[i],storedRuns[i - 1]);
00362         i--;
00363     }
00364 }
00365 
00366 void ExternalSortExecStreamImpl::deleteStoredRunInfo(uint iFirstRun,uint nRuns)
00367 {
00368     StrictMutexGuard mutexGuard(storedRunMutex);
00369     storedRuns.erase(
00370         storedRuns.begin() + iFirstRun,
00371         storedRuns.begin() + iFirstRun + nRuns);
00372 }
00373 
// Parallel variant of the load phase: reserves run loaders, fills them, and
// hands full loaders to pooled ExternalSortTasks to sort and store, then
// merges once all tasks have finished.
//
// NOTE(review): currently inert.  setResourceAllocation() asserts
// nParallel == 1, so this is not reached in debug builds.  In addition,
// input loading and task submission below are compiled out (#if 0), so the
// loop ends on its first iteration.
void ExternalSortExecStreamImpl::computeFirstResultParallel()
{
    // FIXME jvs 19-June-2004:  ThreadPool needs to propagate excns!

    assert(nParallel > 1);

    // minus one because the main dispatcher thread runs in parallel with the
    // pooled threads
    threadPool.start(nParallel - 1);
    try {
        for (;;) {
            ExternalSortRunLoader &runLoader = reserveRunLoader();
            runLoader.startRun();
            // FIXME:  loading is stubbed out; rc is always EXTSORT_ENDOFDATA
#if 0
            ExternalSortRC rc = runLoader.loadRun(*pInputStream);
#else
            ExternalSortRC rc = EXTSORT_ENDOFDATA;
#endif
            if (rc == EXTSORT_ENDOFDATA) {
                // the entire input has been processed, so we're ready
                // for merge
                unreserveRunLoader(runLoader);
                break;
            }
            // otherwise, schedule a new sort task
            // FIXME:  submission is compiled out
#if 0
            ExternalSortTask task(*this,runLoader);
            threadPool.submitTask(task);
#endif
        }
    } catch (...) {
        // REVIEW jvs 19-June-2004:  signal a request to expedite cleanup?

        // wait for all tasks to clean up
        threadPool.stop();
        throw;
    }

    // wait for all tasks to complete before beginning merge
    threadPool.stop();

    mergeFirstResult();
    resultsReady = true;
}
00420 
// Body of a pooled sort task: sorts the run held by runLoader, stores it,
// then returns the loader to the pool so the dispatcher can reuse it.
//
// NOTE(review): if sortRun() or storeRun() throws, the loader is never
// unreserved and reserveRunLoader() can wait forever.  Unreserving on the
// error path would instead let a partially stored run be silently dropped,
// so this is left as-is until ThreadPool propagates exceptions (see FIXME
// in computeFirstResultParallel()).
void ExternalSortTask::execute()
{
    sortStream.sortRun(runLoader);
    sortStream.storeRun(runLoader);
    sortStream.unreserveRunLoader(runLoader);
}
00427 
00428 void ExternalSortExecStreamImpl::sortRun(ExternalSortRunLoader &runLoader)
00429 {
00430     FENNEL_TRACE(
00431         TRACE_FINE,
00432         "sorting run with tuple count = "
00433         << runLoader.getLoadedTupleCount());
00434     runLoader.sort();
00435 }
00436 
00437 ExternalSortRunLoader &ExternalSortExecStreamImpl::reserveRunLoader()
00438 {
00439     StrictMutexGuard mutexGuard(runLoaderMutex);
00440     for (;;) {
00441         for (uint i = 0; i < nParallel; ++i) {
00442             ExternalSortRunLoader &runLoader = *(runLoaders[i]);
00443             if (!runLoader.runningParallelTask) {
00444                 runLoader.runningParallelTask = true;
00445                 return runLoader;
00446             }
00447         }
00448         runLoaderAvailable.wait(mutexGuard);
00449     }
00450 }
00451 
00452 void ExternalSortExecStreamImpl::unreserveRunLoader(
00453     ExternalSortRunLoader &runLoader)
00454 {
00455     StrictMutexGuard mutexGuard(runLoaderMutex);
00456     runLoader.runningParallelTask = false;
00457     runLoaderAvailable.notify_all();
00458 }
00459 
00460 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $");
00461 
00462 // End ExternalSortExecStreamImpl.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1