00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/btree/BTreeWriter.h"
00026 #include "fennel/btree/BTreeRecoveryFactory.h"
00027 #include "fennel/btree/BTreeAccessBaseImpl.h"
00028 #include "fennel/btree/BTreeReaderImpl.h"
00029 #include "fennel/btree/BTreeDuplicateKeyExcn.h"
00030 #include "fennel/txn/LogicalTxn.h"
00031 #include "fennel/common/ByteOutputStream.h"
00032 #include "fennel/common/ByteInputStream.h"
00033 #include "fennel/tuple/TuplePrinter.h"
00034
00035 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $");
00036
00037
00038 BTreeWriter::BTreeWriter(
00039 BTreeDescriptor const &descriptor,
00040 SegmentAccessor const &scratchAccessorInit,
00041 bool monotonicInit)
00042 : BTreeReader(descriptor),
00043 scratchAccessor(scratchAccessorInit)
00044 {
00045
00046 leafLockMode = LOCKMODE_X;
00047 scratchPageLock.accessSegment(scratchAccessor);
00048 splitTupleBuffer.reset(
00049 new FixedBuffer[pNonLeafNodeAccessor->tupleAccessor.getMaxByteCount()]);
00050 leafTupleBuffer.reset(
00051 new FixedBuffer[pLeafNodeAccessor->tupleAccessor.getMaxByteCount()]);
00052 monotonic = monotonicInit;
00053 }
00054
00055 BTreeWriter::~BTreeWriter()
00056 {
00057 }
00058
00059 void BTreeWriter::insertTupleData(
00060 TupleData const &tupleData,Distinctness distinctness)
00061 {
00062
00063 pLeafNodeAccessor->tupleAccessor.marshal(tupleData,leafTupleBuffer.get());
00064 insertTupleFromBuffer(leafTupleBuffer.get(),distinctness);
00065 }
00066
00067 uint BTreeWriter::insertTupleFromBuffer(
00068 PConstBuffer pTupleBuffer,Distinctness distinctness)
00069 {
00070 BTreeNodeAccessor &nodeAccessor = *pLeafNodeAccessor;
00071 nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer);
00072
00073 validateTupleSize(nodeAccessor.tupleAccessor);
00074
00075 uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount();
00076
00077 #if 0
00078 TuplePrinter tuplePrinter;
00079 tuplePrinter.print(std::cout, keyDescriptor, searchKeyData);
00080 std::cout << std::endl;
00081 #endif
00082
00083
00084
00085
00086
00087 if (monotonic) {
00088 if (isPositioned()) {
00089 ++iTupleOnLowestLevel;
00090 assert(checkMonotonicity(nodeAccessor, pTupleBuffer));
00091 } else {
00092 positionSearchKey(nodeAccessor);
00093 }
00094 } else {
00095 bool duplicate = positionSearchKey(nodeAccessor);
00096
00097
00098
00099
00100 if (duplicate) {
00101 switch (distinctness) {
00102 case DUP_ALLOW:
00103 break;
00104 case DUP_DISCARD:
00105 endSearch();
00106 return cbTuple;
00107 default:
00108 if (!searchKeyData.containsNull()) {
00109 endSearch();
00110 throw BTreeDuplicateKeyExcn(keyDescriptor,searchKeyData);
00111 }
00112 }
00113 }
00114 }
00115
00116 if (isLoggingEnabled()) {
00117 ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction(
00118 *this,
00119 ACTION_INSERT);
00120 memcpy(
00121 logStream.getWritePointer(cbTuple),
00122 pTupleBuffer,
00123 cbTuple);
00124 logStream.consumeWritePointer(cbTuple);
00125 getLogicalTxn()->endLogicalAction();
00126 }
00127
00128 bool split = !attemptInsertWithoutSplit(
00129 pageLock,pTupleBuffer,cbTuple,iTupleOnLowestLevel);
00130 if (split) {
00131 splitCurrentNode(pTupleBuffer,cbTuple,iTupleOnLowestLevel);
00132 }
00133
00134
00135
00136 if (!monotonic || split) {
00137 endSearch();
00138 }
00139
00140 return cbTuple;
00141 }
00142
00143 bool BTreeWriter::attemptInsertWithoutSplit(
00144 BTreePageLock &targetPageLock,
00145 PConstBuffer pTupleBuffer,uint cbTuple,uint iPosition)
00146 {
00147 BTreeNode *pNode = &(targetPageLock.getNodeForWrite());
00148 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(*pNode);
00149 switch (nodeAccessor.calculateCapacity(*pNode,cbTuple)) {
00150 case BTreeNodeAccessor::CAN_FIT:
00151 break;
00152 case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION:
00153 compactNode(pageLock);
00154
00155
00156 pNode = &(targetPageLock.getNodeForWrite());
00157 assert(nodeAccessor.calculateCapacity(*pNode,cbTuple)
00158 == BTreeNodeAccessor::CAN_FIT);
00159
00160 #if 0
00161 std::cerr << "AFTER COMPACTION:" << std::endl;
00162 nodeAccessor.dumpNode(std::cerr,*pNode,pageId);
00163 #endif
00164
00165 break;
00166 case BTreeNodeAccessor::CAN_NOT_FIT:
00167 return false;
00168 }
00169 PBuffer pBuffer = nodeAccessor.allocateEntry(*pNode,iPosition,cbTuple);
00170 memcpy(pBuffer,pTupleBuffer,cbTuple);
00171 return true;
00172 }
00173
00174 void BTreeWriter::compactNode(BTreePageLock &targetPageLock)
00175 {
00176 BTreeNode &node = targetPageLock.getNodeForWrite();
00177 if (!scratchPageLock.isLocked()) {
00178 scratchPageLock.allocatePage();
00179 }
00180 BTreeNode &scratchNode = scratchPageLock.getNodeForWrite();
00181 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00182 nodeAccessor.clearNode(scratchNode,getSegment()->getUsablePageSize());
00183 nodeAccessor.compactNode(node,scratchNode);
00184 pageLock.swapBuffers(scratchPageLock);
00185 }
00186
// Splits the node currently held by pageLock into itself (left) plus a newly
// allocated right sibling, inserts the given tuple into one of the two
// halves, and then makes the split visible one level up: by calling grow()
// when pageStack is empty, otherwise by updating the parent node and
// recursing if the parent itself has no room.
//
// pTupleBuffer  marshalled tuple to insert
// cbTuple       byte count of that tuple
// iNewTuple     position the tuple would have occupied in the unsplit node
00187 void BTreeWriter::splitCurrentNode(
00188 PConstBuffer pTupleBuffer,uint cbTuple,uint iNewTuple)
00189 {
00190
// Remember the id of the node being split; pageId is reassigned later by
// lockParentPage().
00191 PageId leftPageId = pageId;
00192 BTreeNode &node = pageLock.getNodeForWrite();
00193 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00194
00195
// Allocate and clear the page that becomes the right half of the split.
00196 BTreePageLock newPageLock(treeDescriptor.segmentAccessor);
00197 PageId newPageId = newPageLock.allocatePage(getPageOwnerId());
00198 BTreeNode &newNode = newPageLock.getNodeForWrite();
00199 nodeAccessor.clearNode(newNode,getSegment()->getUsablePageSize());
00200
00201 #if 0
00202 std::cerr << "BEFORE SPLIT:" << std::endl;
00203 nodeAccessor.dumpNode(std::cerr,node,pageId);
00204 #endif
00205
// Chain the siblings: the new node inherits the old node's right sibling,
// and the old node now points at the new node.
00206 setRightSibling(newNode,newPageId,node.rightSibling);
00207 setRightSibling(node,pageId,newPageId);
00208
// NOTE(review): splitNode presumably moves part of node's entries into
// newNode, taking cbTuple and the monotonic flag into account -- confirm
// against BTreeNodeAccessor.
00209 nodeAccessor.splitNode(node,newNode,cbTuple,monotonic);
00210
00211 #if 0
00212 std::cerr << "LEFT AFTER SPLIT:" << std::endl;
00213 nodeAccessor.dumpNode(std::cerr,node,pageId);
00214 std::cerr << "RIGHT AFTER SPLIT:" << std::endl;
00215 nodeAccessor.dumpNode(std::cerr,newNode,newPageId);
00216 #endif
00217
00218
// Decide which half receives the new tuple by comparing its original
// position against the entry count left in the old (left) node.
00219 BTreePageLock *pLockForNewTuple = NULL;
00220 if (iNewTuple < node.nEntries) {
00221
// Position falls inside the left node.
00222 pLockForNewTuple = &pageLock;
00223 } else if (iNewTuple > node.nEntries) {
00224
// Position falls past the left node, so it belongs to the right node.
00225 pLockForNewTuple = &newPageLock;
00226 } else {
00227
// Exactly on the boundary: use the right node unless it reports that the
// tuple cannot fit there.
00228 if (nodeAccessor.calculateCapacity(newNode,cbTuple) !=
00229 BTreeNodeAccessor::CAN_NOT_FIT)
00230 {
00231 pLockForNewTuple = &newPageLock;
00232 } else {
00233 pLockForNewTuple = &pageLock;
00234 }
00235 }
// Positions in the right node are relative to its own first entry.
00236 if (pLockForNewTuple == &newPageLock) {
00237 iNewTuple -= node.nEntries;
00238 }
00239
00240
00241
// After the split the chosen half is required to accept the tuple;
// failure here trips permAssert.
00242 bool inserted = attemptInsertWithoutSplit(
00243 *pLockForNewTuple,pTupleBuffer,cbTuple,iNewTuple);
00244 permAssert(inserted);
00245
00246
00247
00248
00249
00250
// Re-fetch both nodes through their locks: attemptInsertWithoutSplit() may
// have compacted a node, which swaps page buffers.
00251 BTreeNode const &leftNode = pageLock.getNodeForRead();
00252 BTreeNode const &rightNode = newPageLock.getNodeForRead();
00253
// No parent was recorded on the page stack (presumably the split node is
// the root -- verify), so the tree gains a level via grow().
00254 if (pageStack.empty()) {
00255
00256 grow(rightNode, newPageId);
00257
00258
00259
00260
00261
00262 return;
00263 }
00264
00265
00266
00267 assert(leftNode.nEntries);
// For a monotonic writer the left page is flushed at this point.
// NOTE(review): the meaning of the 'true' argument to flushPage is not
// visible here.
00268 if (monotonic) {
00269 pageLock.flushPage(true);
00270 }
00271
00272
00273 BTreeNodeAccessor &upperNodeAccessor = *pNonLeafNodeAccessor;
00274
00275
00276
// Build the parent-level entry for the LEFT node: the key of its last
// tuple, with the final datum pointed at leftPageId, marshalled into
// splitTupleBuffer for insertion further below.
00277 nodeAccessor.accessTuple(leftNode,leftNode.nEntries - 1);
00278 nodeAccessor.unmarshalKey(upperNodeAccessor.tupleData);
00279 upperNodeAccessor.tupleData.back().pData =
00280 reinterpret_cast<PConstBuffer>(&leftPageId);
00281 upperNodeAccessor.tupleAccessor.marshal(
00282 upperNodeAccessor.tupleData,
00283 splitTupleBuffer.get());
00284
00285
00286
00287
00288
// Load searchKeyData with the key at index (key count - 1) of the RIGHT
// node; lockParentPage() searches the parent level with searchKeyData.
00289 uint nRightKeys = nodeAccessor.getKeyCount(rightNode);
00290 nodeAccessor.accessTuple(rightNode,nRightKeys - 1);
00291 nodeAccessor.unmarshalKey(searchKeyData);
00292
00293
00294
00295
00296
// A right node without a right sibling is the rightmost on its level; a
// monotonic writer is asserted to split only such nodes.
00297 bool rightMostNode = (rightNode.rightSibling == NULL_PAGE_ID);
00298 assert(!monotonic || (monotonic && rightMostNode));
// From here on pageLock and pageId refer to the parent, not the left node.
00299 uint iPosition = lockParentPage(leftNode.height, rightMostNode);
00300
00301
00302
00303
00304
00305
00306
// The parent entry at iPosition is asserted to reference the left page.
// Overwrite its child page id in place so that it references the new right
// page instead.
00307 BTreeNode &upperNode = pageLock.getNodeForWrite();
00308 upperNodeAccessor.tupleAccessor.setCurrentTupleBuf(
00309 const_cast<PBuffer>(
00310 upperNodeAccessor.getEntryForRead(upperNode,iPosition)));
00311 TupleDatum &childDatum = upperNodeAccessor.tupleData.back();
00312 pChildAccessor->unmarshalValue(
00313 upperNodeAccessor.tupleAccessor,
00314 childDatum);
00315 PageId &childPageId =
00316 *(reinterpret_cast<PageId *>(const_cast<PBuffer>(childDatum.pData)));
00317 assert(childPageId == leftPageId);
00318 childPageId = newPageId;
00319
00320
00321
// Insert the entry prepared earlier for the left page at the same
// position.  cbTuple is reused from here on for the parent entry's size.
00322 upperNodeAccessor.tupleAccessor.setCurrentTupleBuf(splitTupleBuffer.get());
00323 cbTuple = upperNodeAccessor.tupleAccessor.getCurrentByteCount();
00324 inserted = attemptInsertWithoutSplit(
00325 pageLock,splitTupleBuffer.get(),cbTuple,iPosition);
00326 if (inserted) {
00327
// The parent had room, so the split is complete.
00328 return;
00329 }
00330
00331
00332
00333
// The parent is full: release the new sibling's lock, then split the
// parent (now held by pageLock) to make room for the pending entry.
00334 newPageLock.unlock();
00335
00336 splitCurrentNode(splitTupleBuffer.get(),cbTuple,iPosition);
00337 }
00338
// Pops the parent page id from pageStack, takes it in exclusive mode on
// pageLock, and returns the entry position in that parent selected for the
// split child.
//
// height         height of the child node that was split
// rightMostNode  true if the new right child has no right sibling; the
//                position returned is then the parent's key count
//
// Otherwise the position comes from a binary search on searchKeyData, moving
// to right siblings while the search runs off the end of a node.  Requires a
// non-empty pageStack (asserted).  On return pageId and pageLock refer to the
// parent page that was finally chosen.
00339 uint BTreeWriter::lockParentPage(uint height, bool rightMostNode)
00340 {
00341 assert(!pageStack.empty());
00342 pageId = pageStack.back();
00343 pageStack.pop_back();
00344
00345 uint iPosition;
00346 for (;;) {
00347 pageLock.lockPageWithCoupling(pageId,LOCKMODE_X);
00348
00349 bool found;
00350 BTreeNode const &parentNode = pageLock.getNodeForRead();
// The recorded parent is the root page, but its height is not one above
// the child's, so the recorded path cannot be used as is.  Rebuild
// pageStack by descending again from this page.
00351 if (pageId == getRootPageId() && parentNode.height != height+1) {
00352
00353
00354
00355
00356
00357
00358
00359
00360 pageLock.unlock();
// The descent uses a separate lock in shared mode.
00361 BTreePageLock stackPageLock(treeDescriptor.segmentAccessor);
// NOTE: this iPosition, and 'found' below, shadow the outer variables of
// the same names; the outer ones are untouched by the inner loop.
00362 uint iPosition;
00363 for (;;) {
00364 stackPageLock.lockPageWithCoupling(pageId,LOCKMODE_S);
00365 bool found;
00366 BTreeNode const &node = stackPageLock.getNodeForRead();
// Stop once a node at the child's own height is reached; the top of
// pageStack is then the page visited one level above it.
00367 if (node.height == height) {
00368
00369 stackPageLock.unlock();
00370 break;
00371 }
00372 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00373
// Locate the entry for searchKeyData in this node and unmarshal it into the
// accessor's tupleData.
00374 iPosition = nodeAccessor.binarySearch(
00375 node,
00376 keyDescriptor,
00377 searchKeyData,
00378 DUP_SEEK_ANY,
00379 true,
00380 nodeAccessor.tupleData,
00381 found);
00382 nodeAccessor.accessTuple(node, iPosition);
00383 nodeAccessor.unmarshalKey(nodeAccessor.tupleData);
00384 if (iPosition == nodeAccessor.getKeyCount(node)) {
00385 assert(!found);
00386
00387
// The search ran off the end of this node: retry on the right sibling if
// there is one, without pushing this page.
00388 if (node.rightSibling != NULL_PAGE_ID) {
00389 pageId = node.rightSibling;
00390 continue;
00391 }
00392 }
// Record this page on the stack and descend into the child whose page id is
// read from the last datum of tupleData.
00393 pageStack.push_back(pageId);
00394 pageId = *(PageId *)(nodeAccessor.tupleData.back().pData);
00395 stackPageLock.unlock();
00396 }
// The rebuilt stack's top entry is the parent: pop it and lock it
// exclusively.
00397 pageId = pageStack.back();
00398 pageStack.pop_back();
00399
00400 pageLock.lockPageWithCoupling(pageId,LOCKMODE_X);
00401 }
00402
00403
// Fetch the node again from pageLock, which may have been re-locked on a
// different page by the branch above.
00404 BTreeNode const &node = pageLock.getNodeForRead();
00405
00406 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00407 if (rightMostNode) {
00408
00409
// Rightmost child: no search is done; the position is the node's key count.
00410 iPosition = nodeAccessor.getKeyCount(node);
00411 break;
00412 }
00413
00414
00415
00416 iPosition = nodeAccessor.binarySearch(
00417 node,
00418 keyDescriptor,
00419 searchKeyData,
00420 DUP_SEEK_ANY,
00421 true,
00422 nodeAccessor.tupleData,
00423 found);
00424
00425 if (iPosition == nodeAccessor.getKeyCount(node)) {
00426 assert(!found);
00427
00428
// The search ran off the end of this node: move to the right sibling and
// retry from the top of the loop, or accept this position if there is none.
00429 if (node.rightSibling != NULL_PAGE_ID) {
00430 pageId = node.rightSibling;
00431 continue;
00432 } else {
00433 break;
00434 }
00435 }
00436
00437 break;
00438 }
00439 return iPosition;
00440 }
00441
00442 void BTreeWriter::grow(
00443 BTreeNode const &rightNode, PageId rightPageId)
00444 {
00445 BTreeNode &node = pageLock.getNodeForWrite();
00446 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00447
00448
00449
00450
00451 #if 0
00452 std::cerr << "before grow root:" << std::endl;
00453 nodeAccessor.dumpNode(std::cerr,node,pageId);
00454 std::cerr << "before grow right:" << std::endl;
00455 nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId);
00456 #endif
00457
00458 BTreePageLock newLeftPageLock(treeDescriptor.segmentAccessor);
00459 PageId newLeftPageId = newLeftPageLock.allocatePage(getPageOwnerId());
00460 BTreeNode &newLeftNode = newLeftPageLock.getNodeForWrite();
00461 nodeAccessor.clearNode(newLeftNode,getSegment()->getUsablePageSize());
00462
00463 newLeftNode = node;
00464 memcpy(
00465 newLeftNode.getDataForWrite(),
00466 node.getDataForRead(),
00467 getSegment()->getUsablePageSize() - sizeof(BTreeNode));
00468
00469
00470
00471 getSegment()->setPageSuccessor(newLeftPageId, rightPageId);
00472 getSegment()->setPageSuccessor(pageId, NULL_PAGE_ID);
00473
00474
00475
00476
00477 nodeAccessor.clearNode(node,getSegment()->getUsablePageSize());
00478 node.height = newLeftNode.height + 1;
00479 BTreeNodeAccessor &rootNodeAccessor = getNonLeafNodeAccessor(node);
00480
00481
00482
00483 nodeAccessor.accessTuple(newLeftNode, newLeftNode.nEntries - 1);
00484 nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData);
00485 rootNodeAccessor.tupleData.back().pData =
00486 reinterpret_cast<PConstBuffer>(&newLeftPageId);
00487 uint cb = rootNodeAccessor.tupleAccessor.getByteCount(
00488 rootNodeAccessor.tupleData);
00489 PBuffer pBuffer1 = rootNodeAccessor.allocateEntry(node,0,cb);
00490 rootNodeAccessor.tupleAccessor.marshal(
00491 rootNodeAccessor.tupleData, pBuffer1);
00492
00493 nodeAccessor.accessTuple(rightNode, rightNode.nEntries - 1);
00494 nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData);
00495 rootNodeAccessor.tupleData.back().pData =
00496 reinterpret_cast<PConstBuffer>(&rightPageId);
00497 cb = rootNodeAccessor.tupleAccessor.getByteCount(
00498 rootNodeAccessor.tupleData);
00499
00500 PBuffer pBuffer2 = rootNodeAccessor.allocateEntry(node,1,cb);
00501 rootNodeAccessor.tupleAccessor.marshal(
00502 rootNodeAccessor.tupleData, pBuffer2);
00503 #if 0
00504 std::cerr << "after grow root:" << std::endl;
00505 rootNodeAccessor.dumpNode(std::cerr,node,pageId);
00506 std::cerr << "after grow left:" << std::endl;
00507 nodeAccessor.dumpNode(std::cerr,newLeftNode,newLeftPageId);
00508 std::cerr << "after grow right:" << std::endl;
00509 nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId);
00510 #endif
00511 if (monotonic) {
00512 newLeftPageLock.flushPage(true);
00513 }
00514 newLeftPageLock.unlock();
00515 return;
00516 }
00517
00518 inline void BTreeWriter::optimizeRootLockMode()
00519 {
00520 if (pageId != getRootPageId()) {
00521
00522
00523 rootLockMode = LOCKMODE_S;
00524 }
00525 }
00526
00527 void BTreeWriter::deleteCurrent()
00528 {
00529
00530
00531 assert(pageLock.isLocked());
00532
00533 optimizeRootLockMode();
00534
00535 BTreeNode &node = pageLock.getNodeForWrite();
00536 BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(node);
00537
00538 if (isLoggingEnabled()) {
00539 uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount();
00540 ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction(
00541 *this,
00542 ACTION_DELETE);
00543 memcpy(
00544 logStream.getWritePointer(cbTuple),
00545 nodeAccessor.tupleAccessor.getCurrentTupleBuf(),
00546 cbTuple);
00547 logStream.consumeWritePointer(cbTuple);
00548 getLogicalTxn()->endLogicalAction();
00549 }
00550
00551 nodeAccessor.deallocateEntry(node,iTupleOnLowestLevel);
00552
00553
00554 --iTupleOnLowestLevel;
00555 }
00556
00557 bool BTreeWriter::updateCurrent(TupleData const &tupleData)
00558 {
00559
00560
00561 PBuffer pTupleBuf;
00562
00563 assert(pageLock.isLocked());
00564
00565 optimizeRootLockMode();
00566
00567 BTreeNode *pNode = &(pageLock.getNodeForWrite());
00568 BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(*pNode);
00569
00570 assert(!isLoggingEnabled());
00571
00572 if (nodeAccessor.hasFixedWidthEntries()) {
00573
00574 pTupleBuf = const_cast<PBuffer>(
00575 nodeAccessor.getEntryForRead(*pNode,iTupleOnLowestLevel));
00576 nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf);
00577 return true;
00578 }
00579
00580
00581 uint cbTuple =
00582 nodeAccessor.tupleAccessor.getByteCount(tupleData);
00583 int cbDelta =
00584 cbTuple - nodeAccessor.tupleAccessor.getCurrentByteCount();
00585 if (cbDelta < 0) {
00586 cbDelta = 0;
00587 }
00588
00589
00590
00591 switch (nodeAccessor.calculateCapacity(*pNode,cbDelta)) {
00592 case BTreeNodeAccessor::CAN_FIT:
00593 nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel);
00594 break;
00595 case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION:
00596 nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel);
00597 compactNode(pageLock);
00598
00599
00600 pNode = &(pageLock.getNodeForWrite());
00601 assert(nodeAccessor.calculateCapacity(*pNode,cbTuple)
00602 == BTreeNodeAccessor::CAN_FIT);
00603 break;
00604 case BTreeNodeAccessor::CAN_NOT_FIT:
00605 return false;
00606 default:
00607 permAssert(false);
00608 }
00609
00610 pTupleBuf = nodeAccessor.allocateEntry(*pNode,iTupleOnLowestLevel,cbTuple);
00611 nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf);
00612 return true;
00613 }
00614
00615 LogicalTxnClassId BTreeWriter::getParticipantClassId() const
00616 {
00617 return BTreeRecoveryFactory::getParticipantClassId();
00618 }
00619
00620 void BTreeWriter::describeParticipant(
00621 ByteOutputStream &logStream)
00622 {
00623 PageId rootPageId = getRootPageId();
00624 logStream.writeValue(rootPageId);
00625 getTupleDescriptor().writePersistent(logStream);
00626 getKeyProjection().writePersistent(logStream);
00627 }
00628
00629 void BTreeWriter::undoLogicalAction(
00630 LogicalActionType actionType,
00631 ByteInputStream &logStream)
00632 {
00633 switch (actionType) {
00634 case ACTION_INSERT:
00635 deleteLogged(logStream);
00636 break;
00637 case ACTION_DELETE:
00638 insertLogged(logStream);
00639 break;
00640 default:
00641 permAssert(false);
00642 }
00643 }
00644
00645 void BTreeWriter::redoLogicalAction(
00646 LogicalActionType actionType,
00647 ByteInputStream &logStream)
00648 {
00649 switch (actionType) {
00650 case ACTION_INSERT:
00651 insertLogged(logStream);
00652 break;
00653 case ACTION_DELETE:
00654 deleteLogged(logStream);
00655 break;
00656 default:
00657 permAssert(false);
00658 }
00659 }
00660
00661 void BTreeWriter::insertLogged(ByteInputStream &logStream)
00662 {
00663
00664 uint cbTuple = insertTupleFromBuffer(
00665 logStream.getReadPointer(1),DUP_ALLOW);
00666 logStream.consumeReadPointer(cbTuple);
00667 }
00668
00669 void BTreeWriter::deleteLogged(ByteInputStream &logStream)
00670 {
00671 pLeafNodeAccessor->tupleAccessor.setCurrentTupleBuf(
00672 logStream.getReadPointer(1));
00673 uint cbTuple = pLeafNodeAccessor->tupleAccessor.getCurrentByteCount();
00674 pLeafNodeAccessor->unmarshalKey(searchKeyData);
00675 if (searchForKey(searchKeyData,DUP_SEEK_ANY)) {
00676 deleteCurrent();
00677 } else {
00678
00679 }
00680 endSearch();
00681 logStream.consumeReadPointer(cbTuple);
00682 }
00683
00684 void BTreeWriter::releaseScratchBuffers()
00685 {
00686 scratchPageLock.unlock();
00687 }
00688
00689 bool BTreeWriter::positionSearchKey(BTreeNodeAccessor &nodeAccessor)
00690 {
00691 nodeAccessor.unmarshalKey(searchKeyData);
00692 pageStack.clear();
00693 DuplicateSeek seekMode = (monotonic) ? DUP_SEEK_END : DUP_SEEK_ANY;
00694 bool duplicate = searchForKeyTemplate< true, std::vector<PageId> >(
00695 searchKeyData,seekMode,true,pageStack,getRootPageId(),rootLockMode,
00696 READ_ALL);
00697 return duplicate;
00698 }
00699
00700 bool BTreeWriter::checkMonotonicity(
00701 BTreeNodeAccessor &nodeAccessor, PConstBuffer pTupleBuffer)
00702 {
00703
00704 if (iTupleOnLowestLevel == 0) {
00705 return true;
00706 }
00707 BTreeNode const &node = pageLock.getNodeForRead();
00708 accessTupleInline(node, iTupleOnLowestLevel - 1);
00709 nodeAccessor.unmarshalKey(comparisonKeyData);
00710
00711 nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer);
00712 nodeAccessor.unmarshalKey(searchKeyData);
00713 int keyComp = keyDescriptor.compareTuples(comparisonKeyData, searchKeyData);
00714
00715 return (keyComp <= 0 && node.nEntries == iTupleOnLowestLevel);
00716 }
00717 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $");
00718
00719