00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_CacheMethodsImpl_Included
00025 #define Fennel_CacheMethodsImpl_Included
00026
00027 #include "fennel/device/DeviceAccessSchedulerParams.h"
00028 #include "fennel/device/RandomAccessNullDevice.h"
00029 #include "fennel/cache/CacheParams.h"
00030 #include "fennel/common/CompoundId.h"
00031 #include "fennel/common/InvalidParamExcn.h"
00032 #include "fennel/cache/VMAllocator.h"
00033
00034
00035
00036 FENNEL_BEGIN_NAMESPACE
00037
00038
00039
00040
00041
00042
00043
00044 template <class PageT,class VictimPolicyT>
00045 CacheImpl<PageT,VictimPolicyT>
00046 ::CacheImpl(
00047 CacheParams const ¶ms,
00048 CacheAllocator *pBufferAllocatorInit)
00049 :
00050 deviceTable(CompoundId::getMaxDeviceCount()),
00051 pageTable(),
00052 bufferAllocator(
00053 pBufferAllocatorInit ?
00054 *pBufferAllocatorInit
00055 : *new VMAllocator(params.cbPage,0)),
00056 pBufferAllocator(pBufferAllocatorInit ? NULL : &bufferAllocator),
00057 victimPolicy(params),
00058 timerThread(*this)
00059 {
00060 cbPage = params.cbPage;
00061 pDeviceAccessScheduler = NULL;
00062 inFlushMode = false;
00063
00064
00065 dirtyHighWaterPercent = 25;
00066 dirtyLowWaterPercent = 5;
00067
00068 initializeStats();
00069
00070 allocatePages(params);
00071
00072
00073
00074
00075
00076 pageTable.resize(2*pages.size()+1);
00077 for (uint i = 0; i < pageTable.size(); i++) {
00078 pageTable[i] = new PageBucketT();
00079 }
00080
00081 try {
00082 pDeviceAccessScheduler = DeviceAccessScheduler::newScheduler(
00083 params.schedParams);
00084 } catch (FennelExcn &ex) {
00085 close();
00086 throw ex;
00087 }
00088
00089
00090 registerDevice(
00091 NULL_DEVICE_ID,
00092 SharedRandomAccessDevice(
00093 new RandomAccessNullDevice()));
00094
00095 idleFlushInterval = params.idleFlushInterval;
00096 if (idleFlushInterval) {
00097 timerThread.start();
00098 }
00099
00100 prefetchPagesMax = params.prefetchPagesMax;
00101 prefetchThrottleRate = params.prefetchThrottleRate;
00102 }
00103
// Returns (via out-parameters) the cache-wide prefetch governor settings
// copied from CacheParams at construction time.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>::getPrefetchParams(
    uint &prefetchPagesMax,
    uint &prefetchThrottleRate)
{
    // Out-parameters shadow the member names, hence the explicit this->.
    prefetchPagesMax = this->prefetchPagesMax;
    prefetchThrottleRate = this->prefetchThrottleRate;
}
00112
// Zeroes all statistics counters, including the cumulative "...SinceInit"
// totals.  Called once from the constructor, before any pages exist.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>::initializeStats()
{
    statsSinceInit.nHits = 0;
    statsSinceInit.nHitsSinceInit = 0;
    statsSinceInit.nRequests = 0;
    statsSinceInit.nRequestsSinceInit = 0;
    statsSinceInit.nVictimizations = 0;
    statsSinceInit.nVictimizationsSinceInit = 0;
    statsSinceInit.nDirtyPages = 0;
    statsSinceInit.nPageReads = 0;
    statsSinceInit.nPageReadsSinceInit = 0;
    statsSinceInit.nPageWrites = 0;
    statsSinceInit.nPageWritesSinceInit = 0;
    statsSinceInit.nRejectedPrefetches = 0;
    statsSinceInit.nRejectedPrefetchesSinceInit = 0;
    statsSinceInit.nIoRetries = 0;
    statsSinceInit.nIoRetriesSinceInit = 0;
    statsSinceInit.nSuccessfulPrefetches = 0;
    statsSinceInit.nSuccessfulPrefetchesSinceInit = 0;
    statsSinceInit.nLazyWrites = 0;
    statsSinceInit.nLazyWritesSinceInit = 0;
    statsSinceInit.nLazyWriteCalls = 0;
    statsSinceInit.nLazyWriteCallsSinceInit = 0;
    statsSinceInit.nVictimizationWrites = 0;
    statsSinceInit.nVictimizationWritesSinceInit = 0;
    statsSinceInit.nCheckpointWrites = 0;
    statsSinceInit.nCheckpointWritesSinceInit = 0;
    statsSinceInit.nMemPagesAllocated = 0;
    statsSinceInit.nMemPagesUnused = 0;
    statsSinceInit.nMemPagesMax = 0;
}
00146
// Allocates the Page objects (and, for the first nMemPagesInit of them,
// their backing buffers).  If allocation with the requested sizes fails,
// everything allocated so far is released and one retry is made with the
// built-in defaults; if that also fails, the cache is closed and a
// SysCallExcn is thrown.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>::allocatePages(CacheParams const &params)
{
    static const int allocErrorMsgSize = 255;
    uint nPagesMax = 0;
    uint nPagesInit = 0;

    // First attempt uses the caller's sizes; the second (and last) attempt
    // falls back to CacheParams defaults.
    for (int attempts = 0; attempts < 2; attempts++) {
        bool allocError = false;
        int allocErrorCode = 0;
        char allocErrorMsg[allocErrorMsgSize + 1] = { 0 };

        nPagesMax = params.nMemPagesMax;
        nPagesInit = params.nMemPagesInit;

        try {
            if (attempts != 0) {
                nPagesMax = CacheParams::defaultMemPagesMax;
                nPagesInit = CacheParams::defaultMemPagesInit;
            }

            pages.clear();
            if (pages.capacity() > nPagesMax) {
                // clear() does not release capacity; swap with a tiny
                // temporary vector to actually shrink the allocation
                // before reserving the (smaller) retry size.
                std::vector<PageT *>(1).swap(pages);
            }
            pages.reserve(nPagesMax);
            pages.assign(nPagesMax, NULL);

            // Create all Page objects; only the first nPagesInit get a
            // backing buffer now (the rest stay "unallocated").
            for (uint i = 0; i < nPagesMax; i++) {
                PBuffer pBuffer = NULL;
                if (i < nPagesInit) {
                    pBuffer = static_cast<PBuffer>(
                        bufferAllocator.allocate(&allocErrorCode));
                    if (pBuffer == NULL) {
                        allocError = true;
                        strncpy(
                            allocErrorMsg, "mmap failed", allocErrorMsgSize);
                        break;
                    }
                }
                PageT &page = *new PageT(*this,pBuffer);
                pages[i] = &page;
            }
        } catch (std::exception &excn) {
            allocError = true;
            allocErrorCode = 0;
            if (dynamic_cast<std::bad_alloc *>(&excn) != NULL) {
                strncpy(allocErrorMsg, "malloc failed", allocErrorMsgSize);
            } else {
                strncpy(allocErrorMsg, excn.what(), allocErrorMsgSize);
            }
        }

        if (!allocError) {
            // Success; stop retrying.
            break;
        }

        // Release everything allocated on this attempt.  Pages were filled
        // in order, so the first NULL entry marks where allocation stopped.
        for (uint i = 0; i < pages.size(); i++) {
            if (!pages[i]) {
                break;
            }
            PBuffer pBuffer = pages[i]->pBuffer;
            deleteAndNullify(pages[i]);
            if (pBuffer) {
                // Ignore any deallocation error here; the original
                // allocation failure is the error worth reporting.
                bufferAllocator.deallocate(pBuffer);
            }
        }

        if (attempts != 0) {
            // Retry with defaults also failed; give up.
            close();
            throw SysCallExcn(std::string(allocErrorMsg), allocErrorCode);
        }
    }

    // Distribute the new pages: pages with a buffer go to the unmapped
    // bucket (available for mapping) and are registered with the victim
    // policy; buffer-less pages go to the unallocated bucket.
    for (uint i = 0; i < pages.size(); i++) {
        PageT *page = pages[i];
        PBuffer pBuffer = page->pBuffer;
        if (pBuffer) {
            unmappedBucket.pageList.push_back(*page);
            victimPolicy.registerPage(*page);
        } else {
            unallocatedBucket.pageList.push_back(*page);
        }
    }

    uint nPages = std::min(nPagesInit, nPagesMax);
    calcDirtyThreshholds(nPages);
    victimPolicy.setAllocatedPageCount(nPages);
}
00253
// Converts the percentage-based dirty-page water marks into absolute page
// counts for the given cache size; used by flushSomePages for hysteresis.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>::calcDirtyThreshholds(uint nCachePages)
{
    dirtyHighWaterMark = nCachePages * dirtyHighWaterPercent / 100;
    dirtyLowWaterMark = nCachePages * dirtyLowWaterPercent / 100;
}
00260
// Returns the number of pages currently backed by a buffer, i.e. total
// pages minus those still sitting in the unallocated bucket.
template <class PageT,class VictimPolicyT>
uint CacheImpl<PageT,VictimPolicyT>::getAllocatedPageCount()
{
    // Shared lock suffices: we only read the list size.
    SXMutexSharedGuard guard(unallocatedBucket.mutex);
    return pages.size() - unallocatedBucket.pageList.size();
}
00267
// Returns the upper bound on allocated pages (the total number of Page
// objects created by allocatePages).
template <class PageT,class VictimPolicyT>
uint CacheImpl<PageT,VictimPolicyT>::getMaxAllocatedPageCount()
{
    return pages.size();
}
00273
00274 template <class PageT,class VictimPolicyT>
00275 void CacheImpl<PageT,VictimPolicyT>::setAllocatedPageCount(
00276 uint nMemPagesDesired)
00277 {
00278 assert(nMemPagesDesired <= pages.size());
00279
00280
00281 SXMutexExclusiveGuard unallocatedBucketGuard(unallocatedBucket.mutex);
00282 uint nMemPages =
00283 pages.size() - unallocatedBucket.pageList.size();
00284 if (nMemPages < nMemPagesDesired) {
00285
00286
00287
00288
00289 int nMemPagesToAllocate = nMemPagesDesired - nMemPages;
00290 std::vector<PBuffer> buffers(nMemPagesToAllocate);
00291
00292 for (int i = 0; i < nMemPagesToAllocate; ++i) {
00293 int errorCode;
00294 PBuffer pBuffer = static_cast<PBuffer>(
00295 bufferAllocator.allocate(&errorCode));
00296
00297 if (pBuffer == NULL) {
00298
00299 for (int i = 0; i < nMemPagesToAllocate; i++) {
00300 if (buffers[i] == NULL) {
00301 break;
00302 }
00303
00304
00305
00306
00307
00308 bufferAllocator.deallocate(buffers[i]);
00309 }
00310 buffers.clear();
00311 std::vector<PBuffer>(0).swap(buffers);
00312
00313 throw SysCallExcn("mmap failed", errorCode);
00314 }
00315
00316 buffers[i] = pBuffer;
00317 }
00318
00319 PageBucketMutator mutator(unallocatedBucket.pageList);
00320 for (int i = 0; i < nMemPagesToAllocate; i++) {
00321 PBuffer pBuffer = buffers[i];
00322 PageT *page = mutator.detach();
00323 assert(!page->pBuffer);
00324 page->pBuffer = pBuffer;
00325 victimPolicy.registerPage(*page);
00326
00327 freePage(*page);
00328 }
00329 } else {
00330
00331 for (; nMemPages > nMemPagesDesired; --nMemPages) {
00332 PageT *page;
00333 do {
00334 page = findFreePage();
00335 } while (!page);
00336
00337 int errorCode;
00338 if (bufferAllocator.deallocate(page->pBuffer, &errorCode)) {
00339
00340
00341 freePage(*page);
00342 throw SysCallExcn("munmap failed", errorCode);
00343 }
00344 page->pBuffer = NULL;
00345 victimPolicy.unregisterPage(*page);
00346
00347 unallocatedBucket.pageList.push_back(*page);
00348 }
00349 }
00350
00351 calcDirtyThreshholds(nMemPagesDesired);
00352
00353 victimPolicy.setAllocatedPageCount(nMemPagesDesired);
00354 }
00355
00356 template <class PageT,class VictimPolicyT>
00357 PageT *CacheImpl<PageT,VictimPolicyT>
00358 ::lockPage(
00359 BlockId blockId,LockMode lockMode,bool readIfUnmapped,
00360 MappedPageListener *pMappedPageListener,TxnId txnId)
00361 {
00362
00363
00364 assert(blockId != NULL_BLOCK_ID);
00365 assert(CompoundId::getDeviceId(blockId) != NULL_DEVICE_ID);
00366 PageBucketT &bucket = getHashBucket(blockId);
00367 PageT *page = lookupPage(bucket,blockId,true);
00368 if (page) {
00369 assert(page->pMappedPageListener == pMappedPageListener);
00370
00371
00372 incrementStatsCounter(nCacheHits);
00373 } else {
00374 do {
00375 page = findFreePage();
00376 } while (!page);
00377
00378
00379
00380
00381
00382 PageT &mappedPage = mapPage(
00383 bucket,*page,blockId,pMappedPageListener,readIfUnmapped);
00384 if (&mappedPage == page) {
00385
00386
00387 if (readIfUnmapped) {
00388 readPageAsync(*page);
00389 }
00390 } else {
00391
00392
00393 page = &mappedPage;
00394 }
00395 if (readIfUnmapped) {
00396
00397
00398
00399 StrictMutexGuard pageGuard(page->mutex);
00400 while (page->dataStatus == CachePage::DATA_READ) {
00401 page->waitForPendingIO(pageGuard);
00402 }
00403 }
00404 }
00405
00406 incrementStatsCounter(nCacheRequests);
00407
00408
00409
00410 if (!page->lock.waitFor(lockMode,ETERNITY,txnId)) {
00411
00412 assert((lockMode == LOCKMODE_S_NOWAIT) ||
00413 (lockMode == LOCKMODE_X_NOWAIT));
00414 StrictMutexGuard pageGuard(page->mutex);
00415 page->nReferences--;
00416 if (!page->nReferences) {
00417 victimPolicy.notifyPageUnpin(*page);
00418 }
00419 return NULL;
00420 }
00421 if ((lockMode == LOCKMODE_X) || (lockMode == LOCKMODE_X_NOWAIT)) {
00422
00423
00424
00425
00426
00427 StrictMutexGuard pageGuard(page->mutex);
00428 while (page->dataStatus == CachePage::DATA_WRITE) {
00429 page->waitForPendingIO(pageGuard);
00430 }
00431 #ifdef DEBUG
00432 int errorCode;
00433 if (bufferAllocator.setProtection(
00434 page->pBuffer, cbPage, false, &errorCode))
00435 {
00436 throw new SysCallExcn("memory protection failed", errorCode);
00437 }
00438 #endif
00439 } else {
00440
00441 #ifdef DEBUG
00442 StrictMutexGuard pageGuard(page->mutex);
00443 if (page->nReferences == 1) {
00444 int errorCode;
00445 if (bufferAllocator.setProtection(
00446 page->pBuffer, cbPage, true, &errorCode))
00447 {
00448 throw new SysCallExcn("memory protection failed", errorCode);
00449 }
00450 }
00451 #endif
00452 }
00453 return page;
00454 }
00455
00456 template <class PageT,class VictimPolicyT>
00457 void CacheImpl<PageT,VictimPolicyT>
00458 ::unlockPage(
00459 CachePage &vPage,LockMode lockMode,TxnId txnId)
00460 {
00461 assert(lockMode < LOCKMODE_S_NOWAIT);
00462 PageT &page = static_cast<PageT &>(vPage);
00463 StrictMutexGuard pageGuard(page.mutex);
00464 assert(page.nReferences);
00465 bool bFree = false;
00466 assert(page.hasBlockId());
00467 if (CompoundId::getDeviceId(page.getBlockId()) == NULL_DEVICE_ID) {
00468
00469 bFree = true;
00470 } else {
00471 int errorCode;
00472 if (bufferAllocator.setProtection(
00473 page.pBuffer, cbPage, false, &errorCode))
00474 {
00475 throw new SysCallExcn("memory protection failed", errorCode);
00476 }
00477
00478 page.lock.release(lockMode,txnId);
00479 }
00480 page.nReferences--;
00481 if (!page.nReferences) {
00482 if (bFree) {
00483
00484
00485
00486 page.dataStatus = CachePage::DATA_INVALID;
00487 page.blockId = NULL_BLOCK_ID;
00488 freePage(page);
00489 } else {
00490
00491
00492 victimPolicy.notifyPageUnpin(page);
00493 }
00494
00495
00496
00497 freePageCondition.notify_all();
00498 }
00499 }
00500
// Returns true iff blockId is currently mapped to some page.  The answer
// is only a hint: the mapping can change as soon as the bucket lock is
// released.  A hit also counts as a (non-pinning) access for the victim
// policy.
template <class PageT,class VictimPolicyT>
bool CacheImpl<PageT,VictimPolicyT>
::isPageMapped(BlockId blockId)
{
    PageBucketT &bucket = getHashBucket(blockId);
    SXMutexSharedGuard bucketGuard(bucket.mutex);
    for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
        StrictMutexGuard pageGuard(iter->mutex);
        if (iter->getBlockId() == blockId) {
            // Release the bucket lock before calling out to the victim
            // policy (lock-ordering: never hold bucket lock across it).
            bucketGuard.unlock();
            victimPolicy.notifyPageAccess(*iter, false);
            return true;
        }
    }
    return false;
}
00517
// Unmaps and frees the page mapped to blockId without flushing its
// contents.  The caller must guarantee no other thread holds a reference.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::discardPage(BlockId blockId)
{
    assert(blockId != NULL_BLOCK_ID);
    PageBucketT &bucket = getHashBucket(blockId);
    PageT *page = lookupPage(bucket,blockId,false);
    if (!page) {
        // Not mapped; still tell the victim policy so it can purge any
        // bookkeeping it keeps per BlockId.
        victimPolicy.notifyPageDiscard(blockId);
        return;
    }
    StrictMutexGuard pageGuard(page->mutex);

    // A flush may already be in flight for this page; let it complete
    // before tearing the mapping down.
    while (page->dataStatus == CachePage::DATA_WRITE) {
        page->waitForPendingIO(pageGuard);
    }

    // The single reference is the one lookupPage just took on our behalf.
    assert(page->nReferences == 1);
    page->nReferences = 0;
    unmapAndFreeDiscardedPage(*page,pageGuard);
}
00543
// Allocates a scratch page (device NULL_DEVICE_ID) and returns it locked
// with a single reference.  Scratch pages bypass the page lock entirely;
// release them via unlockPage.
template <class PageT,class VictimPolicyT>
PageT &CacheImpl<PageT,VictimPolicyT>
::lockScratchPage(BlockNum blockNum)
{
    PageT *page;
    do {
        page = findFreePage();
    } while (!page);

    StrictMutexGuard pageGuard(page->mutex);
    page->nReferences = 1;

    // Mark the page dirty immediately: scratch contents are never read
    // from disk, and flagging them dirty prevents victimization from
    // treating the buffer as clean/disposable.
    page->dataStatus = CachePage::DATA_DIRTY;
    CompoundId::setDeviceId(page->blockId,NULL_DEVICE_ID);
    CompoundId::setBlockNum(page->blockId,blockNum);

    return *page;
}
00565
// Initiates an asynchronous read of blockId if it is not already mapped.
// Returns false when the prefetch had to be rejected (no free page, or the
// scheduler refused the request); the caller may retry or fall back to a
// synchronous read.
template <class PageT,class VictimPolicyT>
bool CacheImpl<PageT,VictimPolicyT>
::prefetchPage(BlockId blockId,MappedPageListener *pMappedPageListener)
{
    assert(blockId != NULL_BLOCK_ID);
    if (isPageMapped(blockId)) {
        // Already resident; counts as a successful prefetch.
        successfulPrefetch();
        return true;
    }
    PageT *page = findFreePage();
    if (!page) {
        // Unlike lockPage, prefetch does not retry for a free page.
        rejectedPrefetch();
        return false;
    }

    PageBucketT &bucket = getHashBucket(blockId);
    bool bPendingRead = true;

    // bIncRef=false: a prefetch does not pin the page; the eventual
    // lockPage call takes the reference.
    bool bIncRef = false;
    PageT &mappedPage = mapPage(
        bucket,*page,blockId,pMappedPageListener,bPendingRead,bIncRef);
    if (&mappedPage == page) {
        if (readPageAsync(*page)) {
            successfulPrefetch();
        } else {
            rejectedPrefetch();
            return false;
        }
    } else {
        // Another thread mapped the same block first; nothing left to do.
        page = &mappedPage;
    }
    return true;
}
00606
// Prefetches nPagesPerBatch consecutive blocks starting at blockId,
// coalescing contiguous unmapped blocks into single scatter/gather read
// requests.  An already-mapped block in the middle of the run splits the
// batch: the accumulated request is flushed and a new one is started past
// the gap.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::prefetchBatch(
    BlockId blockId,uint nPagesPerBatch,
    MappedPageListener *pMappedPageListener)
{
    assert(blockId != NULL_BLOCK_ID);
    assert(nPagesPerBatch > 1);

    // All blocks in the batch live on the same device, so one request
    // template is reused across the whole run.
    SharedRandomAccessDevice &pDevice = getDevice(
        CompoundId::getDeviceId(blockId));
    DeviceAccessScheduler &scheduler = getDeviceAccessScheduler(*pDevice);
    RandomAccessRequest request;
    request.pDevice = pDevice.get();
    request.cbOffset = getPageOffset(blockId);
    request.cbTransfer = 0;
    request.type = RandomAccessRequest::READ;

    BlockId blockIdi = blockId;
    for (uint i = 0; i < nPagesPerBatch; i++) {
        PageT *page;
        do {
            page = findFreePage();
        } while (!page);

        PageBucketT &bucket = getHashBucket(blockIdi);
        bool bPendingRead = true;
        bool bIncRef = false;
        PageT &mappedPage = mapPage(
            bucket,*page,blockIdi,pMappedPageListener,bPendingRead,bIncRef);
        if (&mappedPage != page) {
            // Block was already mapped by someone else: flush whatever
            // contiguous run we have accumulated so far...
            if (request.cbTransfer) {
                if (scheduler.schedule(request)) {
                    successfulPrefetch();
                } else {
                    ioRetry();
                    rejectedPrefetch();
                }
            }

            // ...then restart the request just past the skipped block
            // (advance over the transferred bytes plus this page).
            request.cbOffset += request.cbTransfer;
            request.cbOffset += getPageSize();
            request.cbTransfer = 0;
            request.bindingList.clear(false);
        } else {
            // Extend the current contiguous run by one page.
            request.cbTransfer += getPageSize();
            request.bindingList.push_back(*page);
        }
        CompoundId::incBlockNum(blockIdi);
    }

    // Flush the trailing run, if any.
    if (request.cbTransfer) {
        if (scheduler.schedule(request)) {
            successfulPrefetch();
        } else {
            ioRetry();
            rejectedPrefetch();
        }
    }
}
00674
// Bumps the successful-prefetch statistics counter.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::successfulPrefetch()
{
    incrementStatsCounter(nSuccessfulCachePrefetches);
}
00681
// Bumps the rejected-prefetch statistics counter.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::rejectedPrefetch()
{
    incrementStatsCounter(nRejectedCachePrefetches);
}
00688
// Bumps the I/O-retry statistics counter (scheduler refused a request).
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::ioRetry()
{
    incrementStatsCounter(nIoRetries);
}
00695
// Checkpoints all mapped pages satisfying pagePredicate.  Depending on
// checkpointType this runs up to three passes over the page array:
// phaseInitiate (kick off async flushes of dirty pages), phaseWait (wait
// for those flushes and fire the post-flush listener hook), and a final
// unmap pass for FLUSH_AND_UNMAP/DISCARD.  Returns the number of matching
// pages counted on the first pass.
template <class PageT,class VictimPolicyT>
uint CacheImpl<PageT,VictimPolicyT>
::checkpointPages(
    PagePredicate &pagePredicate,CheckpointType checkpointType)
{
    // Pages are only counted on the first pass; later passes revisit the
    // same set.
    uint nPages = 0;
    bool countPages = true;

    FlushPhase flushPhase;
    if (checkpointType >= CHECKPOINT_FLUSH_AND_UNMAP) {
        flushPhase = phaseInitiate;
    } else {
        // Discard-style checkpoints skip straight to the unmap pass.
        flushPhase = phaseSkip;
    }
    for (;;) {
        for (uint i = 0; i < pages.size(); i++) {
            PageT &page = *(pages[i]);
            StrictMutexGuard pageGuard(page.mutex);
            // Skip unmapped pages and pages the caller is not interested
            // in.
            if (!page.hasBlockId()) {
                continue;
            }
            if (!pagePredicate(page)) {
                continue;
            }
            if (countPages) {
                ++nPages;
            }
            if (flushPhase == phaseInitiate) {
                if (page.isDirty()) {
                    // No writer may hold the page during a checkpoint
                    // flush.
                    assert(!page.isExclusiveLockHeld());
                    incrementStatsCounter(nCheckpointWrites);
                    writePageAsync(page);
                }
            } else if (flushPhase == phaseWait) {
                BlockId origBlockId = page.getBlockId();
                MappedPageListener *origListener = page.pMappedPageListener;
                while (page.dataStatus == CachePage::DATA_WRITE) {
                    page.waitForPendingIO(pageGuard);
                }

                // While we waited, the page may have been remapped to a
                // different block or listener; only fire the post-flush
                // hook if it is still the same page we flushed.  The hook
                // may hand back a replacement listener to install.
                if (page.pMappedPageListener &&
                    page.pMappedPageListener == origListener &&
                    page.getBlockId() == origBlockId)
                {
                    MappedPageListener *newListener =
                        page.pMappedPageListener->notifyAfterPageCheckpointFlush(
                            page);
                    if (newListener != NULL) {
                        page.pMappedPageListener = newListener;
                    }
                }
            } else {
                // Final pass: unmap for FLUSH_AND_UNMAP and DISCARD.
                if (checkpointType <= CHECKPOINT_FLUSH_AND_UNMAP) {
                    unmapAndFreeDiscardedPage(page,pageGuard);
                }
            }
        }
        countPages = false;
        // Phase transitions: initiate -> wait -> (optional unmap) -> done.
        if (flushPhase == phaseInitiate) {
            flushPhase = phaseWait;
            continue;
        }
        if (flushPhase == phaseWait) {
            if (checkpointType <= CHECKPOINT_FLUSH_AND_UNMAP) {
                flushPhase = phaseSkip;
                continue;
            }
        }
        return nPages;
    }
}
00785
// Flushes one exclusively-locked page.  When async is true, only initiates
// the write; otherwise blocks until the write completes.  A listener veto
// (canFlushPage false) is tolerated for async flushes but is a hard error
// for synchronous ones.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::flushPage(CachePage &page,bool async)
{
    StrictMutexGuard pageGuard(page.mutex);
    assert(page.isExclusiveLockHeld());
    if (page.pMappedPageListener) {
        if (!page.pMappedPageListener->canFlushPage(page)) {
            if (async) {
                // Best-effort: the listener will flush it later in the
                // correct order.
                return;
            }
            permFail("attempt to flush page out of order");
        }
    }
    if (page.dataStatus != CachePage::DATA_WRITE) {
        // No write already in flight; start one.
        writePageAsync(static_cast<PageT &>(page));
    }
    if (async) {
        return;
    }

    // Synchronous: wait for the write (ours or a pre-existing one).
    while (page.dataStatus == CachePage::DATA_WRITE) {
        page.waitForPendingIO(pageGuard);
    }
}
00815
// Hints to the victim policy that this page is a good victimization
// candidate (unlikely to be needed again soon).
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::nicePage(CachePage &page)
{
    victimPolicy.notifyPageNice(static_cast<PageT &>(page));
}
00822
// Registers a device under deviceId and makes it known to the access
// scheduler.  The slot must not already be occupied.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::registerDevice(DeviceId deviceId,SharedRandomAccessDevice pDevice)
{
    assert(deviceTable[opaqueToInt(deviceId)] == NULL);
    deviceTable[opaqueToInt(deviceId)] = pDevice;
    pDeviceAccessScheduler->registerDevice(pDevice);
}
00831
// Unregisters deviceId.  All of its pages must already be flushed and
// unmappable: the CHECKPOINT_DISCARD pass below asserts that none remain.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::unregisterDevice(DeviceId deviceId)
{
    SharedRandomAccessDevice &pDevice = getDevice(deviceId);
    assert(pDevice);
    // Discard any pages still mapped to this device; there must be none
    // left pinned.
    DeviceIdPagePredicate pagePredicate(deviceId);
    uint nPages = checkpointPages(pagePredicate,CHECKPOINT_DISCARD);
    assert(!nPages);
    pDeviceAccessScheduler->unregisterDevice(pDevice);
    // Drop the table's shared reference to the device.
    pDevice.reset();
}
00844
// Returns a reference to the device table slot for deviceId (empty shared
// pointer if no device is registered there).
template <class PageT,class VictimPolicyT>
SharedRandomAccessDevice &CacheImpl<PageT,VictimPolicyT>
::getDevice(DeviceId deviceId)
{
    return deviceTable[opaqueToInt(deviceId)];
}
00851
00852
00853
00854
00855
// Called by the device scheduler when an async read or write on this page
// completes.  Updates data status, fires listener callbacks, and wakes
// waiters.  A failed write is fatal (abort); a failed read marks the page
// DATA_ERROR.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::notifyTransferCompletion(CachePage &page,bool bSuccess)
{
    StrictMutexGuard pageGuard(page.mutex);
    // dataStatus still reflects the transfer that just completed; it is
    // updated to CLEAN/ERROR only after the per-direction bookkeeping
    // below.
    switch (page.dataStatus) {
    case CachePage::DATA_WRITE:
        {
            if (!bSuccess) {
                // A lost write means durable data is gone; no recovery
                // path, so fail hard.
                std::cerr << "Write failed for page 0x" << std::hex <<
                    opaqueToInt(page.getBlockId());
                ::abort();
            }
            decrementCounter(nDirtyPages);
            victimPolicy.notifyPageClean(static_cast<PageT &>(page));

            // The page is now clean, so it may be victimizable; wake
            // findFreePage waiters.
            freePageCondition.notify_all();
        }
        break;
    case CachePage::DATA_READ:
        break;
    default:
        permAssert(false);
        break;
    }
    if (bSuccess) {
        CachePage::DataStatus oldStatus = page.dataStatus;
        page.dataStatus = CachePage::DATA_CLEAN;
        if (page.pMappedPageListener) {
            if (oldStatus == CachePage::DATA_READ) {
                page.pMappedPageListener->notifyAfterPageRead(page);
            } else {
                page.pMappedPageListener->notifyAfterPageFlush(page);
            }
        }
    } else {
        page.dataStatus = CachePage::DATA_ERROR;
    }
    // Wake threads blocked in waitForPendingIO on this page.
    page.ioCompletionCondition.notify_all();
}
00902
// Marks a page dirty, updating the dirty count and victim policy, then
// notifies the page's listener.  bValid records whether the page already
// held valid data before this first write.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::markPageDirty(CachePage &page)
{
    StrictMutexGuard pageGuard(page.mutex);
    incrementCounter(nDirtyPages);
    bool bValid = page.isDataValid();
    page.dataStatus = CachePage::DATA_DIRTY;
    victimPolicy.notifyPageDirty(static_cast<PageT &>(page));

    // Deliberately release the page mutex before invoking the listener:
    // the callback may take its own locks, and holding page.mutex across
    // it risks deadlock.
    pageGuard.unlock();
    if (page.pMappedPageListener) {
        page.pMappedPageListener->notifyPageDirty(page,bValid);
    }
}
00922
00923
00924
00925
00926
// TimerThreadClient callback: interval (in milliseconds) between idle
// flush sweeps, taken from CacheParams::idleFlushInterval.
template <class PageT,class VictimPolicyT>
uint CacheImpl<PageT,VictimPolicyT>
::getTimerIntervalMillis()
{
    return idleFlushInterval;
}
00933
// TimerThreadClient callback: perform one lazy-flush sweep per tick.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::onTimerInterval()
{
    flushSomePages();
}
00940
00941
00942
00943
00944
// Tears the cache down: stops the timer and scheduler, unregisters the
// scratch device (all real devices must already be unregistered), then
// destroys the hash table, pages, and their buffers.  Also reached on the
// constructor's error path, so it tolerates partially-initialized state.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>::closeImpl()
{
    if (timerThread.isStarted()) {
        timerThread.stop();
    }

    // Stop the scheduler before touching devices so no new I/O starts.
    if (pDeviceAccessScheduler) {
        pDeviceAccessScheduler->stop();
    }

    // The scratch null device is ours; unregister it if it got registered.
    if (getDevice(NULL_DEVICE_ID)) {
        unregisterDevice(NULL_DEVICE_ID);
    }

    // All caller-registered devices must be gone by now.
    for (uint i = 0; i < deviceTable.size(); i++) {
        assert(!deviceTable[i]);
    }

    deleteAndNullify(pDeviceAccessScheduler);

    // Hash buckets must be empty (no page still mapped).
    for (uint i = 0; i < pageTable.size(); i++) {
        assert(!pageTable[i]->pageList.size());
        deleteAndNullify(pageTable[i]);
    }

    unmappedBucket.pageList.clear();
    unallocatedBucket.pageList.clear();

    // Destroy pages and release their buffers.  NULL entries can occur if
    // the constructor failed mid-allocation.
    for (uint i = 0; i < pages.size(); i++) {
        if (!pages[i]) {
            continue;
        }
        victimPolicy.unregisterPage(*(pages[i]));
        PBuffer pBuffer = pages[i]->pBuffer;
        if (pBuffer) {
            int errorCode;
            if (bufferAllocator.deallocate(pBuffer, &errorCode)) {
                throw SysCallExcn("munmap failed", errorCode);
            }
        }
        deleteAndNullify(pages[i]);
    }
}
00994
// Searches one hash bucket for blockId.  On a hit, takes a reference,
// waits out any pending read, and returns the page; otherwise NULL.  The
// pin flag is forwarded to the victim policy's access notification.
template <class PageT,class VictimPolicyT>
PageT *CacheImpl<PageT,VictimPolicyT>
::lookupPage(PageBucketT &bucket,BlockId blockId, bool pin)
{
    assertCorrectBucket(bucket,blockId);
    SXMutexSharedGuard bucketGuard(bucket.mutex);
    for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
        StrictMutexGuard pageGuard(iter->mutex);
        if (iter->getBlockId() == blockId) {
            victimPolicy.notifyPageAccess(*iter, pin);
            // Reference taken under the page mutex keeps the page mapped
            // after the bucket lock is released.
            iter->nReferences++;
            while (iter->dataStatus == CachePage::DATA_READ) {
                iter->waitForPendingIO(pageGuard);
            }
            return iter;
        }
    }
    return NULL;
}
01014
// Tries to obtain a free page: first from the unmapped bucket, then by
// victimizing a clean unpinned page (kicking off up to 10 flushes of dirty
// candidates along the way).  Returns NULL after a bounded wait if nothing
// became available; callers loop on the result.
template <class PageT,class VictimPolicyT>
PageT *CacheImpl<PageT,VictimPolicyT>
::findFreePage()
{
    // Fast path: unlocked peek at the unmapped list, confirmed under the
    // exclusive lock before detaching.
    if (unmappedBucket.pageList.size()) {
        SXMutexExclusiveGuard unmappedBucketGuard(unmappedBucket.mutex);
        PageBucketMutator mutator(unmappedBucket.pageList);
        if (mutator) {
            assert(!mutator->hasBlockId());
            return mutator.detach();
        }
    }

    // Slow path: scan victimization candidates.  Limit the number of
    // flushes started per call so one caller cannot flood the scheduler.
    uint nToFlush = 10;

    VictimSharedGuard victimSharedGuard(victimPolicy.getMutex());
    std::pair<VictimPageIterator,VictimPageIterator> victimRange(
        victimPolicy.getVictimRange());
    for (; victimRange.first != victimRange.second; ++(victimRange.first)) {
        PageT &page = *(victimRange.first);
        // try_to_lock: never block on a page mutex while holding the
        // victim-policy mutex (lock-ordering).
        StrictMutexGuard pageGuard(page.mutex, boost::try_to_lock);
        if (!pageGuard.owns_lock()) {
            continue;
        }
        if (canVictimizePage(page)) {
            if (page.isDirty()) {
                // Dirty candidate: cannot reuse it now, but start a flush
                // so it becomes reusable soon (subject to the per-call
                // budget and the listener's veto).
                if (!nToFlush) {
                    continue;
                }
                if (page.pMappedPageListener &&
                    !page.pMappedPageListener->canFlushPage(page))
                {
                    continue;
                }
                nToFlush--;
                incrementStatsCounter(nVictimizationWrites);
                // A rejected schedule means the device queue is full;
                // stop trying to start more flushes this round.
                if (!writePageAsync(page)) {
                    nToFlush = 0;
                }
                continue;
            }
            // Clean, unpinned candidate: victimize it.  Release the
            // victim-policy lock before unmapPage (which takes bucket
            // locks).
            victimSharedGuard.unlock();
            unmapPage(page,pageGuard,false);
            incrementStatsCounter(nVictimizations);
            return &page;
        }
    }
    victimSharedGuard.unlock();

    // Nothing available: wait briefly for a page to be freed/cleaned,
    // then let the caller retry.
    StrictMutexGuard freePageGuard(freePageMutex);
    boost::xtime atv;
    convertTimeout(100,atv);
    freePageCondition.timed_wait(freePageGuard,atv);
    return NULL;
}
01084
// Harvests interval statistics into stats, resets the interval counters,
// folds them into the cumulative since-init totals, and reports those
// totals as well.  Counter reads are not atomic snapshots; values are
// approximate under concurrency.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::collectStats(CacheStats &stats)
{
    // Interval counters since the previous collectStats call.
    stats.nHits = nCacheHits;
    stats.nRequests = nCacheRequests;
    stats.nVictimizations = nVictimizations;
    stats.nDirtyPages = nDirtyPages;
    stats.nPageReads = nPageReads;
    stats.nPageWrites = nPageWrites;
    stats.nRejectedPrefetches = nRejectedCachePrefetches;
    stats.nIoRetries = nIoRetries;
    stats.nSuccessfulPrefetches = nSuccessfulCachePrefetches;
    stats.nLazyWrites = nLazyWrites;
    stats.nLazyWriteCalls = nLazyWriteCalls;
    stats.nVictimizationWrites = nVictimizationWrites;
    stats.nCheckpointWrites = nCheckpointWrites;
    stats.nMemPagesAllocated = getAllocatedPageCount();
    stats.nMemPagesUnused = unmappedBucket.pageList.size();
    stats.nMemPagesMax = getMaxAllocatedPageCount();

    // Reset interval counters (gauges like nDirtyPages are left alone).
    nCacheHits.clear();
    nCacheRequests.clear();
    nVictimizations.clear();
    nPageReads.clear();
    nPageWrites.clear();
    nRejectedCachePrefetches.clear();
    nIoRetries.clear();
    nSuccessfulCachePrefetches.clear();
    nLazyWrites.clear();
    nLazyWriteCalls.clear();
    nVictimizationWrites.clear();
    nCheckpointWrites.clear();

    // Accumulate this interval into the since-init totals.
    statsSinceInit.nHitsSinceInit += stats.nHits;
    statsSinceInit.nRequestsSinceInit += stats.nRequests;
    statsSinceInit.nVictimizationsSinceInit += stats.nVictimizations;
    statsSinceInit.nPageReadsSinceInit += stats.nPageReads;
    statsSinceInit.nPageWritesSinceInit += stats.nPageWrites;
    statsSinceInit.nRejectedPrefetchesSinceInit += stats.nRejectedPrefetches;
    statsSinceInit.nIoRetriesSinceInit += stats.nIoRetries;
    statsSinceInit.nSuccessfulPrefetchesSinceInit +=
        stats.nSuccessfulPrefetches;
    statsSinceInit.nLazyWritesSinceInit += stats.nLazyWrites;
    statsSinceInit.nLazyWriteCallsSinceInit += stats.nLazyWriteCalls;
    statsSinceInit.nVictimizationWritesSinceInit += stats.nVictimizationWrites;
    statsSinceInit.nCheckpointWritesSinceInit += stats.nCheckpointWrites;

    // Report the updated totals back to the caller.
    stats.nHitsSinceInit = statsSinceInit.nHitsSinceInit;
    stats.nRequestsSinceInit = statsSinceInit.nRequestsSinceInit;
    stats.nVictimizationsSinceInit = statsSinceInit.nVictimizationsSinceInit;
    stats.nPageReadsSinceInit = statsSinceInit.nPageReadsSinceInit;
    stats.nPageWritesSinceInit = statsSinceInit.nPageWritesSinceInit;
    stats.nRejectedPrefetchesSinceInit =
        statsSinceInit.nRejectedPrefetchesSinceInit;
    stats.nIoRetriesSinceInit =
        statsSinceInit.nIoRetriesSinceInit;
    stats.nSuccessfulPrefetchesSinceInit =
        statsSinceInit.nSuccessfulPrefetchesSinceInit;
    stats.nLazyWritesSinceInit = statsSinceInit.nLazyWritesSinceInit;
    stats.nLazyWriteCallsSinceInit = statsSinceInit.nLazyWriteCallsSinceInit;
    stats.nVictimizationWritesSinceInit =
        statsSinceInit.nVictimizationWritesSinceInit;
    stats.nCheckpointWritesSinceInit =
        statsSinceInit.nCheckpointWritesSinceInit;
}
01152
// Lazy-writer sweep (invoked from the idle-flush timer).  Uses hysteresis:
// flushing starts once nDirtyPages crosses the high-water mark and keeps
// going on subsequent calls until it drops below the low-water mark.  At
// most 5 flushes are initiated per call.
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::flushSomePages()
{
    uint nToFlush = std::min<uint>(5,nDirtyPages);
    if (!nToFlush) {
        // Nothing dirty; nothing to do.
        return;
    }

    // Hysteresis: enter flush mode above the high-water mark, leave it
    // below the low-water mark.
    if (!inFlushMode) {
        if (nDirtyPages < dirtyHighWaterMark) {
            return;
        }
        inFlushMode = true;
    }
    if (nDirtyPages < dirtyLowWaterMark) {
        inFlushMode = false;
        return;
    }

    incrementStatsCounter(nLazyWriteCalls);
    uint nFlushed = 0;
    VictimSharedGuard victimSharedGuard(victimPolicy.getMutex());
    std::pair<DirtyVictimPageIterator,DirtyVictimPageIterator> victimRange(
        victimPolicy.getDirtyVictimRange());
    for (; victimRange.first != victimRange.second; ++(victimRange.first)) {
        PageT &page = *(victimRange.first);
        // try_to_lock: never block on a page mutex while holding the
        // victim-policy mutex.
        StrictMutexGuard pageGuard(page.mutex, boost::try_to_lock);
        if (!pageGuard.owns_lock()) {
            continue;
        }
        if (!page.isDirty()) {
            continue;
        }
        if (page.isScratchLocked()) {
            // Scratch pages never go to disk.
            continue;
        }
        // Probe for writers: if a shared lock can't be had immediately,
        // someone holds the page exclusively -- skip it.
        if (!page.lock.waitFor(LOCKMODE_S_NOWAIT)) {
            continue;
        } else {
            // Probe only; release immediately.
            page.lock.release(LOCKMODE_S);
        }
        if (page.pMappedPageListener &&
            !page.pMappedPageListener->canFlushPage(page))
        {
            continue;
        }
        incrementStatsCounter(nLazyWrites);
        // A rejected schedule means the device queue is full; stop the
        // sweep.
        if (!writePageAsync(page)) {
            break;
        }
        nFlushed++;
        if (nFlushed >= nToFlush) {
            break;
        }
    }
}
01219
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::unmapPage(PageT &page,StrictMutexGuard &pageGuard, bool discard)
{
    // Removes the mapping between a page buffer and its BlockId.
    // Precondition: the caller holds the page mutex via pageGuard, and
    // the page is unreferenced.  On return the page mutex has been
    // released (pageGuard is unlocked).
    assert(!page.nReferences);
    assert(pageGuard.owns_lock());

    victimPolicy.notifyPageUnmap(page, discard);
    if (page.pMappedPageListener) {
        page.pMappedPageListener->notifyPageUnmap(page);
        page.pMappedPageListener = NULL;
    }
    if (page.isDirty()) {
        // an unmapped page can no longer be flushed, so it no longer
        // counts against the dirty-page total
        decrementCounter(nDirtyPages);
    }

    // Snapshot the BlockId before invalidating the page, then release
    // the page mutex BEFORE acquiring the bucket mutex; the hash bucket
    // is located from the saved BlockId.
    BlockId blockId = page.getBlockId();
    page.blockId = NULL_BLOCK_ID;
    page.dataStatus = CachePage::DATA_INVALID;
    pageGuard.unlock();

    PageBucketT &bucket = getHashBucket(blockId);
    SXMutexExclusiveGuard bucketGuard(bucket.mutex);
    bool bFound = bucket.pageList.remove(page);
    assert(bFound);
}
01250
template <class PageT,class VictimPolicyT>
void CacheImpl<PageT,VictimPolicyT>
::unmapAndFreeDiscardedPage(PageT &page,StrictMutexGuard &pageGuard)
{
    // Discard variant of unmapping: wait out any in-flight I/O on the
    // page, unmap it (with discard semantics), then return the buffer
    // to the unmapped free list.  Caller holds the page mutex via
    // pageGuard on entry.
    while (page.isTransferInProgress()) {
        page.waitForPendingIO(pageGuard);
    }
    // unmapPage releases the page mutex; reacquire it before freeing.
    unmapPage(page,pageGuard,true);
    pageGuard.lock();
    assert(!page.nReferences);
    freePage(page);
}
01263
template <class PageT,class VictimPolicyT>
PageT &CacheImpl<PageT,VictimPolicyT>
::mapPage(
    PageBucketT &bucket,PageT &page,BlockId blockId,
    MappedPageListener *pMappedPageListener,
    bool bPendingRead,bool bIncRef)
{
    // Maps a free page buffer to blockId within the given hash bucket.
    // If another thread has already mapped the same BlockId (a mapping
    // race), the supplied buffer is returned to the free list and the
    // existing page is returned instead; otherwise the supplied page is
    // initialized and added to the bucket.  The returned reference may
    // therefore differ from the page argument.
    assert(!page.hasBlockId());
    assert(!page.isDirty());
    assert(getDevice(CompoundId::getDeviceId(blockId)).get());
    assertCorrectBucket(bucket,blockId);

    // The bucket mutex serializes the duplicate scan plus the insertion,
    // so at most one mapping for blockId can ever be created.
    SXMutexExclusiveGuard bucketGuard(bucket.mutex);
    for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
        StrictMutexGuard pageGuard(iter->mutex);
        if (iter->getBlockId() == blockId) {
            // lost the race: give back the unused buffer and reuse the
            // existing mapping
            freePage(page);
            if (bIncRef) {
                iter->nReferences++;
            }
            bucketGuard.unlock();
            // both mappers must have agreed on the listener
            assert(pMappedPageListener == iter->pMappedPageListener);
            victimPolicy.notifyPageAccess(*iter, bIncRef);
            return *iter;
        }
    }

    // No existing mapping: initialize this page under its own mutex.
    StrictMutexGuard pageGuard(page.mutex);
    page.blockId = blockId;
    assert(!page.pMappedPageListener);
    page.pMappedPageListener = pMappedPageListener;
    if (bIncRef) {
        page.nReferences++;
    }
    if (bPendingRead) {
        // mark data as not yet valid; a disk read has been/will be issued
        page.dataStatus = CachePage::DATA_READ;
    }
    bucket.pageList.push_back(page);
    bucketGuard.unlock();
    victimPolicy.notifyPageMap(page, bIncRef);
    if (pMappedPageListener) {
        pMappedPageListener->notifyPageMap(page);
    }
    return page;
}
01313
01314 template <class PageT,class VictimPolicyT>
01315 bool CacheImpl<PageT,VictimPolicyT>
01316 ::transferPageAsync(PageT &page)
01317 {
01318 SharedRandomAccessDevice &pDevice =
01319 getDevice(CompoundId::getDeviceId(page.getBlockId()));
01320 RandomAccessRequest request;
01321 request.pDevice = pDevice.get();
01322 request.cbOffset = getPageOffset(page.getBlockId());
01323 request.cbTransfer = getPageSize();
01324 if (page.dataStatus == CachePage::DATA_WRITE) {
01325 request.type = RandomAccessRequest::WRITE;
01326 } else {
01327 assert(page.dataStatus == CachePage::DATA_READ);
01328 request.type = RandomAccessRequest::READ;
01329 }
01330 request.bindingList.push_back(page);
01331 bool rc = getDeviceAccessScheduler(*pDevice).schedule(request);
01332 if (!rc) {
01333 ioRetry();
01334 }
01335 return rc;
01336 }
01337
template <class PageT,class VictimPolicyT>
CacheAllocator &CacheImpl<PageT,VictimPolicyT>
::getAllocator() const
{
    // Accessor for the allocator used for page buffers (either the one
    // injected at construction or the internally created VMAllocator).
    return bufferAllocator;
}
01344
template <class PageT,class VictimPolicyT>
inline bool CacheImpl<PageT,VictimPolicyT>
::readPageAsync(PageT &page)
{
    // Initiates an asynchronous read into the page buffer: marks the
    // page's data as pending-read, bumps the read counter, and schedules
    // the transfer.  Returns false if the transfer could not be scheduled.
    page.dataStatus = CachePage::DATA_READ;
    incrementStatsCounter(nPageReads);
    return transferPageAsync(page);
}
01353
01354 template <class PageT,class VictimPolicyT>
01355 inline bool CacheImpl<PageT,VictimPolicyT>
01356 ::writePageAsync(PageT &page)
01357 {
01358 assert(page.isDirty());
01359 if (page.pMappedPageListener) {
01360 assert(page.pMappedPageListener->canFlushPage(page));
01361 page.pMappedPageListener->notifyBeforePageFlush(page);
01362 }
01363 page.dataStatus = CachePage::DATA_WRITE;
01364 incrementStatsCounter(nPageWrites);
01365 if (!transferPageAsync(page)) {
01366 return false;
01367 } else {
01368 return true;
01369 }
01370 }
01371
01372 template <class PageT,class VictimPolicyT>
01373 inline FileSize CacheImpl<PageT,VictimPolicyT>
01374 ::getPageOffset(BlockId const &blockId)
01375 {
01376 return ((FileSize) CompoundId::getBlockNum(blockId))
01377 * (FileSize) cbPage;
01378 }
01379
01380 template <class PageT,class VictimPolicyT>
01381 inline PageBucket<PageT> &CacheImpl<PageT,VictimPolicyT>
01382 ::getHashBucket(BlockId const &blockId)
01383 {
01384 std::hash<BlockId> hasher;
01385 size_t hashCode = hasher(blockId);
01386 return *(pageTable[hashCode % pageTable.size()]);
01387 }
01388
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::assertCorrectBucket(PageBucketT &bucket,BlockId const &blockId)
{
    // Debug-only sanity check: the supplied bucket must be the one the
    // hash function maps blockId to.  Compiles away in release builds.
    assert(&bucket == &(getHashBucket(blockId)));
}
01395
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::freePage(PageT &page)
{
    // Returns an unmapped page buffer to the free list; the unmapped
    // bucket's mutex serializes concurrent frees/allocations.
    SXMutexExclusiveGuard unmappedBucketGuard(unmappedBucket.mutex);
    unmappedBucket.pageList.push_back(page);
}
01403
01404 template <class PageT,class VictimPolicyT>
01405 inline bool CacheImpl<PageT,VictimPolicyT>
01406 ::canVictimizePage(PageT &page)
01407 {
01408
01409
01410
01411
01412 return page.hasBlockId()
01413 && !page.nReferences
01414 && !page.isTransferInProgress();
01415 }
01416
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::incrementCounter(AtomicCounter &x)
{
    // Thin wrapper over AtomicCounter's increment; kept as a named hook
    // so counter maintenance is centralized.
    ++x;
}
01423
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::decrementCounter(AtomicCounter &x)
{
    // Thin wrapper over AtomicCounter's decrement; counterpart of
    // incrementCounter.
    --x;
}
01430
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::incrementStatsCounter(AtomicCounter &x)
{
    // Statistics counters currently share the plain counter
    // implementation; this separate entry point allows stats updates to
    // be compiled or handled differently later without touching callers.
    incrementCounter(x);
}
01437
template <class PageT,class VictimPolicyT>
inline void CacheImpl<PageT,VictimPolicyT>
::decrementStatsCounter(AtomicCounter &x)
{
    // Counterpart of incrementStatsCounter; see note there on why stats
    // counters have their own entry points.
    decrementCounter(x);
}
01444
01445 FENNEL_END_NAMESPACE
01446
01447 #endif
01448
01449