00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortExecStreamImpl.h"
00026 #include "fennel/segment/Segment.h"
00027 #include "fennel/segment/SegStreamAllocation.h"
00028 #include "fennel/exec/ExecStreamGraphImpl.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $");
00032
00033 ExternalSortExecStream *ExternalSortExecStream::newExternalSortExecStream()
00034 {
00035 return new ExternalSortExecStreamImpl();
00036 }
00037
00038 ExternalSortInfo::ExternalSortInfo(ExecStream &streamInit)
00039 : stream(streamInit)
00040 {
00041 nSortMemPages = 0;
00042 nSortMemPagesPerRun = 0;
00043 cbPage = 0;
00044 }
00045
00046 int ExternalSortInfo::compareKeys(TupleData const &key1, TupleData const &key2)
00047 {
00048 int c = keyDesc.compareTuples(key1, key2);
00049 if (!c) {
00050 return 0;
00051 }
00052
00053 int i = (c > 0) ? c : -c;
00054
00055 --i;
00056 if (descendingKeyColumns[i]) {
00057
00058 return -c;
00059 } else {
00060 return c;
00061 }
00062 }
00063
00064 ExternalSortExecStreamImpl::ExternalSortExecStreamImpl()
00065 : sortInfo(*this)
00066 {
00067 }
00068
00069 void ExternalSortExecStreamImpl::prepare(
00070 ExternalSortExecStreamParams const ¶ms)
00071 {
00072 ConduitExecStream::prepare(params);
00073
00074 pTempSegment = params.pTempSegment;
00075 resultsReady = false;
00076 nParallel = 1;
00077 storeFinalRun = params.storeFinalRun;
00078 estimatedNumRows = params.estimatedNumRows;
00079 earlyClose = params.earlyClose;
00080
00081 switch (params.distinctness) {
00082 case DUP_ALLOW:
00083 break;
00084 case DUP_DISCARD:
00085
00086 permAssert(false);
00087 case DUP_FAIL:
00088
00089 permAssert(false);
00090 }
00091
00092 TupleDescriptor const &srcRecDef = pInAccessor->getTupleDesc();
00093 sortInfo.keyProj = params.keyProj;
00094 assert(params.outputTupleDesc == srcRecDef);
00095 sortInfo.tupleDesc = srcRecDef;
00096 sortInfo.keyDesc.projectFrom(sortInfo.tupleDesc,params.keyProj);
00097 sortInfo.descendingKeyColumns = params.descendingKeyColumns;
00098 if (sortInfo.descendingKeyColumns.empty()) {
00099
00100 sortInfo.descendingKeyColumns.resize(sortInfo.keyProj.size(), false);
00101 }
00102 sortInfo.cbPage = params.pTempSegment->getFullPageSize();
00103 sortInfo.memSegmentAccessor = params.scratchAccessor;
00104 sortInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
00105 sortInfo.externalSegmentAccessor.pSegment = params.pTempSegment;
00106 sortInfo.nSortMemPages = 0;
00107 }
00108
00109 void ExternalSortExecStreamImpl::getResourceRequirements(
00110 ExecStreamResourceQuantity &minQuantity,
00111 ExecStreamResourceQuantity &optQuantity,
00112 ExecStreamResourceSettingType &optType)
00113 {
00114 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00115
00116
00117 uint minPages = 3;
00118 minQuantity.nCachePages += minPages;
00119
00120
00121
00122
00123 if (isMAXU(estimatedNumRows)) {
00124 optType = EXEC_RESOURCE_UNBOUNDED;
00125 } else {
00126
00127
00128 RecordNum nPages =
00129 estimatedNumRows *
00130 ((pOutAccessor->getScratchTupleAccessor().getMaxByteCount() +
00131 pOutAccessor->getScratchTupleAccessor().getMinByteCount()) / 2) /
00132 sortInfo.memSegmentAccessor.pSegment->getUsablePageSize();
00133 uint numPages;
00134 if (nPages >= uint(MAXU)) {
00135 numPages = uint(MAXU) - 1;
00136 } else {
00137 numPages = uint(nPages);
00138 }
00139
00140
00141 optQuantity.nCachePages += std::max(minPages + 1, numPages);
00142 optType = EXEC_RESOURCE_ESTIMATE;
00143 }
00144 }
00145
00146 void ExternalSortExecStreamImpl::setResourceAllocation(
00147 ExecStreamResourceQuantity &quantity)
00148 {
00149
00150 ConduitExecStream::setResourceAllocation(quantity);
00151 sortInfo.nSortMemPages = quantity.nCachePages;
00152 nParallel = quantity.nThreads + 1;
00153
00154
00155
00156
00157 assert(nParallel == 1);
00158 }
00159
00160 void ExternalSortExecStreamImpl::open(bool restart)
00161 {
00162 if (restart) {
00163 releaseResources();
00164 }
00165
00166 ConduitExecStream::open(restart);
00167
00168
00169 sortInfo.nSortMemPagesPerRun = (sortInfo.nSortMemPages / nParallel);
00170
00171
00172 assert(sortInfo.nSortMemPagesPerRun > 0);
00173 sortInfo.nSortMemPagesPerRun--;
00174
00175
00176 assert(sortInfo.nSortMemPagesPerRun > 1);
00177
00178 runLoaders.reset(new SharedExternalSortRunLoader[nParallel]);
00179 for (uint i = 0; i < nParallel; ++i) {
00180 runLoaders[i].reset(new ExternalSortRunLoader(sortInfo));
00181 }
00182
00183 pOutputWriter.reset(new ExternalSortOutput(sortInfo));
00184
00185 for (uint i = 0; i < nParallel; ++i) {
00186 runLoaders[i]->startRun();
00187 }
00188
00189
00190 pOutputWriter->setSubStream(*(runLoaders[0]));
00191
00192 resultsReady = false;
00193 }
00194
00195 ExecStreamResult ExternalSortExecStreamImpl::execute(
00196 ExecStreamQuantum const &quantum)
00197 {
00198 if (!resultsReady) {
00199 if (pInAccessor->getState() != EXECBUF_EOS) {
00200 ExecStreamResult rc = precheckConduitBuffers();
00201 if (rc != EXECRC_YIELD) {
00202 return rc;
00203 }
00204 if (nParallel > 1) {
00205
00206 computeFirstResultParallel();
00207 } else {
00208 computeFirstResult();
00209 return EXECRC_BUF_UNDERFLOW;
00210 }
00211 } else {
00212 ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00213 if (runLoader.isStarted()) {
00214 sortRun(runLoader);
00215 if (storedRuns.size() || storeFinalRun) {
00216
00217 storeRun(runLoader);
00218 }
00219 }
00220 mergeFirstResult();
00221
00222
00223 if (earlyClose) {
00224 ExecStreamGraphImpl &graphImpl =
00225 dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00226 graphImpl.closeProducers(getStreamId());
00227 }
00228
00229 resultsReady = true;
00230 }
00231 }
00232
00233 return pOutputWriter->fetch(*pOutAccessor);
00234 }
00235
00236 void ExternalSortExecStreamImpl::closeImpl()
00237 {
00238 releaseResources();
00239 ConduitExecStream::closeImpl();
00240 }
00241
00242 void ExternalSortExecStreamImpl::releaseResources()
00243 {
00244 if (pFinalRunAccessor) {
00245 pFinalRunAccessor->releaseResources();
00246 }
00247
00248 runLoaders.reset();
00249 pMerger.reset();
00250 pOutputWriter.reset();
00251 pFinalRunAccessor.reset();
00252 storedRuns.clear();
00253 }
00254
00255 void ExternalSortExecStreamImpl::computeFirstResult()
00256 {
00257 ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00258 for (;;) {
00259 if (!runLoader.isStarted()) {
00260 runLoader.startRun();
00261 }
00262 ExternalSortRC rc = runLoader.loadRun(*pInAccessor);
00263 if (rc == EXTSORT_OVERFLOW) {
00264 sortRun(runLoader);
00265 storeRun(runLoader);
00266 } else {
00267 return;
00268 }
00269 }
00270 }
00271
00272 void ExternalSortExecStreamImpl::storeRun(ExternalSortSubStream &subStream)
00273 {
00274 FENNEL_TRACE(
00275 TRACE_FINE,
00276 "storing run " << storedRuns.size());
00277
00278 boost::scoped_ptr<ExternalSortRunAccessor> pRunAccessor;
00279 pRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
00280 pRunAccessor->storeRun(subStream);
00281
00282 StrictMutexGuard mutexGuard(storedRunMutex);
00283 storedRuns.push_back(pRunAccessor->getStoredRun());
00284 }
00285
00286 void ExternalSortExecStreamImpl::mergeFirstResult()
00287 {
00288 if (storedRuns.size()) {
00289 for (uint i = 0; i < nParallel; i++) {
00290 runLoaders[i]->releaseResources();
00291 }
00292
00293 if (!pMerger) {
00294 pMerger.reset(new ExternalSortMerger(sortInfo));
00295 pMerger->initRunAccess();
00296 }
00297
00298 uint iFirstRun = storedRuns.size() - 1;
00299 while (iFirstRun > 0) {
00300 uint nRunsToMerge;
00301
00302
00303
00304
00305 uint nMergePages = sortInfo.nSortMemPages - 1;
00306 if (storedRuns.size() <= nMergePages) {
00307 nRunsToMerge = storedRuns.size();
00308 } else {
00309 nRunsToMerge = std::min<uint>(
00310 storedRuns.size() - nMergePages + 1,
00311 nMergePages);
00312 }
00313
00314 optimizeRunOrder();
00315 iFirstRun = storedRuns.size() - nRunsToMerge;
00316
00317 FENNEL_TRACE(
00318 TRACE_FINE,
00319 "merging from run " << iFirstRun
00320 << " with run count = " << nRunsToMerge);
00321
00322 pMerger->startMerge(
00323 storedRuns.begin() + iFirstRun, nRunsToMerge);
00324 if ((iFirstRun > 0) || storeFinalRun) {
00325 storeRun(*pMerger);
00326 deleteStoredRunInfo(iFirstRun,nRunsToMerge);
00327 }
00328 }
00329
00330 if (storedRuns.size() == 1) {
00331 if (!pFinalRunAccessor) {
00332 pFinalRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
00333 }
00334
00335 FENNEL_TRACE(
00336 TRACE_FINE,
00337 "fetching from final run");
00338
00339 pFinalRunAccessor->initRead();
00340 pFinalRunAccessor->startRead(storedRuns[0]);
00341 pMerger->releaseResources();
00342 pOutputWriter->setSubStream(*pFinalRunAccessor);
00343 } else {
00344 FENNEL_TRACE(
00345 TRACE_FINE,
00346 "fetching from final merge with run count = "
00347 << storedRuns.size());
00348
00349 pOutputWriter->setSubStream(*pMerger);
00350 }
00351 }
00352 }
00353
00354 void ExternalSortExecStreamImpl::optimizeRunOrder()
00355 {
00356 uint i = storedRuns.size() - 1;
00357 while ((i > 0)
00358 && (storedRuns[i]->getWrittenPageCount()
00359 > storedRuns[i - 1]->getWrittenPageCount()))
00360 {
00361 std::swap(storedRuns[i],storedRuns[i - 1]);
00362 i--;
00363 }
00364 }
00365
00366 void ExternalSortExecStreamImpl::deleteStoredRunInfo(uint iFirstRun,uint nRuns)
00367 {
00368 StrictMutexGuard mutexGuard(storedRunMutex);
00369 storedRuns.erase(
00370 storedRuns.begin() + iFirstRun,
00371 storedRuns.begin() + iFirstRun + nRuns);
00372 }
00373
// Parallel load phase (incomplete stub).
//
// The intended design: reserve a free run loader, load a run into it, and
// hand the sort/store work to the thread pool as an ExternalSortTask.
// Repeat until the input is exhausted, then merge.
//
// As written, both the input load and the task submission are disabled with
// #if 0.  The loop therefore reserves one loader, starts a run, treats it as
// end-of-data, releases the loader and exits without consuming any input.
// NOTE(review): setResourceAllocation() currently asserts nParallel == 1, so
// this path should not be reachable.
void ExternalSortExecStreamImpl::computeFirstResultParallel()
{
    // Only meaningful with more than one run loader.
    assert(nParallel > 1);

    // One pool thread per loader beyond the first (presumably the calling
    // thread acts as the remaining worker) -- TODO confirm.
    threadPool.start(nParallel - 1);
    try {
        for (;;) {
            // Blocks until some loader is not in use by a parallel task.
            ExternalSortRunLoader &runLoader = reserveRunLoader();
            runLoader.startRun();

            // Disabled: pInputStream is not a member of this class; the
            // stub always behaves as if the input were exhausted.
#if 0
            ExternalSortRC rc = runLoader.loadRun(*pInputStream);
#else
            ExternalSortRC rc = EXTSORT_ENDOFDATA;
#endif
            if (rc == EXTSORT_ENDOFDATA) {
                // Nothing was handed off, so give the loader back before
                // leaving the loop.
                unreserveRunLoader(runLoader);
                break;
            }

            // Disabled: would sort/store the loaded run on a pool thread
            // (ExternalSortTask::execute unreserves the loader when done).
#if 0
            ExternalSortTask task(*this,runLoader);
            threadPool.submitTask(task);
#endif
        }
    } catch (...) {
        // Stop the pool before propagating so no worker outlives this call.
        threadPool.stop();
        throw;
    }

    // Normal completion: stop the pool before merging the stored runs.
    threadPool.stop();

    mergeFirstResult();
    resultsReady = true;
}
00420
00421 void ExternalSortTask::execute()
00422 {
00423 sortStream.sortRun(runLoader);
00424 sortStream.storeRun(runLoader);
00425 sortStream.unreserveRunLoader(runLoader);
00426 }
00427
00428 void ExternalSortExecStreamImpl::sortRun(ExternalSortRunLoader &runLoader)
00429 {
00430 FENNEL_TRACE(
00431 TRACE_FINE,
00432 "sorting run with tuple count = "
00433 << runLoader.getLoadedTupleCount());
00434 runLoader.sort();
00435 }
00436
00437 ExternalSortRunLoader &ExternalSortExecStreamImpl::reserveRunLoader()
00438 {
00439 StrictMutexGuard mutexGuard(runLoaderMutex);
00440 for (;;) {
00441 for (uint i = 0; i < nParallel; ++i) {
00442 ExternalSortRunLoader &runLoader = *(runLoaders[i]);
00443 if (!runLoader.runningParallelTask) {
00444 runLoader.runningParallelTask = true;
00445 return runLoader;
00446 }
00447 }
00448 runLoaderAvailable.wait(mutexGuard);
00449 }
00450 }
00451
00452 void ExternalSortExecStreamImpl::unreserveRunLoader(
00453 ExternalSortRunLoader &runLoader)
00454 {
00455 StrictMutexGuard mutexGuard(runLoaderMutex);
00456 runLoader.runningParallelTask = false;
00457 runLoaderAvailable.notify_all();
00458 }
00459
00460 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $");
00461
00462