00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_SegPageLock_Included
00025 #define Fennel_SegPageLock_Included
00026
00027 #include "fennel/segment/Segment.h"
00028 #include "fennel/segment/SegmentAccessor.h"
00029 #include "fennel/cache/CacheAccessor.h"
00030 #include "fennel/cache/CachePage.h"
00031
00032 #include <boost/utility.hpp>
00033
00034 FENNEL_BEGIN_NAMESPACE
00035
00036
00037
00038
00039
00040
00041
00050 class FENNEL_SEGMENT_EXPORT SegPageLock
00051 : public boost::noncopyable
00052 {
00053
00054
00055
00056
00057 SegmentAccessor segmentAccessor;
00058 CachePage *pPage;
00059 LockMode lockMode;
00060 PageId lockedPageId;
00061 bool newPage;
00062 bool isWriteVersioned;
00063
00064 inline void resetPage()
00065 {
00066 pPage = NULL;
00067 lockedPageId = NULL_PAGE_ID;
00068 newPage = false;
00069 }
00070
00071 inline LockMode getLockMode(LockMode origLockMode)
00072 {
00073
00074
00075
00076
00077 if (isWriteVersioned) {
00078 if (origLockMode == LOCKMODE_X
00079 || origLockMode == LOCKMODE_X_NOWAIT)
00080 {
00081 return
00082 (origLockMode == LOCKMODE_X) ?
00083 LOCKMODE_S : LOCKMODE_S_NOWAIT;
00084 }
00085 }
00086
00087 return origLockMode;
00088 }
00089
00090 inline void initialize()
00091 {
00092
00093
00094 lockMode = LOCKMODE_X;
00095 isWriteVersioned = false;
00096 }
00097
00098
00099 public:
00100 explicit SegPageLock()
00101 {
00102 initialize();
00103 resetPage();
00104 }
00105
00106 explicit SegPageLock(
00107 SegmentAccessor const &segmentAccessor)
00108 {
00109 initialize();
00110 resetPage();
00111 accessSegment(segmentAccessor);
00112 }
00113
00114 ~SegPageLock()
00115 {
00116 unlock();
00117 }
00118
00119 inline void accessSegment(
00120 SegmentAccessor const &segmentAccessorInit)
00121 {
00122 assert(!pPage);
00123 assert(segmentAccessorInit.pSegment);
00124 assert(segmentAccessorInit.pCacheAccessor);
00125 segmentAccessor = segmentAccessorInit;
00126 isWriteVersioned = segmentAccessor.pSegment->isWriteVersioned();
00127 }
00128
00129 inline bool isLocked() const
00130 {
00131 return pPage ? true : false;
00132 }
00133
00134 inline CachePage &getPage() const
00135 {
00136 assert(isLocked());
00137 return *pPage;
00138 }
00139
00140 inline PageId allocatePage(PageOwnerId ownerId = ANON_PAGE_OWNER_ID)
00141 {
00142 PageId pageId = tryAllocatePage(ownerId);
00143 permAssert(pageId != NULL_PAGE_ID);
00144 return pageId;
00145 }
00146
00147 inline PageId tryAllocatePage(PageOwnerId ownerId = ANON_PAGE_OWNER_ID)
00148 {
00149 unlock();
00150 PageId pageId = segmentAccessor.pSegment->allocatePageId(ownerId);
00151 if (pageId == NULL_PAGE_ID) {
00152 return pageId;
00153 }
00154 lockPage(pageId,LOCKMODE_X,false);
00155 newPage = true;
00156 return pageId;
00157 }
00158
00159 inline void deallocateLockedPage()
00160 {
00161 assert(isLocked());
00162 BlockId blockId = pPage->getBlockId();
00163 unlock();
00164 PageId pageId = segmentAccessor.pSegment->translateBlockId(blockId);
00165
00166
00167 segmentAccessor.pSegment->deallocatePageRange(pageId,pageId);
00168 }
00169
00170 inline void deallocateUnlockedPage(PageId pageId)
00171 {
00172 assert(pageId != NULL_PAGE_ID);
00173 BlockId blockId = segmentAccessor.pSegment->translatePageId(pageId);
00174
00175
00176 segmentAccessor.pSegment->deallocatePageRange(pageId,pageId);
00177 }
00178
00179 inline void unlock()
00180 {
00181 if (pPage) {
00182 segmentAccessor.pCacheAccessor->unlockPage(
00183 *pPage,
00184 lockMode);
00185 resetPage();
00186 }
00187 }
00188
00189 inline void dontUnlock()
00190 {
00191 resetPage();
00192 }
00193
00194 inline void lockPage(
00195 PageId pageId,LockMode lockModeInit,
00196 bool readIfUnmapped = true)
00197 {
00198
00199
00200 if (isLocked() && pageId == lockedPageId && lockMode == lockModeInit) {
00201 return;
00202 }
00203 unlock();
00204 lockMode = getLockMode(lockModeInit);
00205 BlockId blockId = segmentAccessor.pSegment->translatePageId(pageId);
00206 pPage = segmentAccessor.pCacheAccessor->lockPage(
00207 blockId,
00208 lockMode,
00209 readIfUnmapped,
00210 segmentAccessor.pSegment->getMappedPageListener(blockId));
00211 lockedPageId = pageId;
00212 }
00213
00214 inline void lockPageWithCoupling(
00215 PageId pageId,LockMode lockModeInit)
00216 {
00217 assert(lockModeInit < LOCKMODE_S_NOWAIT);
00218 BlockId blockId = segmentAccessor.pSegment->translatePageId(pageId);
00219 LockMode newLockMode = getLockMode(lockModeInit);
00220 CachePage *pNewPage = segmentAccessor.pCacheAccessor->lockPage(
00221 blockId,
00222 newLockMode,
00223 true,
00224 segmentAccessor.pSegment->getMappedPageListener(blockId));
00225 assert(pNewPage);
00226 unlock();
00227 lockMode = newLockMode;
00228 pPage = pNewPage;
00229 lockedPageId = pageId;
00230 }
00231
00232 inline void lockShared(PageId pageId)
00233 {
00234 lockPage(pageId,LOCKMODE_S);
00235 }
00236
00237 inline void lockExclusive(PageId pageId)
00238 {
00239 lockPage(pageId,LOCKMODE_X);
00240 }
00241
00242 inline void lockSharedNoWait(PageId pageId)
00243 {
00244 lockPage(pageId,LOCKMODE_S_NOWAIT);
00245 lockMode = LOCKMODE_S;
00246 }
00247
00248 inline void lockExclusiveNoWait(PageId pageId)
00249 {
00250 lockPage(pageId,LOCKMODE_X_NOWAIT);
00251 lockMode = LOCKMODE_X;
00252 }
00253
00254 inline void updatePage()
00255 {
00256 assert(isLocked());
00257
00258
00259
00260 if (!newPage) {
00261 PageId origPageId =
00262 segmentAccessor.pSegment->translateBlockId(getPage().
00263 getBlockId());
00264 PageId updatePageId =
00265 segmentAccessor.pSegment->updatePage(origPageId);
00266 if (updatePageId != NULL_PAGE_ID) {
00267 lockUpdatePage(updatePageId);
00268 return;
00269 }
00270 }
00271
00272
00273
00274
00275 if (lockMode == LOCKMODE_S) {
00276 assert(isWriteVersioned);
00277 TxnId txnId = segmentAccessor.pCacheAccessor->getTxnId();
00278 pPage->upgrade(txnId);
00279 lockMode = LOCKMODE_X;
00280 }
00281 }
00282
00283 inline void lockUpdatePage(PageId updatePageId)
00284 {
00285 assert(isWriteVersioned);
00286 BlockId blockId =
00287 segmentAccessor.pSegment->translatePageId(updatePageId);
00288 assert(lockMode == LOCKMODE_S);
00289 CachePage *pNewPage = segmentAccessor.pCacheAccessor->lockPage(
00290 blockId,
00291 LOCKMODE_X,
00292 true,
00293 segmentAccessor.pSegment->getMappedPageListener(blockId));
00294 assert(pNewPage);
00295
00296
00297 memcpy(
00298 pNewPage->getWritableData(),
00299 pPage->getReadableData(),
00300 segmentAccessor.pSegment->getUsablePageSize());
00301 PageId origPageId = lockedPageId;
00302 unlock();
00303 lockMode = LOCKMODE_X;
00304 pPage = pNewPage;
00305 newPage = true;
00306
00307
00308 lockedPageId = origPageId;
00309 }
00310
00311 inline PageId getPageId()
00312 {
00313
00314
00315
00316 return lockedPageId;
00317 }
00318
00319 inline void flushPage(bool async)
00320 {
00321 assert(isLocked());
00322 segmentAccessor.pCacheAccessor->flushPage(getPage(), true);
00323 }
00324
00325
00326 inline void swapBuffers(SegPageLock &other)
00327 {
00328
00329 assert(isLocked());
00330 assert(other.isLocked());
00331 assert(lockMode == LOCKMODE_X);
00332 assert(other.lockMode == LOCKMODE_X);
00333 assert(pPage != other.pPage);
00334
00335
00336 other.updatePage();
00337
00338
00339
00340
00341 Segment &segment = *(segmentAccessor.pSegment);
00342 memcpy(
00343 other.pPage->getWritableData() + segment.getUsablePageSize(),
00344 pPage->getReadableData() + segment.getUsablePageSize(),
00345 segment.getFullPageSize() - segment.getUsablePageSize());
00346 pPage->swapBuffers(*other.pPage);
00347 }
00348
00349 inline bool tryUpgrade()
00350 {
00351 assert(isLocked());
00352 assert(lockMode == LOCKMODE_S);
00353
00354
00355 TxnId txnId = segmentAccessor.pCacheAccessor->getTxnId();
00356
00357
00358
00359 if (isWriteVersioned) {
00360 return true;
00361 } else {
00362 if (pPage->tryUpgrade(txnId)) {
00363 lockMode = LOCKMODE_X;
00364 return true;
00365 }
00366 return false;
00367 }
00368 }
00369
00370 inline SharedCacheAccessor getCacheAccessor() const
00371 {
00372 return segmentAccessor.pCacheAccessor;
00373 }
00374 };
00375
00389 struct FENNEL_SEGMENT_EXPORT StoredNode
00390 {
00394 MagicNumber magicNumber;
00395 };
00396
00397
00398
00413 template <class Node>
00414 class SegNodeLock : public SegPageLock
00415 {
00416 inline void verifyMagicNumber(Node const &node) const
00417 {
00418 assert(node.magicNumber == Node::MAGIC_NUMBER);
00419 }
00420
00421 public:
00422 explicit SegNodeLock()
00423 {
00424 }
00425
00426 explicit SegNodeLock(
00427 SegmentAccessor &segmentAccessor)
00428 : SegPageLock(segmentAccessor)
00429 {
00430 }
00431
00432 inline bool checkMagicNumber() const
00433 {
00434 Node const &node =
00435 *reinterpret_cast<Node const *>(getPage().getReadableData());
00436 return (node.magicNumber == Node::MAGIC_NUMBER);
00437 }
00438
00439 inline Node const &getNodeForRead() const
00440 {
00441 Node const &node =
00442 *reinterpret_cast<Node const *>(getPage().getReadableData());
00443 verifyMagicNumber(node);
00444 return node;
00445 }
00446
00447 inline Node &getNodeForWrite()
00448 {
00449 updatePage();
00450 return *reinterpret_cast<Node *>(getPage().getWritableData());
00451 }
00452
00453 inline PageId allocatePage(PageOwnerId ownerId = ANON_PAGE_OWNER_ID)
00454 {
00455 PageId pageId = SegPageLock::allocatePage(ownerId);
00456 setMagicNumber();
00457 return pageId;
00458 }
00459
00460 inline PageId tryAllocatePage(PageOwnerId ownerId = ANON_PAGE_OWNER_ID)
00461 {
00462 PageId pageId = SegPageLock::tryAllocatePage(ownerId);
00463 if (pageId != NULL_PAGE_ID) {
00464 setMagicNumber();
00465 }
00466 return pageId;
00467 }
00468
00469 inline void setMagicNumber()
00470 {
00471 getNodeForWrite().magicNumber = Node::MAGIC_NUMBER;
00472 }
00473
00474 inline bool isMagicNumberValid()
00475 {
00476 Node const &node =
00477 *reinterpret_cast<Node const *>(getPage().getReadableData());
00478 return node.magicNumber == Node::MAGIC_NUMBER;
00479 }
00480 };
00481
00482 FENNEL_END_NAMESPACE
00483
00484 #endif
00485
00486