BTreeWriter.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/btree/BTreeWriter.h"
00026 #include "fennel/btree/BTreeRecoveryFactory.h"
00027 #include "fennel/btree/BTreeAccessBaseImpl.h"
00028 #include "fennel/btree/BTreeReaderImpl.h"
00029 #include "fennel/btree/BTreeDuplicateKeyExcn.h"
00030 #include "fennel/txn/LogicalTxn.h"
00031 #include "fennel/common/ByteOutputStream.h"
00032 #include "fennel/common/ByteInputStream.h"
00033 #include "fennel/tuple/TuplePrinter.h"
00034 
00035 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $");
00036 
00037 
00038 BTreeWriter::BTreeWriter(
00039     BTreeDescriptor const &descriptor,
00040     SegmentAccessor const &scratchAccessorInit,
00041     bool monotonicInit)
00042     : BTreeReader(descriptor),
00043       scratchAccessor(scratchAccessorInit)
00044 {
00045     // we need exclusive locks on leaves for update operations
00046     leafLockMode = LOCKMODE_X;
00047     scratchPageLock.accessSegment(scratchAccessor);
00048     splitTupleBuffer.reset(
00049         new FixedBuffer[pNonLeafNodeAccessor->tupleAccessor.getMaxByteCount()]);
00050     leafTupleBuffer.reset(
00051         new FixedBuffer[pLeafNodeAccessor->tupleAccessor.getMaxByteCount()]);
00052     monotonic = monotonicInit;
00053 }
00054 
00055 BTreeWriter::~BTreeWriter()
00056 {
00057 }
00058 
00059 void BTreeWriter::insertTupleData(
00060     TupleData const &tupleData,Distinctness distinctness)
00061 {
00062     // TODO:  a small optimization is to use unmarshalled key data directly
00063     pLeafNodeAccessor->tupleAccessor.marshal(tupleData,leafTupleBuffer.get());
00064     insertTupleFromBuffer(leafTupleBuffer.get(),distinctness);
00065 }
00066 
00067 uint BTreeWriter::insertTupleFromBuffer(
00068     PConstBuffer pTupleBuffer,Distinctness distinctness)
00069 {
00070     BTreeNodeAccessor &nodeAccessor = *pLeafNodeAccessor;
00071     nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer);
00072 
00073     validateTupleSize(nodeAccessor.tupleAccessor);
00074 
00075     uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount();
00076 
00077 #if 0
00078     TuplePrinter tuplePrinter;
00079     tuplePrinter.print(std::cout, keyDescriptor, searchKeyData);
00080     std::cout << std::endl;
00081 #endif
00082 
00083     // monotonic inserts do not need to search for key; we will always
00084     // be inserting after where we are positioned except the first time
00085     // through or after a split, where we will need to do an initial search
00086     // to setup the appropriate position
00087     if (monotonic) {
00088         if (isPositioned()) {
00089             ++iTupleOnLowestLevel;
00090             assert(checkMonotonicity(nodeAccessor, pTupleBuffer));
00091         } else {
00092             positionSearchKey(nodeAccessor);
00093         }
00094     } else {
00095         bool duplicate = positionSearchKey(nodeAccessor);
00096 
00097         // REVIEW:  This implements the SQL semantics whereby keys with null
00098         // values are considered duplicates for DISTINCT but not for UNIQUE.
00099         // Should probably introduce new DUP_ options to make it explicit.
00100         if (duplicate) {
00101             switch (distinctness) {
00102             case DUP_ALLOW:
00103                 break;
00104             case DUP_DISCARD:
00105                 endSearch();
00106                 return cbTuple;
00107             default:
00108                 if (!searchKeyData.containsNull()) {
00109                     endSearch();
00110                     throw BTreeDuplicateKeyExcn(keyDescriptor,searchKeyData);
00111                 }
00112             }
00113         }
00114     }
00115 
00116     if (isLoggingEnabled()) {
00117         ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction(
00118             *this,
00119             ACTION_INSERT);
00120         memcpy(
00121             logStream.getWritePointer(cbTuple),
00122             pTupleBuffer,
00123             cbTuple);
00124         logStream.consumeWritePointer(cbTuple);
00125         getLogicalTxn()->endLogicalAction();
00126     }
00127 
00128     bool split = !attemptInsertWithoutSplit(
00129         pageLock,pTupleBuffer,cbTuple,iTupleOnLowestLevel);
00130     if (split) {
00131         splitCurrentNode(pTupleBuffer,cbTuple,iTupleOnLowestLevel);
00132     }
00133 
00134     // restart the search after a split even if monotonic insert to
00135     // ensure correct path to newnode
00136     if (!monotonic || split) {
00137         endSearch();
00138     }
00139 
00140     return cbTuple;
00141 }
00142 
00143 bool BTreeWriter::attemptInsertWithoutSplit(
00144     BTreePageLock &targetPageLock,
00145     PConstBuffer pTupleBuffer,uint cbTuple,uint iPosition)
00146 {
00147     BTreeNode *pNode = &(targetPageLock.getNodeForWrite());
00148     BTreeNodeAccessor &nodeAccessor = getNodeAccessor(*pNode);
00149     switch (nodeAccessor.calculateCapacity(*pNode,cbTuple)) {
00150     case BTreeNodeAccessor::CAN_FIT:
00151         break;
00152     case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION:
00153         compactNode(pageLock);
00154         // compactNode uses swapBuffers, which is why we have to use a pointer
00155         // rather than a reference for pNode
00156         pNode = &(targetPageLock.getNodeForWrite());
00157         assert(nodeAccessor.calculateCapacity(*pNode,cbTuple)
00158                == BTreeNodeAccessor::CAN_FIT);
00159 
00160 #if 0
00161         std::cerr << "AFTER COMPACTION:" << std::endl;
00162         nodeAccessor.dumpNode(std::cerr,*pNode,pageId);
00163 #endif
00164 
00165         break;
00166     case BTreeNodeAccessor::CAN_NOT_FIT:
00167         return false;
00168     }
00169     PBuffer pBuffer = nodeAccessor.allocateEntry(*pNode,iPosition,cbTuple);
00170     memcpy(pBuffer,pTupleBuffer,cbTuple);
00171     return true;
00172 }
00173 
00174 void BTreeWriter::compactNode(BTreePageLock &targetPageLock)
00175 {
00176     BTreeNode &node = targetPageLock.getNodeForWrite();
00177     if (!scratchPageLock.isLocked()) {
00178         scratchPageLock.allocatePage();
00179     }
00180     BTreeNode &scratchNode = scratchPageLock.getNodeForWrite();
00181     BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00182     nodeAccessor.clearNode(scratchNode,getSegment()->getUsablePageSize());
00183     nodeAccessor.compactNode(node,scratchNode);
00184     pageLock.swapBuffers(scratchPageLock);
00185 }
00186 
00187 void BTreeWriter::splitCurrentNode(
00188     PConstBuffer pTupleBuffer,uint cbTuple,uint iNewTuple)
00189 {
00190     // Current node will be on left after split.
00191     PageId leftPageId = pageId;
00192     BTreeNode &node = pageLock.getNodeForWrite();
00193     BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00194 
00195     // New node will be on right after split.
00196     BTreePageLock newPageLock(treeDescriptor.segmentAccessor);
00197     PageId newPageId = newPageLock.allocatePage(getPageOwnerId());
00198     BTreeNode &newNode = newPageLock.getNodeForWrite();
00199     nodeAccessor.clearNode(newNode,getSegment()->getUsablePageSize());
00200 
00201 #if 0
00202     std::cerr << "BEFORE SPLIT:" << std::endl;
00203     nodeAccessor.dumpNode(std::cerr,node,pageId);
00204 #endif
00205 
00206     setRightSibling(newNode,newPageId,node.rightSibling);
00207     setRightSibling(node,pageId,newPageId);
00208 
00209     nodeAccessor.splitNode(node,newNode,cbTuple,monotonic);
00210 
00211 #if 0
00212     std::cerr << "LEFT AFTER SPLIT:" << std::endl;
00213     nodeAccessor.dumpNode(std::cerr,node,pageId);
00214     std::cerr << "RIGHT AFTER SPLIT:" << std::endl;
00215     nodeAccessor.dumpNode(std::cerr,newNode,newPageId);
00216 #endif
00217 
00218     // Figure out where new tuple goes
00219     BTreePageLock *pLockForNewTuple = NULL;
00220     if (iNewTuple < node.nEntries) {
00221         // definitely on left
00222         pLockForNewTuple = &pageLock;
00223     } else if (iNewTuple > node.nEntries) {
00224         // definitely on right
00225         pLockForNewTuple = &newPageLock;
00226     } else {
00227         // could go either way depending on how the balancing worked out
00228         if (nodeAccessor.calculateCapacity(newNode,cbTuple) !=
00229             BTreeNodeAccessor::CAN_NOT_FIT)
00230         {
00231             pLockForNewTuple = &newPageLock;
00232         } else {
00233             pLockForNewTuple = &pageLock;
00234         }
00235     }
00236     if (pLockForNewTuple == &newPageLock) {
00237         iNewTuple -= node.nEntries;
00238     }
00239 
00240     // NOTE:  After this,  either the node or newNode reference may be
00241     // invalid, so don't use them.
00242     bool inserted = attemptInsertWithoutSplit(
00243         *pLockForNewTuple,pTupleBuffer,cbTuple,iNewTuple);
00244     permAssert(inserted);
00245 
00246     // We cannot safely access node or newNode any more, so from here
00247     // on use these instead:
00248     // leftNode == node
00249     // rightNode == newNode
00250 
00251     BTreeNode const &leftNode = pageLock.getNodeForRead();
00252     BTreeNode const &rightNode = newPageLock.getNodeForRead();
00253 
00254     if (pageStack.empty()) {
00255         // We're splitting the root:  time to grow the tree.
00256         grow(rightNode, newPageId);
00257 
00258         // NOTE:  newPageLock will unlock automatically on return,
00259         // and pageLock will be unlocked by the endSearch() call
00260         // in insertTupleFromBuffer, so there's no need to
00261         // explicitly unlock either one here.
00262         return;
00263     }
00264 
00265     // We're done splitting current node; now update parent node with new keys
00266     // from split.  In the case of monotonic inserts, flush the left node.
00267     assert(leftNode.nEntries);
00268     if (monotonic) {
00269         pageLock.flushPage(true);
00270     }
00271 
00272     // This will be the same as nodeAccessor if we're splitting a non-leaf.
00273     BTreeNodeAccessor &upperNodeAccessor = *pNonLeafNodeAccessor;
00274 
00275     // Prepare the parent entry to associate with the left node,
00276     // and marshal it into splitTupleBuffer.
00277     nodeAccessor.accessTuple(leftNode,leftNode.nEntries - 1);
00278     nodeAccessor.unmarshalKey(upperNodeAccessor.tupleData);
00279     upperNodeAccessor.tupleData.back().pData =
00280         reinterpret_cast<PConstBuffer>(&leftPageId);
00281     upperNodeAccessor.tupleAccessor.marshal(
00282         upperNodeAccessor.tupleData,
00283         splitTupleBuffer.get());
00284 
00285     // Save a copy of the right page's high key in searchKeyData; we'll need it
00286     // later inside of lockParentPage.  Note that this means rightNode needs to
00287     // remain locked for the duration, because searchKeyData will point into
00288     // it.
00289     uint nRightKeys = nodeAccessor.getKeyCount(rightNode);
00290     nodeAccessor.accessTuple(rightNode,nRightKeys - 1);
00291     nodeAccessor.unmarshalKey(searchKeyData);
00292 
00293     // Now, lock parent page and find the position of the entry pointing
00294     // to the original node (pre-split).  If that node is the rightmost node,
00295     // then the position in the parent node corresponds to the infinity
00296     // key, i.e., the last entry in the node.
00297     bool rightMostNode = (rightNode.rightSibling == NULL_PAGE_ID);
00298     assert(!monotonic || (monotonic && rightMostNode));
00299     uint iPosition = lockParentPage(leftNode.height, rightMostNode);
00300 
00301     // TODO jvs 12-Feb-2006: The code below uses getEntryForRead, even though
00302     // we're actually going to write; should either rename that method or add a
00303     // new getEntryForWrite which returns a non-const pointer.
00304 
00305     // Update the original entry in the parent, changing its child pointer
00306     // to reference newPageId instead of leftPageId.
00307     BTreeNode &upperNode = pageLock.getNodeForWrite();
00308     upperNodeAccessor.tupleAccessor.setCurrentTupleBuf(
00309         const_cast<PBuffer>(
00310             upperNodeAccessor.getEntryForRead(upperNode,iPosition)));
00311     TupleDatum &childDatum = upperNodeAccessor.tupleData.back();
00312     pChildAccessor->unmarshalValue(
00313         upperNodeAccessor.tupleAccessor,
00314         childDatum);
00315     PageId &childPageId =
00316         *(reinterpret_cast<PageId *>(const_cast<PBuffer>(childDatum.pData)));
00317     assert(childPageId == leftPageId);
00318     childPageId = newPageId;
00319 
00320     // Finally, insert the splitter entry; this may entail recursively
00321     // splitting again.
00322     upperNodeAccessor.tupleAccessor.setCurrentTupleBuf(splitTupleBuffer.get());
00323     cbTuple = upperNodeAccessor.tupleAccessor.getCurrentByteCount();
00324     inserted = attemptInsertWithoutSplit(
00325         pageLock,splitTupleBuffer.get(),cbTuple,iPosition);
00326     if (inserted) {
00327         // The entry fit; no need for recursion.
00328         return;
00329     }
00330 
00331     // Oops, couldn't fit:  go recursive.
00332 
00333     // REVIEW jvs 12-Feb-2006:  split locking in general.
00334     newPageLock.unlock();
00335 
00336     splitCurrentNode(splitTupleBuffer.get(),cbTuple,iPosition);
00337 }
00338 
00339 uint BTreeWriter::lockParentPage(uint height, bool rightMostNode)
00340 {
00341     assert(!pageStack.empty());
00342     pageId = pageStack.back();
00343     pageStack.pop_back();
00344 
00345     uint iPosition;
00346     for (;;) {
00347         pageLock.lockPageWithCoupling(pageId,LOCKMODE_X);
00348 
00349         bool found;
00350         BTreeNode const &parentNode = pageLock.getNodeForRead();
00351         if (pageId == getRootPageId() && parentNode.height != height+1) {
00352             // REVIEW jvs 12-Feb-2006:  I promised Xiaoyang I would
00353             // review this code, but I still haven't gotten to it.
00354             // Should only get here in page-level concurrency scenarios.
00355             // Also need to devise some tests which force this situation.
00356 
00357             // the tree has grown during the time.
00358             // the new root is not the parent any more.
00359             //
00360             pageLock.unlock();
00361             BTreePageLock stackPageLock(treeDescriptor.segmentAccessor);
00362             uint iPosition;
00363             for (;;) {
00364                 stackPageLock.lockPageWithCoupling(pageId,LOCKMODE_S);
00365                 bool found;
00366                 BTreeNode const &node = stackPageLock.getNodeForRead();
00367                 if (node.height == height) {
00368                     // it is the same level now. get out of the loop.
00369                     stackPageLock.unlock();
00370                     break;
00371                 }
00372                 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00373 
00374                 iPosition = nodeAccessor.binarySearch(
00375                     node,
00376                     keyDescriptor,
00377                     searchKeyData,
00378                     DUP_SEEK_ANY,
00379                     true,
00380                     nodeAccessor.tupleData,
00381                     found);
00382                 nodeAccessor.accessTuple(node, iPosition);
00383                 nodeAccessor.unmarshalKey(nodeAccessor.tupleData);
00384                 if (iPosition == nodeAccessor.getKeyCount(node)) {
00385                     assert(!found);
00386                     // What we're searching for is bigger than everything on
00387                     // this node.  Have to search right.
00388                     if (node.rightSibling != NULL_PAGE_ID) {
00389                         pageId = node.rightSibling;
00390                         continue;
00391                     }
00392                 }
00393                 pageStack.push_back(pageId);
00394                 pageId = *(PageId *)(nodeAccessor.tupleData.back().pData);
00395                 stackPageLock.unlock();
00396             }
00397             pageId = pageStack.back();
00398             pageStack.pop_back();
00399             // now lock the real parent.
00400             pageLock.lockPageWithCoupling(pageId,LOCKMODE_X);
00401         }
00402 
00403         // we can not use parentNode because it might be changed.
00404         BTreeNode const &node = pageLock.getNodeForRead();
00405 
00406         BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00407         if (rightMostNode) {
00408             // If the split node is the rightmost node, return the position of
00409             // the rightmost entry in the parent node.
00410             iPosition = nodeAccessor.getKeyCount(node);
00411             break;
00412         }
00413 
00414         // TODO:  deal with duplicates here and above
00415 
00416         iPosition = nodeAccessor.binarySearch(
00417             node,
00418             keyDescriptor,
00419             searchKeyData,
00420             DUP_SEEK_ANY,
00421             true,
00422             nodeAccessor.tupleData,
00423             found);
00424 
00425         if (iPosition == nodeAccessor.getKeyCount(node)) {
00426             assert(!found);
00427             // What we're searching for is bigger than everything on
00428             // this node.  Have to search right.
00429             if (node.rightSibling != NULL_PAGE_ID) {
00430                 pageId = node.rightSibling;
00431                 continue;
00432             } else {
00433                 break;
00434             }
00435         }
00436         // it could be before the first entry, so not found is OK!
00437         break;
00438     }
00439     return iPosition;
00440 }
00441 
00442 void BTreeWriter::grow(
00443     BTreeNode const &rightNode, PageId rightPageId)
00444 {
00445     BTreeNode &node = pageLock.getNodeForWrite();
00446     BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node);
00447     // GROW BTREE
00448     // the btree needs to grow. The root should have
00449     // two entries: one to the left page and one the right page.
00450     // and the btree needs to keep the old root page id.
00451 #if  0
00452     std::cerr << "before grow root:" << std::endl;
00453     nodeAccessor.dumpNode(std::cerr,node,pageId);
00454     std::cerr << "before grow right:" << std::endl;
00455     nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId);
00456 #endif
00457 
00458     BTreePageLock newLeftPageLock(treeDescriptor.segmentAccessor);
00459     PageId newLeftPageId = newLeftPageLock.allocatePage(getPageOwnerId());
00460     BTreeNode &newLeftNode = newLeftPageLock.getNodeForWrite();
00461     nodeAccessor.clearNode(newLeftNode,getSegment()->getUsablePageSize());
00462     // 1. copy the content of old root page to the new page2. (left).
00463     newLeftNode = node;
00464     memcpy(
00465         newLeftNode.getDataForWrite(),
00466         node.getDataForRead(),
00467         getSegment()->getUsablePageSize() - sizeof(BTreeNode));
00468 
00469     // 1a. fix the prefetch links to match the games we're playing
00470     // with the root
00471     getSegment()->setPageSuccessor(newLeftPageId, rightPageId);
00472     getSegment()->setPageSuccessor(pageId, NULL_PAGE_ID);
00473 
00474     // 2. clear the root page and set the right height.
00475     // we use the old nodeAccessor to clear the node.
00476 
00477     nodeAccessor.clearNode(node,getSegment()->getUsablePageSize());
00478     node.height = newLeftNode.height + 1;
00479     BTreeNodeAccessor &rootNodeAccessor = getNonLeafNodeAccessor(node);
00480 
00481     // 3. add two entries to the root page.
00482 
00483     nodeAccessor.accessTuple(newLeftNode, newLeftNode.nEntries - 1);
00484     nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData);
00485     rootNodeAccessor.tupleData.back().pData =
00486         reinterpret_cast<PConstBuffer>(&newLeftPageId);
00487     uint cb = rootNodeAccessor.tupleAccessor.getByteCount(
00488                         rootNodeAccessor.tupleData);
00489     PBuffer pBuffer1 = rootNodeAccessor.allocateEntry(node,0,cb);
00490     rootNodeAccessor.tupleAccessor.marshal(
00491                 rootNodeAccessor.tupleData, pBuffer1);
00492 
00493     nodeAccessor.accessTuple(rightNode, rightNode.nEntries - 1);
00494     nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData);
00495     rootNodeAccessor.tupleData.back().pData =
00496             reinterpret_cast<PConstBuffer>(&rightPageId);
00497     cb = rootNodeAccessor.tupleAccessor.getByteCount(
00498                 rootNodeAccessor.tupleData);
00499 
00500     PBuffer pBuffer2 = rootNodeAccessor.allocateEntry(node,1,cb);
00501     rootNodeAccessor.tupleAccessor.marshal(
00502                         rootNodeAccessor.tupleData, pBuffer2);
00503 #if 0
00504     std::cerr << "after grow root:" << std::endl;
00505     rootNodeAccessor.dumpNode(std::cerr,node,pageId);
00506     std::cerr << "after grow left:" << std::endl;
00507     nodeAccessor.dumpNode(std::cerr,newLeftNode,newLeftPageId);
00508     std::cerr << "after grow right:" << std::endl;
00509     nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId);
00510 #endif
00511     if (monotonic) {
00512         newLeftPageLock.flushPage(true);
00513     }
00514     newLeftPageLock.unlock();
00515     return;
00516 }
00517 
00518 inline void BTreeWriter::optimizeRootLockMode()
00519 {
00520     if (pageId != getRootPageId()) {
00521         // In case the tree has grown, counteract the effect of
00522         // adjustRootLockMode for subsequent searches.
00523         rootLockMode = LOCKMODE_S;
00524     }
00525 }
00526 
00527 void BTreeWriter::deleteCurrent()
00528 {
00529     // TODO:  Balancing, all that jazz?  Maybe.
00530 
00531     assert(pageLock.isLocked());
00532 
00533     optimizeRootLockMode();
00534 
00535     BTreeNode &node = pageLock.getNodeForWrite();
00536     BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(node);
00537 
00538     if (isLoggingEnabled()) {
00539         uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount();
00540         ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction(
00541             *this,
00542             ACTION_DELETE);
00543         memcpy(
00544             logStream.getWritePointer(cbTuple),
00545             nodeAccessor.tupleAccessor.getCurrentTupleBuf(),
00546             cbTuple);
00547         logStream.consumeWritePointer(cbTuple);
00548         getLogicalTxn()->endLogicalAction();
00549     }
00550 
00551     nodeAccessor.deallocateEntry(node,iTupleOnLowestLevel);
00552 
00553     // precompensate for subsequent calls to searchNext()
00554     --iTupleOnLowestLevel;
00555 }
00556 
00557 bool BTreeWriter::updateCurrent(TupleData const &tupleData)
00558 {
00559     // TODO:  assert that key has not changed
00560 
00561     PBuffer pTupleBuf;
00562 
00563     assert(pageLock.isLocked());
00564 
00565     optimizeRootLockMode();
00566 
00567     BTreeNode *pNode = &(pageLock.getNodeForWrite());
00568     BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(*pNode);
00569 
00570     assert(!isLoggingEnabled());
00571 
00572     if (nodeAccessor.hasFixedWidthEntries()) {
00573         // we can use a direct overwrite
00574         pTupleBuf = const_cast<PBuffer>(
00575             nodeAccessor.getEntryForRead(*pNode,iTupleOnLowestLevel));
00576         nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf);
00577         return true;
00578     }
00579 
00580     // calculate whether updated tuple can fit
00581     uint cbTuple =
00582         nodeAccessor.tupleAccessor.getByteCount(tupleData);
00583     int cbDelta =
00584         cbTuple - nodeAccessor.tupleAccessor.getCurrentByteCount();
00585     if (cbDelta < 0) {
00586         cbDelta = 0;
00587     }
00588 
00589     // NOTE:  there's a tiny discrepancy here due to entry overhead, but it
00590     // errs on the safe side, so ignore it
00591     switch (nodeAccessor.calculateCapacity(*pNode,cbDelta)) {
00592     case BTreeNodeAccessor::CAN_FIT:
00593         nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel);
00594         break;
00595     case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION:
00596         nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel);
00597         compactNode(pageLock);
00598         // compactNode uses swapBuffers, which is why we have to use a pointer
00599         // rather than a reference for pNode
00600         pNode = &(pageLock.getNodeForWrite());
00601         assert(nodeAccessor.calculateCapacity(*pNode,cbTuple)
00602                == BTreeNodeAccessor::CAN_FIT);
00603         break;
00604     case BTreeNodeAccessor::CAN_NOT_FIT:
00605         return false;
00606     default:
00607         permAssert(false);
00608     }
00609 
00610     pTupleBuf = nodeAccessor.allocateEntry(*pNode,iTupleOnLowestLevel,cbTuple);
00611     nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf);
00612     return true;
00613 }
00614 
00615 LogicalTxnClassId BTreeWriter::getParticipantClassId() const
00616 {
00617     return BTreeRecoveryFactory::getParticipantClassId();
00618 }
00619 
00620 void BTreeWriter::describeParticipant(
00621     ByteOutputStream &logStream)
00622 {
00623     PageId rootPageId = getRootPageId();
00624     logStream.writeValue(rootPageId);
00625     getTupleDescriptor().writePersistent(logStream);
00626     getKeyProjection().writePersistent(logStream);
00627 }
00628 
00629 void BTreeWriter::undoLogicalAction(
00630     LogicalActionType actionType,
00631     ByteInputStream &logStream)
00632 {
00633     switch (actionType) {
00634     case ACTION_INSERT:
00635         deleteLogged(logStream);
00636         break;
00637     case ACTION_DELETE:
00638         insertLogged(logStream);
00639         break;
00640     default:
00641         permAssert(false);
00642     }
00643 }
00644 
00645 void BTreeWriter::redoLogicalAction(
00646     LogicalActionType actionType,
00647     ByteInputStream &logStream)
00648 {
00649     switch (actionType) {
00650     case ACTION_INSERT:
00651         insertLogged(logStream);
00652         break;
00653     case ACTION_DELETE:
00654         deleteLogged(logStream);
00655         break;
00656     default:
00657         permAssert(false);
00658     }
00659 }
00660 
00661 void BTreeWriter::insertLogged(ByteInputStream &logStream)
00662 {
00663     // REVIEW:  do we ever need to use a different Distinctness?
00664     uint cbTuple = insertTupleFromBuffer(
00665         logStream.getReadPointer(1),DUP_ALLOW);
00666     logStream.consumeReadPointer(cbTuple);
00667 }
00668 
00669 void BTreeWriter::deleteLogged(ByteInputStream &logStream)
00670 {
00671     pLeafNodeAccessor->tupleAccessor.setCurrentTupleBuf(
00672         logStream.getReadPointer(1));
00673     uint cbTuple = pLeafNodeAccessor->tupleAccessor.getCurrentByteCount();
00674     pLeafNodeAccessor->unmarshalKey(searchKeyData);
00675     if (searchForKey(searchKeyData,DUP_SEEK_ANY)) {
00676         deleteCurrent();
00677     } else {
00678         // TODO:  assert?
00679     }
00680     endSearch();
00681     logStream.consumeReadPointer(cbTuple);
00682 }
00683 
00684 void BTreeWriter::releaseScratchBuffers()
00685 {
00686     scratchPageLock.unlock();
00687 }
00688 
00689 bool BTreeWriter::positionSearchKey(BTreeNodeAccessor &nodeAccessor)
00690 {
00691     nodeAccessor.unmarshalKey(searchKeyData);
00692     pageStack.clear();
00693     DuplicateSeek seekMode = (monotonic) ? DUP_SEEK_END : DUP_SEEK_ANY;
00694     bool duplicate = searchForKeyTemplate< true, std::vector<PageId> >(
00695         searchKeyData,seekMode,true,pageStack,getRootPageId(),rootLockMode,
00696         READ_ALL);
00697     return duplicate;
00698 }
00699 
00700 bool BTreeWriter::checkMonotonicity(
00701     BTreeNodeAccessor &nodeAccessor, PConstBuffer pTupleBuffer)
00702 {
00703     // unmarshal previous key, if accessible
00704     if (iTupleOnLowestLevel == 0) {
00705         return true;
00706     }
00707     BTreeNode const &node = pageLock.getNodeForRead();
00708     accessTupleInline(node, iTupleOnLowestLevel - 1);
00709     nodeAccessor.unmarshalKey(comparisonKeyData);
00710 
00711     nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer);
00712     nodeAccessor.unmarshalKey(searchKeyData);
00713     int keyComp = keyDescriptor.compareTuples(comparisonKeyData, searchKeyData);
00714 
00715     return (keyComp <= 0 && node.nEntries == iTupleOnLowestLevel);
00716 }
00717 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $");
00718 
00719 // End BTreeWriter.cpp

Generated on Mon Jun 22 04:00:13 2009 for Fennel by  doxygen 1.5.1