00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_BTreeReaderImpl_Included
00025 #define Fennel_BTreeReaderImpl_Included
00026
00027 #include "fennel/btree/BTreeReader.h"
00028
00029 FENNEL_BEGIN_NAMESPACE
00030
00031 inline TupleData const &BTreeReader::getSearchKey()
00032 {
00033 assert(pSearchKey);
00034 return *pSearchKey;
00035 }
00036
00037 inline uint BTreeReader::binarySearch(
00038 BTreeNode const &node,
00039 DuplicateSeek dupSeek,
00040 bool leastUpper,
00041 bool &found)
00042 {
00043 return getNodeAccessor(node).binarySearch(
00044 node,
00045 keyDescriptor,
00046 getSearchKey(),
00047 dupSeek,
00048 leastUpper,
00049 comparisonKeyData,
00050 found);
00051 }
00052
00053 inline bool BTreeReader::adjustRootLockMode(LockMode &lockMode)
00054 {
00055 if (lockMode == leafLockMode) {
00056
00057 return true;
00058 }
00059
00060
00061 rootLockMode = leafLockMode;
00062 lockMode = leafLockMode;
00063
00064
00065
00066 if (pageLock.tryUpgrade()) {
00067 return true;
00068 }
00069
00070
00071 pageLock.unlock();
00072 return false;
00073 }
00074
00075 inline int BTreeReader::compareFirstKey(BTreeNode const &node)
00076 {
00077 return getNodeAccessor(node).compareFirstKey(
00078 node,
00079 keyDescriptor,
00080 getSearchKey(),
00081 comparisonKeyData);
00082 }
00083
00084 inline void BTreeReader::accessTupleInline(BTreeNode const &node, uint iEntry)
00085 {
00086 getNodeAccessor(node).accessTupleInline(node, iEntry);
00087 }
00088
00089 inline void BTreeReader::accessLeafTuple()
00090 {
00091 BTreeNode const &node = pageLock.getNodeForRead();
00092 getLeafNodeAccessor(node).accessTuple(node,iTupleOnLowestLevel);
00093 }
00094
00095 template <bool leafLockCoupling,class PageStack>
00096 inline bool BTreeReader::searchForKeyTemplate(
00097 TupleData const &key, DuplicateSeek dupSeek, bool leastUpper,
00098 PageStack &pageStack, PageId startPageId, LockMode initialLockMode,
00099 ReadMode readMode)
00100 {
00101 pSearchKey = &key;
00102
00103
00104
00105
00106
00107 PageId rightSearchTerminator = NULL_PAGE_ID;
00108 pageId = startPageId;
00109 LockMode lockMode = initialLockMode;
00110 bool lockCoupling = false;
00111 bool foundKeyAndMovedRight = false;
00112 for (;;) {
00113 if (leafLockCoupling && lockCoupling) {
00114 pageLock.lockPageWithCoupling(pageId,lockMode);
00115 } else {
00116 pageLock.lockPage(pageId,lockMode);
00117 }
00118
00119 BTreeNode const *pNode = &(pageLock.getNodeForRead());
00120
00121
00122 if (!pNode->height && !adjustRootLockMode(lockMode)) {
00123
00124 continue;
00125 }
00126
00127 bool found;
00128 uint iKeyBound = binarySearch(*pNode,dupSeek,leastUpper,found);
00129 if (foundKeyAndMovedRight && !found) {
00130
00131
00132
00133
00134
00135 assert(iKeyBound == 0);
00136 found = true;
00137 }
00138
00139
00140
00141
00142
00143 if (!leastUpper && !found && iKeyBound == pNode->nEntries - 1 &&
00144 pNode->rightSibling != NULL_PAGE_ID)
00145 {
00146
00147
00148 assert(leafLockCoupling == false);
00149
00150 PageId siblingPageId = pNode->rightSibling;
00151 pageLock.unlock();
00152 pageLock.lockPage(siblingPageId, lockMode);
00153 BTreeNode const &rightNode = pageLock.getNodeForRead();
00154 int res = compareFirstKey(rightNode);
00155
00156 pageLock.unlock();
00157 if (res < 0) {
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168 pageLock.lockPage(pageId, lockMode);
00169 pNode = &(pageLock.getNodeForRead());
00170 accessTupleInline(*pNode, iKeyBound);
00171 } else {
00172
00173
00174 if (readMode == READ_LEAF_ONLY) {
00175 assert(rightNode.height == 0);
00176
00177
00178 pageLock.lockPage(pageId, lockMode);
00179 iTupleOnLowestLevel = iKeyBound;
00180 return false;
00181 }
00182 pageId = siblingPageId;
00183 foundKeyAndMovedRight = false;
00184 continue;
00185 }
00186 }
00187
00188 if (iKeyBound == pNode->nEntries) {
00189 assert(!found || (dupSeek == DUP_SEEK_END));
00190
00191
00192 if (pNode->rightSibling == rightSearchTerminator) {
00193
00194
00195
00196
00197 assert(!pNode->height);
00198 if (rightSearchTerminator == NULL_PAGE_ID) {
00199 singular = true;
00200 }
00201 } else {
00202
00203 foundKeyAndMovedRight = found;
00204 pageId = pNode->rightSibling;
00205 if (leafLockCoupling && !pNode->height) {
00206 lockCoupling = true;
00207 }
00208 continue;
00209 }
00210 }
00211
00212 switch (pNode->height) {
00213 case 0:
00214
00215 iTupleOnLowestLevel = iKeyBound;
00216 return found;
00217
00218 case 1:
00219 if (readMode == READ_NONLEAF_ONLY) {
00220 iTupleOnLowestLevel = iKeyBound;
00221 return found;
00222 }
00223
00224 lockMode = leafLockMode;
00225 break;
00226 }
00227
00228
00229 pageStack.push_back(pageId);
00230
00231
00232 pageId = getChildForCurrent();
00233 foundKeyAndMovedRight = false;
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243 if ((*pSearchKey).size() == keyDescriptor.size() &&
00244 dupSeek != DUP_SEEK_END)
00245 {
00246 if (iKeyBound < (pNode->nEntries - 1)) {
00247 rightSearchTerminator = getChild(*pNode,iKeyBound + 1);
00248 } else {
00249
00250
00251
00252 PageId rightSiblingPageId = pNode->rightSibling;
00253 pageLock.unlock();
00254 rightSearchTerminator = getFirstChild(rightSiblingPageId);
00255 }
00256 }
00257 }
00258 }
00259
00260 FENNEL_END_NAMESPACE
00261
00262 #endif
00263
00264