00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/btree/BTreeNonLeafReader.h"
00026 #include "fennel/btree/BTreeLeafReader.h"
00027 #include "fennel/btree/BTreeNodeAccessor.h"
00028 #include "fennel/segment/SegPageEntryIterImpl.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include "fennel/ftrs/BTreePrefetchSearchExecStream.h"
00031
00032 #include "math.h"
00033
00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreePrefetchSearchExecStream.cpp#7 $");
00035
00036 BTreePrefetchSearchExecStream::BTreePrefetchSearchExecStream() : leafPageQueue()
00037 {
00038 }
00039
00040 void BTreePrefetchSearchExecStream::prepare(
00041 BTreePrefetchSearchExecStreamParams const ¶ms)
00042 {
00043 BTreeSearchExecStream::prepare(params);
00044
00045 savedLowerBoundAccessor.compute(inputKeyDesc);
00046 pfLowerBoundData.compute(inputKeyDesc);
00047 if (upperBoundDesc.size() == 0) {
00048 savedUpperBoundAccessor.compute(inputKeyDesc);
00049 pfUpperBoundData.compute(inputKeyDesc);
00050 } else {
00051 savedUpperBoundAccessor.compute(upperBoundDesc);
00052 pfUpperBoundData.compute(upperBoundDesc);
00053 }
00054
00055 scratchLock.accessSegment(scratchAccessor);
00056 scratchPageSize = scratchAccessor.pSegment->getUsablePageSize();
00057 }
00058
00059 void BTreePrefetchSearchExecStream::getResourceRequirements(
00060 ExecStreamResourceQuantity &minQuantity,
00061 ExecStreamResourceQuantity &optQuantity,
00062 ExecStreamResourceSettingType &optType)
00063 {
00064 BTreeSearchExecStream::getResourceRequirements(minQuantity, optQuantity);
00065
00066
00067 minQuantity.nCachePages += 1;
00068 optQuantity.nCachePages += 1;
00069 nMiscCachePages = minQuantity.nCachePages;
00070
00071
00072
00073
00074
00075 minQuantity.nCachePages += 1;
00076 keyValuesSize = savedLowerBoundAccessor.getMaxByteCount() * 2;
00077
00078
00079
00080
00081
00082 if (keyValuesSize > scratchPageSize) {
00083 bigMaxKey = true;
00084 keyValuesSize = scratchPageSize;
00085 minQuantity.nCachePages += 1;
00086 } else {
00087 bigMaxKey = false;
00088 }
00089
00090 nEntriesPerScratchPage = scratchPageSize / keyValuesSize;
00091 uint optNumPages = (uint) ceil(8192.0 / nEntriesPerScratchPage);
00092 assert(optNumPages >= 1);
00093 optQuantity.nCachePages += optNumPages;
00094 optType = EXEC_RESOURCE_ACCURATE;
00095 }
00096
00097 void BTreePrefetchSearchExecStream::setResourceAllocation(
00098 ExecStreamResourceQuantity &quantity)
00099 {
00100 BTreeSearchExecStream::setResourceAllocation(quantity);
00101 nPrefetchScratchPages = quantity.nCachePages - nMiscCachePages;
00102 }
00103
00104 void BTreePrefetchSearchExecStream::open(bool restart)
00105 {
00106 BTreeSearchExecStream::open(restart);
00107 pNonLeafReader =
00108 SharedBTreeNonLeafReader(new BTreeNonLeafReader(treeDescriptor));
00109 initialPrefetchDone = false;
00110 processingDone = false;
00111 prevLeafSearchRc = true;
00112 returnedRoot = false;
00113 rootOnly = (pNonLeafReader->isRootOnly());
00114
00115 if (!restart) {
00116 uint nPrefetchEntries =
00117 (bigMaxKey) ?
00118 nPrefetchScratchPages / 2 :
00119 nPrefetchScratchPages * nEntriesPerScratchPage;
00120 leafPageQueue.resize(nPrefetchEntries);
00121 allocateScratchPages();
00122 leafPageQueue.setPrefetchSource(*this);
00123 }
00124 }
00125
00126 SharedBTreeReader BTreePrefetchSearchExecStream::newReader()
00127 {
00128 pLeafReader = SharedBTreeLeafReader(new BTreeLeafReader(treeDescriptor));
00129 pBTreeAccessBase = pBTreeReader = pLeafReader;
00130 return pBTreeReader;
00131 }
00132
00133 void BTreePrefetchSearchExecStream::allocateScratchPages()
00134 {
00135 for (uint i = 0; i < nPrefetchScratchPages; i++) {
00136 scratchLock.allocatePage();
00137 PBuffer page = scratchLock.getPage().getWritableData();
00138 scratchPages.push_back(page);
00139 }
00140 currPage = 0;
00141 currPageEntry = 0;
00142 }
00143
00144 void BTreePrefetchSearchExecStream::initPrefetchEntry(
00145 BTreePrefetchSearchKey &searchKey)
00146 {
00147 if (bigMaxKey) {
00148 searchKey.lowerKeyBuffer = scratchPages[currPage++];
00149 searchKey.upperKeyBuffer = scratchPages[currPage++];
00150 } else {
00151 searchKey.lowerKeyBuffer =
00152 scratchPages[currPage] + currPageEntry * keyValuesSize;
00153 searchKey.upperKeyBuffer = searchKey.lowerKeyBuffer + keyValuesSize / 2;
00154 if (++currPageEntry == nEntriesPerScratchPage) {
00155 currPage++;
00156 currPageEntry = 0;
00157 }
00158 }
00159 }
00160
00161 ExecStreamResult BTreePrefetchSearchExecStream::execute(
00162 ExecStreamQuantum const &quantum)
00163 {
00164 uint nTuples = 0;
00165
00166
00167
00168 for (;;) {
00169
00170 if (!innerSearchLoop()) {
00171 return EXECRC_BUF_UNDERFLOW;
00172 }
00173
00174
00175
00176 if (pInAccessor->getState() == EXECBUF_EOS && processingDone) {
00177 pOutAccessor->markEOS();
00178 return EXECRC_EOS;
00179 }
00180
00181
00182
00183 ExecStreamResult rc = innerFetchLoop(quantum, nTuples);
00184 if (rc != EXECRC_YIELD) {
00185 return rc;
00186 }
00187 }
00188 }
00189
00190 bool BTreePrefetchSearchExecStream::innerSearchLoop()
00191 {
00192
00193
00194 while (!pReader->isPositioned()) {
00195
00196
00197 if (pInAccessor->getState() != EXECBUF_EOS &&
00198 !pInAccessor->demandData())
00199 {
00200 return false;
00201 }
00202
00203 if (!initialPrefetchDone) {
00204 if (pInAccessor->getState() == EXECBUF_EOS) {
00205 processingDone = true;
00206 break;
00207 }
00208 getPrefetchSearchKey();
00209 leafPageQueue.mapRange(
00210 treeDescriptor.segmentAccessor,
00211 NULL_PAGE_ID);
00212 initialPrefetchDone = true;
00213 } else {
00214 ++leafPageQueue;
00215 }
00216
00217 std::pair<PageId, BTreePrefetchSearchKey> &prefetchEntry =
00218 *leafPageQueue;
00219
00220
00221 if (prefetchEntry.first == NULL_PAGE_ID) {
00222 processingDone = true;
00223 break;
00224 }
00225
00226
00227
00228
00229
00230
00231 setUpSearchKey(prefetchEntry.second);
00232 pLeafReader->setCurrentPageId(prefetchEntry.first);
00233
00234
00235
00236
00237
00238 if (prefetchEntry.second.newSearch || !prevLeafSearchRc) {
00239 prevLeafSearchRc = searchForKey();
00240 if (prevLeafSearchRc) {
00241 break;
00242 }
00243 } else {
00244 if (pReader->searchFirst() && testInterval()) {
00245 projAccessor.unmarshal(tupleData.begin());
00246 break;
00247 }
00248 }
00249
00250
00251 pReader->endSearch();
00252 }
00253 return true;
00254 }
00255
00256 void BTreePrefetchSearchExecStream::getPrefetchSearchKey()
00257 {
00258 readSearchKey();
00259 readDirectives();
00260 setAdditionalKeys();
00261 readUpperBoundKey();
00262 pfLowerBoundDirective = lowerBoundDirective;
00263 pfUpperBoundDirective = upperBoundDirective;
00264 pfLowerBoundData = *pSearchKey;
00265 pfUpperBoundData = upperBoundData;
00266 }
00267
00268 void BTreePrefetchSearchExecStream::setUpSearchKey(
00269 BTreePrefetchSearchKey const &searchKey)
00270 {
00271 lowerBoundDirective = searchKey.lowerBoundDirective;
00272 upperBoundDirective = searchKey.upperBoundDirective;
00273 setLowerBoundKey(searchKey.lowerKeyBuffer);
00274 savedUpperBoundAccessor.setCurrentTupleBuf(searchKey.upperKeyBuffer);
00275 savedUpperBoundAccessor.unmarshal(upperBoundData);
00276 }
00277
00278 void BTreePrefetchSearchExecStream::setLowerBoundKey(PConstBuffer buf)
00279 {
00280 savedLowerBoundAccessor.setCurrentTupleBuf(buf);
00281 savedLowerBoundAccessor.unmarshal(inputKeyData);
00282 pSearchKey = &inputKeyData;
00283 }
00284
00285 void BTreePrefetchSearchExecStream::setAdditionalKeys()
00286 {
00287 pSearchKey = &inputKeyData;
00288 }
00289
00290 PageId BTreePrefetchSearchExecStream::getNextPageForPrefetch(
00291 BTreePrefetchSearchKey &searchKey,
00292 bool &found)
00293 {
00294 found = true;
00295
00296
00297
00298
00299 if (rootOnly) {
00300 while (true) {
00301 if (returnedRoot) {
00302 if (pInAccessor->getState() == EXECBUF_EOS) {
00303 return NULL_PAGE_ID;
00304 } else if (!pInAccessor->demandData()) {
00305 found = false;
00306 return NULL_PAGE_ID;
00307 }
00308 returnedRoot = false;
00309 getPrefetchSearchKey();
00310 }
00311 returnedRoot = true;
00312 setSearchKeyData(true, searchKey);
00313 pInAccessor->consumeTuple();
00314 return pNonLeafReader->getRootPageId();
00315 }
00316 }
00317
00318 bool first;
00319
00320 while (true) {
00321
00322
00323
00324
00325 if (pNonLeafReader->isPositioned()) {
00326 first = false;
00327 if (endOnNextKey) {
00328 pInAccessor->consumeTuple();
00329 pNonLeafReader->endSearch();
00330 continue;
00331 } else {
00332 pNonLeafReader->searchNext();
00333 }
00334
00335
00336
00337
00338 } else {
00339 if (!pInAccessor->isTupleConsumptionPending()) {
00340
00341
00342
00343 if (pInAccessor->getState() == EXECBUF_EOS) {
00344 return NULL_PAGE_ID;
00345 } else if (!pInAccessor->demandData()) {
00346 found = false;
00347 return NULL_PAGE_ID;
00348 }
00349 getPrefetchSearchKey();
00350 }
00351 first = true;
00352 switch (pfLowerBoundDirective) {
00353 case SEARCH_UNBOUNDED_LOWER:
00354 if (pfLowerBoundData.size() <= 1) {
00355 pNonLeafReader->searchFirst();
00356 break;
00357 }
00358
00359
00360
00361 case SEARCH_CLOSED_LOWER:
00362 pNonLeafReader->searchForKey(
00363 pfLowerBoundData, DUP_SEEK_BEGIN, leastUpper);
00364 break;
00365 case SEARCH_OPEN_LOWER:
00366 pNonLeafReader->searchForKey(
00367 pfLowerBoundData, DUP_SEEK_END, leastUpper);
00368 break;
00369 default:
00370 permFail(
00371 "unexpected lower bound directive: "
00372 << (char) lowerBoundDirective);
00373 }
00374 }
00375
00376
00377
00378
00379
00380 assert(!pNonLeafReader->isSingular());
00381 endOnNextKey = testNonLeafInterval();
00382 setSearchKeyData(first, searchKey);
00383 return pNonLeafReader->getChildForCurrent();
00384 }
00385 }
00386
00387 void BTreePrefetchSearchExecStream::setSearchKeyData(
00388 bool newSearch,
00389 BTreePrefetchSearchKey &searchKey)
00390 {
00391 searchKey.newSearch = newSearch;
00392 searchKey.lowerBoundDirective =
00393 pfLowerBoundDirective;
00394 searchKey.upperBoundDirective =
00395 pfUpperBoundDirective;
00396 if (bigMaxKey) {
00397 assert(
00398 savedLowerBoundAccessor.getByteCount(pfLowerBoundData)
00399 <= keyValuesSize);
00400 assert(
00401 savedUpperBoundAccessor.getByteCount(pfUpperBoundData)
00402 <= keyValuesSize);
00403 }
00404 savedLowerBoundAccessor.marshal(pfLowerBoundData, searchKey.lowerKeyBuffer);
00405 savedUpperBoundAccessor.marshal(pfUpperBoundData, searchKey.upperKeyBuffer);
00406 }
00407
00408 bool BTreePrefetchSearchExecStream::testNonLeafInterval()
00409 {
00410
00411
00412 if (pNonLeafReader->isPositionedOnInfinityKey()) {
00413 return true;
00414 }
00415
00416 BTreeNodeAccessor &nodeAccessor =
00417 pNonLeafReader->getNonLeafNodeAccessor();
00418 if (pfUpperBoundDirective == SEARCH_UNBOUNDED_UPPER) {
00419
00420
00421
00422
00423 if (pfLowerBoundData.size() > 1) {
00424 nodeAccessor.unmarshalKey(readerKeyData);
00425 int c =
00426 inputKeyDesc.compareTuplesKey(
00427 pfLowerBoundData,
00428 readerKeyData,
00429 pfLowerBoundData.size() - 1);
00430
00431
00432 permAssert(c <= 0);
00433 return (c < 0);
00434 } else {
00435 return false;
00436 }
00437 } else {
00438 nodeAccessor.unmarshalKey(readerKeyData);
00439 int c = inputKeyDesc.compareTuples(pfUpperBoundData, readerKeyData);
00440
00441
00442 if (c == 0) {
00443 if (pfUpperBoundDirective == SEARCH_OPEN_UPPER) {
00444 c = -1;
00445 }
00446 }
00447 return (c < 0);
00448 }
00449 }
00450
00451 void BTreePrefetchSearchExecStream::closeImpl()
00452 {
00453 BTreeSearchExecStream::closeImpl();
00454 scratchPages.clear();
00455 if (scratchAccessor.pSegment) {
00456 scratchAccessor.pSegment->deallocatePageRange(
00457 NULL_PAGE_ID, NULL_PAGE_ID);
00458 }
00459 }
00460
00461 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreePrefetchSearchExecStream.cpp#7 $");
00462
00463