CacheMethodsImpl.h

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/cache/CacheMethodsImpl.h#32 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #ifndef Fennel_CacheMethodsImpl_Included
00025 #define Fennel_CacheMethodsImpl_Included
00026 
00027 #include "fennel/device/DeviceAccessSchedulerParams.h"
00028 #include "fennel/device/RandomAccessNullDevice.h"
00029 #include "fennel/cache/CacheParams.h"
00030 #include "fennel/common/CompoundId.h"
00031 #include "fennel/common/InvalidParamExcn.h"
00032 #include "fennel/cache/VMAllocator.h"
00033 
00034 // NOTE:  don't include this file directly; include CacheImpl.h instead
00035 
00036 FENNEL_BEGIN_NAMESPACE
00037 
00038 // TODO:  cache data structure consistency check
00039 
00040 // ----------------------------------------------------------------------
00041 // Public entry points
00042 // ----------------------------------------------------------------------
00043 
00044 template <class PageT,class VictimPolicyT>
00045 CacheImpl<PageT,VictimPolicyT>
00046 ::CacheImpl(
00047     CacheParams const &params,
00048     CacheAllocator *pBufferAllocatorInit)
00049 :
00050     deviceTable(CompoundId::getMaxDeviceCount()),
00051     pageTable(),
00052     bufferAllocator(
00053         pBufferAllocatorInit ?
00054         *pBufferAllocatorInit
00055         : *new VMAllocator(params.cbPage,0)),
00056     pBufferAllocator(pBufferAllocatorInit ? NULL : &bufferAllocator),
00057     victimPolicy(params),
00058     timerThread(*this)
00059 {
00060     cbPage = params.cbPage;
00061     pDeviceAccessScheduler = NULL;
00062     inFlushMode = false;
00063 
00064     // TODO - parameterize
00065     dirtyHighWaterPercent = 25;
00066     dirtyLowWaterPercent = 5;
00067 
00068     initializeStats();
00069 
00070     allocatePages(params);
00071 
00072     // initialize page hash table
00073     // NOTE: this is the size of the page hash table; 2N is for a 50%
00074     // load factor, and +1 is to avoid picking an even number
00075     // TODO:  use a static table of primes to pick the least-upper-bound prime
00076     pageTable.resize(2*pages.size()+1);
00077     for (uint i = 0; i < pageTable.size(); i++) {
00078         pageTable[i] = new PageBucketT();
00079     }
00080 
00081     try {
00082         pDeviceAccessScheduler = DeviceAccessScheduler::newScheduler(
00083             params.schedParams);
00084     } catch (FennelExcn &ex) {
00085         close();
00086         throw ex;
00087     }
00088 
00089     // initialize null device
00090     registerDevice(
00091         NULL_DEVICE_ID,
00092         SharedRandomAccessDevice(
00093             new RandomAccessNullDevice()));
00094 
00095     idleFlushInterval = params.idleFlushInterval;
00096     if (idleFlushInterval) {
00097         timerThread.start();
00098     }
00099 
00100     prefetchPagesMax = params.prefetchPagesMax;
00101     prefetchThrottleRate = params.prefetchThrottleRate;
00102 }
00103 
00104 template <class PageT,class VictimPolicyT>
00105 void CacheImpl<PageT,VictimPolicyT>::getPrefetchParams(
00106     uint &prefetchPagesMax,
00107     uint &prefetchThrottleRate)
00108 {
00109     prefetchPagesMax = this->prefetchPagesMax;
00110     prefetchThrottleRate = this->prefetchThrottleRate;
00111 }
00112 
00113 template <class PageT,class VictimPolicyT>
00114 void CacheImpl<PageT,VictimPolicyT>::initializeStats()
00115 {
00116     // clear instantaneous counters too just to avoid confusion
00117     statsSinceInit.nHits = 0;
00118     statsSinceInit.nHitsSinceInit = 0;
00119     statsSinceInit.nRequests = 0;
00120     statsSinceInit.nRequestsSinceInit = 0;
00121     statsSinceInit.nVictimizations = 0;
00122     statsSinceInit.nVictimizationsSinceInit = 0;
00123     statsSinceInit.nDirtyPages = 0;
00124     statsSinceInit.nPageReads = 0;
00125     statsSinceInit.nPageReadsSinceInit = 0;
00126     statsSinceInit.nPageWrites = 0;
00127     statsSinceInit.nPageWritesSinceInit = 0;
00128     statsSinceInit.nRejectedPrefetches = 0;
00129     statsSinceInit.nRejectedPrefetchesSinceInit = 0;
00130     statsSinceInit.nIoRetries = 0;
00131     statsSinceInit.nIoRetriesSinceInit = 0;
00132     statsSinceInit.nSuccessfulPrefetches = 0;
00133     statsSinceInit.nSuccessfulPrefetchesSinceInit = 0;
00134     statsSinceInit.nLazyWrites = 0;
00135     statsSinceInit.nLazyWritesSinceInit = 0;
00136     statsSinceInit.nLazyWriteCalls = 0;
00137     statsSinceInit.nLazyWriteCallsSinceInit = 0;
00138     statsSinceInit.nVictimizationWrites = 0;
00139     statsSinceInit.nVictimizationWritesSinceInit = 0;
00140     statsSinceInit.nCheckpointWrites = 0;
00141     statsSinceInit.nCheckpointWritesSinceInit = 0;
00142     statsSinceInit.nMemPagesAllocated = 0;
00143     statsSinceInit.nMemPagesUnused = 0;
00144     statsSinceInit.nMemPagesMax = 0;
00145 }
00146 
00147 template <class PageT,class VictimPolicyT>
00148 void CacheImpl<PageT,VictimPolicyT>::allocatePages(CacheParams const &params)
00149 {
00150     static const int allocErrorMsgSize = 255;
00151     uint nPagesMax = 0;
00152     uint nPagesInit = 0;
00153 
00154     // Make two attempts: First, use the configured values.  If that fails,
00155     // try again with default nMemPagesMax.  If that fails, throw in the towel.
00156     for (int attempts = 0; attempts < 2; attempts++) {
00157         bool allocError = false;
00158         int allocErrorCode = 0;
00159         char allocErrorMsg[allocErrorMsgSize + 1] = { 0 };
00160 
00161         nPagesMax = params.nMemPagesMax;
00162         nPagesInit = params.nMemPagesInit;
00163 
00164         try {
00165             if (attempts != 0) {
00166                 nPagesMax = CacheParams::defaultMemPagesMax;
00167                 nPagesInit = CacheParams::defaultMemPagesInit;
00168             }
00169 
00170             pages.clear();
00171             if (pages.capacity() > nPagesMax) {
00172                 // Reset capacity of pages to a smaller value by swapping pages
00173                 // with a temporary vector that has tiny capacity.  (Avoid
00174                 // zero capacity since that causes a memset warning.)
00175                 std::vector<PageT *>(1).swap(pages);
00176             }
00177             pages.reserve(nPagesMax);
00178             pages.assign(nPagesMax, NULL);
00179 
00180             // allocate pages, but defer adding all of them onto the free list
00181             for (uint i = 0; i < nPagesMax; i++) {
00182                 PBuffer pBuffer = NULL;
00183                 if (i < nPagesInit) {
00184                     pBuffer = static_cast<PBuffer>(
00185                         bufferAllocator.allocate(&allocErrorCode));
00186                     if (pBuffer == NULL) {
00187                         allocError = true;
00188                         strncpy(
00189                             allocErrorMsg, "mmap failed", allocErrorMsgSize);
00190                         break;
00191                     }
00192                 }
00193                 PageT &page = *new PageT(*this,pBuffer);
00194                 pages[i] = &page;
00195             }
00196         } catch (std::exception &excn) {
00197             allocError = true;
00198             allocErrorCode = 0;
00199             if (dynamic_cast<std::bad_alloc *>(&excn) != NULL) {
00200                 strncpy(allocErrorMsg, "malloc failed", allocErrorMsgSize);
00201             } else {
00202                 strncpy(allocErrorMsg, excn.what(), allocErrorMsgSize);
00203             }
00204         }
00205 
00206         if (!allocError) {
00207             // successful allocation
00208             break;
00209         }
00210 
00211         // Free the allocated pages
00212         for (uint i = 0; i < pages.size(); i++) {
00213             if (!pages[i]) {
00214                 break;
00215             }
00216             PBuffer pBuffer = pages[i]->pBuffer;
00217             deleteAndNullify(pages[i]);
00218             if (pBuffer) {
00219                 // Ignore any error. We are sometimes unable to deallocate
00220                 // pages when trying to recover from initial failure.  Likely
00221                 // the second attempt will fail as well.  This leads to a
00222                 // failed assertion in the VMAllocator destructor.  See the
00223                 // comment there.
00224                 bufferAllocator.deallocate(pBuffer);
00225             }
00226         }
00227 
00228         if (attempts != 0) {
00229             // Reduced page count still failed.  Give up.
00230             close();
00231             throw SysCallExcn(std::string(allocErrorMsg), allocErrorCode);
00232         }
00233     }
00234 
00235     // Go back and add the pages to the free list and register them with
00236     // victimPolicy (requires no further memory allocation as the free lists
00237     // and victim policy use IntrusiveList and IntrusiveDList).
00238     for (uint i = 0; i < pages.size(); i++) {
00239         PageT *page = pages[i];
00240         PBuffer pBuffer = page->pBuffer;
00241         if (pBuffer) {
00242             unmappedBucket.pageList.push_back(*page);
00243             victimPolicy.registerPage(*page);
00244         } else {
00245             unallocatedBucket.pageList.push_back(*page);
00246         }
00247     }
00248 
00249     uint nPages = std::min(nPagesInit, nPagesMax);
00250     calcDirtyThreshholds(nPages);
00251     victimPolicy.setAllocatedPageCount(nPages);
00252 }
00253 
00254 template <class PageT,class VictimPolicyT>
00255 void CacheImpl<PageT,VictimPolicyT>::calcDirtyThreshholds(uint nCachePages)
00256 {
00257     dirtyHighWaterMark = nCachePages * dirtyHighWaterPercent / 100;
00258     dirtyLowWaterMark = nCachePages * dirtyLowWaterPercent / 100;
00259 }
00260 
00261 template <class PageT,class VictimPolicyT>
00262 uint CacheImpl<PageT,VictimPolicyT>::getAllocatedPageCount()
00263 {
00264     SXMutexSharedGuard guard(unallocatedBucket.mutex);
00265     return pages.size() - unallocatedBucket.pageList.size();
00266 }
00267 
00268 template <class PageT,class VictimPolicyT>
00269 uint CacheImpl<PageT,VictimPolicyT>::getMaxAllocatedPageCount()
00270 {
00271     return pages.size();
00272 }
00273 
00274 template <class PageT,class VictimPolicyT>
00275 void CacheImpl<PageT,VictimPolicyT>::setAllocatedPageCount(
00276     uint nMemPagesDesired)
00277 {
00278     assert(nMemPagesDesired <= pages.size());
00279     // exclusive lock unallocatedBucket in case someone is crazy enough to call
00280     // this method from multiple threads
00281     SXMutexExclusiveGuard unallocatedBucketGuard(unallocatedBucket.mutex);
00282     uint nMemPages =
00283         pages.size() - unallocatedBucket.pageList.size();
00284     if (nMemPages < nMemPagesDesired) {
00285         // allocate some more
00286 
00287         // LER-5976: Allocate all pBuffers ahead of time so we can revert to
00288         // the old cache size if there's an allocation error.
00289         int nMemPagesToAllocate = nMemPagesDesired - nMemPages;
00290         std::vector<PBuffer> buffers(nMemPagesToAllocate);
00291 
00292         for (int i = 0; i < nMemPagesToAllocate; ++i) {
00293             int errorCode;
00294             PBuffer pBuffer = static_cast<PBuffer>(
00295                 bufferAllocator.allocate(&errorCode));
00296 
00297             if (pBuffer == NULL) {
00298                 // Release each allocated buffer and re-throw
00299                 for (int i = 0; i < nMemPagesToAllocate; i++) {
00300                     if (buffers[i] == NULL) {
00301                         break;
00302                     }
00303 
00304                     // Ignore any errors and try to deallocate as many of the
00305                     // buffers as possible.  Ignoring errors leads to a failed
00306                     // assertion in the VMAllocator destructor on shutdown. See
00307                     // the comment there.
00308                     bufferAllocator.deallocate(buffers[i]);
00309                 }
00310                 buffers.clear();
00311                 std::vector<PBuffer>(0).swap(buffers); // dealloc vector
00312 
00313                 throw SysCallExcn("mmap failed", errorCode);
00314             }
00315 
00316             buffers[i] = pBuffer;
00317         }
00318 
00319         PageBucketMutator mutator(unallocatedBucket.pageList);
00320         for (int i = 0; i < nMemPagesToAllocate; i++) {
00321             PBuffer pBuffer = buffers[i];
00322             PageT *page = mutator.detach();
00323             assert(!page->pBuffer);
00324             page->pBuffer = pBuffer;
00325             victimPolicy.registerPage(*page);
00326             // move to unmappedBucket
00327             freePage(*page);
00328         }
00329     } else {
00330         // deallocate some
00331         for (; nMemPages > nMemPagesDesired; --nMemPages) {
00332             PageT *page;
00333             do {
00334                 page = findFreePage();
00335             } while (!page);
00336 
00337             int errorCode;
00338             if (bufferAllocator.deallocate(page->pBuffer, &errorCode)) {
00339                 // If the page buffer couldn't be deallocated, put it back
00340                 // before reporting the error
00341                 freePage(*page);
00342                 throw SysCallExcn("munmap failed", errorCode);
00343             }
00344             page->pBuffer = NULL;
00345             victimPolicy.unregisterPage(*page);
00346             // move to unallocatedBucket
00347             unallocatedBucket.pageList.push_back(*page);
00348         }
00349     }
00350 
00351     calcDirtyThreshholds(nMemPagesDesired);
00352     // Notify the policy of the new cache size
00353     victimPolicy.setAllocatedPageCount(nMemPagesDesired);
00354 }
00355 
00356 template <class PageT,class VictimPolicyT>
00357 PageT *CacheImpl<PageT,VictimPolicyT>
00358 ::lockPage(
00359     BlockId blockId,LockMode lockMode,bool readIfUnmapped,
00360     MappedPageListener *pMappedPageListener,TxnId txnId)
00361 {
00362     // first find the page and increment its reference count
00363 
00364     assert(blockId != NULL_BLOCK_ID);
00365     assert(CompoundId::getDeviceId(blockId) != NULL_DEVICE_ID);
00366     PageBucketT &bucket = getHashBucket(blockId);
00367     PageT *page = lookupPage(bucket,blockId,true);
00368     if (page) {
00369         assert(page->pMappedPageListener == pMappedPageListener);
00370         // note that lookupPage incremented page's reference count for us, so
00371         // it's safe from victimization from here on
00372         incrementStatsCounter(nCacheHits);
00373     } else {
00374         do {
00375             page = findFreePage();
00376         } while (!page);
00377 
00378         // note that findFreePage returns an unmapped page, making it safe from
00379         // victimization at this point; mapPage will increment the reference
00380         // count
00381 
00382         PageT &mappedPage = mapPage(
00383             bucket,*page,blockId,pMappedPageListener,readIfUnmapped);
00384         if (&mappedPage == page) {
00385             // mapPage found no existing mapping, so initiate read from disk if
00386             // necessary
00387             if (readIfUnmapped) {
00388                 readPageAsync(*page);
00389             }
00390         } else {
00391             // mapPage found an existing mapping, so forget unused free page,
00392             // and no need to initiate read from disk
00393             page = &mappedPage;
00394         }
00395         if (readIfUnmapped) {
00396             // whether or not an existing mapping was found, need
00397             // to wait for any pending read to complete (either our own started
00398             // above or someone else's)
00399             StrictMutexGuard pageGuard(page->mutex);
00400             while (page->dataStatus == CachePage::DATA_READ) {
00401                 page->waitForPendingIO(pageGuard);
00402             }
00403         }
00404     }
00405 
00406     incrementStatsCounter(nCacheRequests);
00407 
00408     // now acquire the requested lock
00409 
00410     if (!page->lock.waitFor(lockMode,ETERNITY,txnId)) {
00411         // NoWait failed; release reference
00412         assert((lockMode == LOCKMODE_S_NOWAIT) ||
00413                (lockMode == LOCKMODE_X_NOWAIT));
00414         StrictMutexGuard pageGuard(page->mutex);
00415         page->nReferences--;
00416         if (!page->nReferences) {
00417             victimPolicy.notifyPageUnpin(*page);
00418         }
00419         return NULL;
00420     }
00421     if ((lockMode == LOCKMODE_X) || (lockMode == LOCKMODE_X_NOWAIT)) {
00422         // if we're locking the page for write, then need to make sure
00423         // that any pending write completes before this thread starts
00424         // changing the contents
00425 
00426         // REVIEW: can we use double-checked idiom here?
00427         StrictMutexGuard pageGuard(page->mutex);
00428         while (page->dataStatus == CachePage::DATA_WRITE) {
00429             page->waitForPendingIO(pageGuard);
00430         }
00431 #ifdef DEBUG
00432         int errorCode;
00433         if (bufferAllocator.setProtection(
00434                 page->pBuffer, cbPage, false, &errorCode))
00435         {
00436             throw new SysCallExcn("memory protection failed", errorCode);
00437         }
00438 #endif
00439     } else {
00440         // TODO jvs 7-Feb-2006:  protection for other cases
00441 #ifdef DEBUG
00442         StrictMutexGuard pageGuard(page->mutex);
00443         if (page->nReferences == 1) {
00444             int errorCode;
00445             if (bufferAllocator.setProtection(
00446                     page->pBuffer, cbPage, true, &errorCode))
00447             {
00448                 throw new SysCallExcn("memory protection failed", errorCode);
00449             }
00450         }
00451 #endif
00452     }
00453     return page;
00454 }
00455 
00456 template <class PageT,class VictimPolicyT>
00457 void CacheImpl<PageT,VictimPolicyT>
00458 ::unlockPage(
00459     CachePage &vPage,LockMode lockMode,TxnId txnId)
00460 {
00461     assert(lockMode < LOCKMODE_S_NOWAIT);
00462     PageT &page = static_cast<PageT &>(vPage);
00463     StrictMutexGuard pageGuard(page.mutex);
00464     assert(page.nReferences);
00465     bool bFree = false;
00466     assert(page.hasBlockId());
00467     if (CompoundId::getDeviceId(page.getBlockId()) == NULL_DEVICE_ID) {
00468         // originated from lockScratchPage()
00469         bFree = true;
00470     } else {
00471         int errorCode;
00472         if (bufferAllocator.setProtection(
00473                 page.pBuffer, cbPage, false, &errorCode))
00474         {
00475             throw new SysCallExcn("memory protection failed", errorCode);
00476         }
00477 
00478         page.lock.release(lockMode,txnId);
00479     }
00480     page.nReferences--;
00481     if (!page.nReferences) {
00482         if (bFree) {
00483             // The page lock was acquired via lockScratch, so return it to
00484             // the free list.  No need to notify the victimPolicy since
00485             // the policy wasn't notified when the page was locked.
00486             page.dataStatus = CachePage::DATA_INVALID;
00487             page.blockId = NULL_BLOCK_ID;
00488             freePage(page);
00489         } else {
00490             // notify the victim policy that the page is no longer
00491             // being referenced
00492             victimPolicy.notifyPageUnpin(page);
00493         }
00494 
00495         // let waiting threads know that a page has become available
00496         // (either on the free list or as a victimization candidate)
00497         freePageCondition.notify_all();
00498     }
00499 }
00500 
00501 template <class PageT,class VictimPolicyT>
00502 bool CacheImpl<PageT,VictimPolicyT>
00503 ::isPageMapped(BlockId blockId)
00504 {
00505     PageBucketT &bucket = getHashBucket(blockId);
00506     SXMutexSharedGuard bucketGuard(bucket.mutex);
00507     for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
00508         StrictMutexGuard pageGuard(iter->mutex);
00509         if (iter->getBlockId() == blockId) {
00510             bucketGuard.unlock();
00511             victimPolicy.notifyPageAccess(*iter, false);
00512             return true;
00513         }
00514     }
00515     return false;
00516 }
00517 
00518 template <class PageT,class VictimPolicyT>
00519 void CacheImpl<PageT,VictimPolicyT>
00520 ::discardPage(BlockId blockId)
00521 {
00522     assert(blockId != NULL_BLOCK_ID);
00523     PageBucketT &bucket = getHashBucket(blockId);
00524     PageT *page = lookupPage(bucket,blockId,false);
00525     if (!page) {
00526         // page is not mapped, so nothing to discard, but still need to
00527         // notify the policy
00528         victimPolicy.notifyPageDiscard(blockId);
00529         return;
00530     }
00531     StrictMutexGuard pageGuard(page->mutex);
00532     // lookupPage already waited for pending reads, but also need to wait for
00533     // pending writes
00534     // REVIEW:  isn't this redundant with code in unmapAndFreeDiscardedPage?
00535     while (page->dataStatus == CachePage::DATA_WRITE) {
00536         page->waitForPendingIO(pageGuard);
00537     }
00538     // our own lookupPage adds 1 reference; it should be the only one left
00539     assert(page->nReferences == 1);
00540     page->nReferences = 0;
00541     unmapAndFreeDiscardedPage(*page,pageGuard);
00542 }
00543 
00544 template <class PageT,class VictimPolicyT>
00545 PageT &CacheImpl<PageT,VictimPolicyT>
00546 ::lockScratchPage(BlockNum blockNum)
00547 {
00548     PageT *page;
00549     do {
00550         page = findFreePage();
00551     } while (!page);
00552 
00553     StrictMutexGuard pageGuard(page->mutex);
00554     page->nReferences = 1;
00555     // Set dirty early to avoid work on first call to getWritableData.
00556     // No need to notify the victimPolicy that the page is dirty because
00557     // scratch pages are locked for the duration of their use so they're
00558     // never candidates for victimization or flushing.
00559     page->dataStatus = CachePage::DATA_DIRTY;
00560     CompoundId::setDeviceId(page->blockId,NULL_DEVICE_ID);
00561     CompoundId::setBlockNum(page->blockId,blockNum);
00562 
00563     return *page;
00564 }
00565 
00566 template <class PageT,class VictimPolicyT>
00567 bool CacheImpl<PageT,VictimPolicyT>
00568 ::prefetchPage(BlockId blockId,MappedPageListener *pMappedPageListener)
00569 {
00570     assert(blockId != NULL_BLOCK_ID);
00571     if (isPageMapped(blockId)) {
00572         // already mapped:  either it's already fully read or someone
00573         // else has initiated a read; either way, nothing for us to do
00574         successfulPrefetch();
00575         return true;
00576     }
00577     PageT *page = findFreePage();
00578     if (!page) {
00579         // cache is low on free pages:  ignore prefetch hint
00580         rejectedPrefetch();
00581         return false;
00582     }
00583 
00584     PageBucketT &bucket = getHashBucket(blockId);
00585     bool bPendingRead = true;
00586     // don't need to increment the page reference count since the pending
00587     // read will protect the page from victimization, and the calling thread
00588     // doesn't actually want a reference until it locks the page later
00589     bool bIncRef = false;
00590     PageT &mappedPage = mapPage(
00591         bucket,*page,blockId,pMappedPageListener,bPendingRead,bIncRef);
00592     if (&mappedPage == page) {
00593         if (readPageAsync(*page)) {
00594             successfulPrefetch();
00595         } else {
00596             rejectedPrefetch();
00597             return false;
00598         }
00599     } else {
00600         // forget unused free page, and don't bother with read since someone
00601         // else must already have kicked it off
00602         page = &mappedPage;
00603     }
00604     return true;
00605 }
00606 
00607 template <class PageT,class VictimPolicyT>
00608 void CacheImpl<PageT,VictimPolicyT>
00609 ::prefetchBatch(
00610     BlockId blockId,uint nPagesPerBatch,
00611     MappedPageListener *pMappedPageListener)
00612 {
00613     assert(blockId != NULL_BLOCK_ID);
00614     assert(nPagesPerBatch > 1);
00615 
00616     SharedRandomAccessDevice &pDevice = getDevice(
00617         CompoundId::getDeviceId(blockId));
00618     DeviceAccessScheduler &scheduler = getDeviceAccessScheduler(*pDevice);
00619     RandomAccessRequest request;
00620     request.pDevice = pDevice.get();
00621     request.cbOffset = getPageOffset(blockId);
00622     request.cbTransfer = 0;
00623     request.type = RandomAccessRequest::READ;
00624 
00625     BlockId blockIdi = blockId;
00626     for (uint i = 0; i < nPagesPerBatch; i++) {
00627         PageT *page;
00628         do {
00629             page = findFreePage();
00630         } while (!page);
00631 
00632         PageBucketT &bucket = getHashBucket(blockIdi);
00633         bool bPendingRead = true;
00634         bool bIncRef = false;
00635         PageT &mappedPage = mapPage(
00636             bucket,*page,blockIdi,pMappedPageListener,bPendingRead,bIncRef);
00637         if (&mappedPage != page) {
00638             // This page already mapped; can't do batch prefetch.  For the
00639             // pages which we've already mapped, initiate transfer.
00640             // For this page, skip altogether since it's already been read
00641             // (or has a read in progress).  For remaining pages, continue
00642             // building new request.
00643             if (request.cbTransfer) {
00644                 if (scheduler.schedule(request)) {
00645                     successfulPrefetch();
00646                 } else {
00647                     ioRetry();
00648                     rejectedPrefetch();
00649                 }
00650             }
00651             // adjust start past transfer just initiated plus already mapped
00652             // page
00653             request.cbOffset += request.cbTransfer;
00654             request.cbOffset += getPageSize();
00655             request.cbTransfer = 0;
00656             request.bindingList.clear(false);
00657         } else {
00658             // add this page to the request
00659             request.cbTransfer += getPageSize();
00660             request.bindingList.push_back(*page);
00661         }
00662         CompoundId::incBlockNum(blockIdi);
00663     }
00664     // deal with leftovers
00665     if (request.cbTransfer) {
00666         if (scheduler.schedule(request)) {
00667             successfulPrefetch();
00668         } else {
00669             ioRetry();
00670             rejectedPrefetch();
00671         }
00672     }
00673 }
00674 
00675 template <class PageT,class VictimPolicyT>
00676 void CacheImpl<PageT,VictimPolicyT>
00677 ::successfulPrefetch()
00678 {
00679     incrementStatsCounter(nSuccessfulCachePrefetches);
00680 }
00681 
00682 template <class PageT,class VictimPolicyT>
00683 void CacheImpl<PageT,VictimPolicyT>
00684 ::rejectedPrefetch()
00685 {
00686     incrementStatsCounter(nRejectedCachePrefetches);
00687 }
00688 
00689 template <class PageT,class VictimPolicyT>
00690 void CacheImpl<PageT,VictimPolicyT>
00691 ::ioRetry()
00692 {
00693     incrementStatsCounter(nIoRetries);
00694 }
00695 
00696 template <class PageT,class VictimPolicyT>
00697 uint CacheImpl<PageT,VictimPolicyT>
00698 ::checkpointPages(
00699     PagePredicate &pagePredicate,CheckpointType checkpointType)
00700 {
00701     // TODO:  change RandomAccessRequest interface so that we can gang
00702     // these all up into one big discontiguous request
00703 
00704     uint nPages = 0;
00705     bool countPages = true;
00706 
00707     FlushPhase flushPhase;
00708     if (checkpointType >= CHECKPOINT_FLUSH_AND_UNMAP) {
00709         flushPhase = phaseInitiate;
00710     } else {
00711         flushPhase = phaseSkip;
00712     }
00713     for (;;) {
00714         for (uint i = 0; i < pages.size(); i++) {
00715             PageT &page = *(pages[i]);
00716             StrictMutexGuard pageGuard(page.mutex);
00717             // restrict view to just mapped pages of interest
00718             if (!page.hasBlockId()) {
00719                 continue;
00720             }
00721             if (!pagePredicate(page)) {
00722                 continue;
00723             }
00724             if (countPages) {
00725                 ++nPages;
00726             }
00727             if (flushPhase == phaseInitiate) {
00728                 if (page.isDirty()) {
00729                     // shouldn't be flushing a page if someone is currently
00730                     // scribbling on it
00731                     assert(!page.isExclusiveLockHeld());
00732                     incrementStatsCounter(nCheckpointWrites);
00733                     // initiate a flush
00734                     writePageAsync(page);
00735                 }
00736             } else if (flushPhase == phaseWait) {
00737                 BlockId origBlockId = page.getBlockId();
00738                 MappedPageListener *origListener = page.pMappedPageListener;
00739                 while (page.dataStatus == CachePage::DATA_WRITE) {
00740                     page.waitForPendingIO(pageGuard);
00741                 }
00742 
00743                 // If this page has been remapped during sleeps that occurred
00744                 // while waiting for the page I/O to complete, then there's
00745                 // no need to reset the listener, since the remap has
00746                 // effectively reset the listener.  (TODO: zfong 6/23/08 -
00747                 // Add a unit testcase for this.)
00748                 //
00749                 // Otherwise, reset the listener, if called for by the original
00750                 // listener.  Note that by doing so, during the next iteration
00751                 // in the outermost for loop in this method when we're
00752                 // unmapping cache entries, we will not unmap this page
00753                 // because we've changed the listener.
00754                 if (page.pMappedPageListener &&
00755                     page.pMappedPageListener == origListener &&
00756                     page.getBlockId() == origBlockId)
00757                 {
00758                     MappedPageListener *newListener =
00759                     page.pMappedPageListener->notifyAfterPageCheckpointFlush(
00760                         page);
00761                     if (newListener != NULL) {
00762                         page.pMappedPageListener = newListener;
00763                     }
00764                 }
00765             } else {
00766                 if (checkpointType <= CHECKPOINT_FLUSH_AND_UNMAP) {
00767                     unmapAndFreeDiscardedPage(page,pageGuard);
00768                 }
00769             }
00770         }
00771         countPages = false;
00772         if (flushPhase == phaseInitiate) {
00773             flushPhase = phaseWait;
00774             continue;
00775         }
00776         if (flushPhase == phaseWait) {
00777             if (checkpointType <= CHECKPOINT_FLUSH_AND_UNMAP) {
00778                 flushPhase = phaseSkip;
00779                 continue;
00780             }
00781         }
00782         return nPages;
00783     }
00784 }
00785 
00786 template <class PageT,class VictimPolicyT>
00787 void CacheImpl<PageT,VictimPolicyT>
00788 ::flushPage(CachePage &page,bool async)
00789 {
00790     StrictMutexGuard pageGuard(page.mutex);
00791     assert(page.isExclusiveLockHeld());
00792     if (page.pMappedPageListener) {
00793         if (!page.pMappedPageListener->canFlushPage(page)) {
00794             if (async) {
00795                 // TODO jvs 21-Jan-2006: instead of ignoring request, fail; we
00796                 // should be using Segment-level logic to avoid ever getting
00797                 // here
00798                 return;
00799             }
00800             permFail("attempt to flush page out of order");
00801         }
00802     }
00803     if (page.dataStatus != CachePage::DATA_WRITE) {
00804         // no flush already in progress, so request one
00805         writePageAsync(static_cast<PageT &>(page));
00806     }
00807     if (async) {
00808         return;
00809     }
00810     // wait for flush to complete
00811     while (page.dataStatus == CachePage::DATA_WRITE) {
00812         page.waitForPendingIO(pageGuard);
00813     }
00814 }
00815 
00816 template <class PageT,class VictimPolicyT>
00817 void CacheImpl<PageT,VictimPolicyT>
00818 ::nicePage(CachePage &page)
00819 {
00820     victimPolicy.notifyPageNice(static_cast<PageT &>(page));
00821 }
00822 
00823 template <class PageT,class VictimPolicyT>
00824 void CacheImpl<PageT,VictimPolicyT>
00825 ::registerDevice(DeviceId deviceId,SharedRandomAccessDevice pDevice)
00826 {
00827     assert(deviceTable[opaqueToInt(deviceId)] == NULL);
00828     deviceTable[opaqueToInt(deviceId)] = pDevice;
00829     pDeviceAccessScheduler->registerDevice(pDevice);
00830 }
00831 
00832 template <class PageT,class VictimPolicyT>
00833 void CacheImpl<PageT,VictimPolicyT>
00834 ::unregisterDevice(DeviceId deviceId)
00835 {
00836     SharedRandomAccessDevice &pDevice = getDevice(deviceId);
00837     assert(pDevice);
00838     DeviceIdPagePredicate pagePredicate(deviceId);
00839     uint nPages = checkpointPages(pagePredicate,CHECKPOINT_DISCARD);
00840     assert(!nPages);
00841     pDeviceAccessScheduler->unregisterDevice(pDevice);
00842     pDevice.reset();
00843 }
00844 
00845 template <class PageT,class VictimPolicyT>
00846 SharedRandomAccessDevice &CacheImpl<PageT,VictimPolicyT>
00847 ::getDevice(DeviceId deviceId)
00848 {
00849     return deviceTable[opaqueToInt(deviceId)];
00850 }
00851 
00852 // ----------------------------------------------------------------------
00853 // Notification methods called from friend Page
00854 // ----------------------------------------------------------------------
00855 
00856 template <class PageT,class VictimPolicyT>
00857 void CacheImpl<PageT,VictimPolicyT>
00858 ::notifyTransferCompletion(CachePage &page,bool bSuccess)
00859 {
00860     StrictMutexGuard pageGuard(page.mutex);
00861     // NOTE: A write failure is always a panic, because there's nothing we
00862     // can do to recover from it.  However, read failures may be expected under
00863     // some recovery conditions, and will be detected as an assertion when the
00864     // caller invokes readablePage() on the locked page.  Callers in recovery
00865     // can use isDataValid() to avoid the assertion.
00866     switch (page.dataStatus) {
00867     case CachePage::DATA_WRITE:
00868         {
00869             if (!bSuccess) {
00870                 std::cerr << "Write failed for page 0x" << std::hex <<
00871                     opaqueToInt(page.getBlockId());
00872                 ::abort();
00873             }
00874             decrementCounter(nDirtyPages);
00875             victimPolicy.notifyPageClean(static_cast<PageT &>(page));
00876             // let waiting threads know that this page may now be available
00877             // for victimization
00878             freePageCondition.notify_all();
00879         }
00880         break;
00881     case CachePage::DATA_READ:
00882         break;
00883     default:
00884         permAssert(false);
00885         break;
00886     }
00887     if (bSuccess) {
00888         CachePage::DataStatus oldStatus = page.dataStatus;
00889         page.dataStatus = CachePage::DATA_CLEAN;
00890         if (page.pMappedPageListener) {
00891             if (oldStatus == CachePage::DATA_READ) {
00892                 page.pMappedPageListener->notifyAfterPageRead(page);
00893             } else {
00894                 page.pMappedPageListener->notifyAfterPageFlush(page);
00895             }
00896         }
00897     } else {
00898         page.dataStatus = CachePage::DATA_ERROR;
00899     }
00900     page.ioCompletionCondition.notify_all();
00901 }
00902 
00903 template <class PageT,class VictimPolicyT>
00904 void CacheImpl<PageT,VictimPolicyT>
00905 ::markPageDirty(CachePage &page)
00906 {
00907     StrictMutexGuard pageGuard(page.mutex);
00908     incrementCounter(nDirtyPages);
00909     bool bValid = page.isDataValid();
00910     page.dataStatus = CachePage::DATA_DIRTY;
00911     victimPolicy.notifyPageDirty(static_cast<PageT &>(page));
00912 
00913     // No synchronization required during notification because caller already
00914     // holds exclusive lock on page.  The notification is called AFTER the page
00915     // has already been marked dirty in case the listener needs to write to
00916     // the page (otherwise an infinite loop would occur).
00917     pageGuard.unlock();
00918     if (page.pMappedPageListener) {
00919         page.pMappedPageListener->notifyPageDirty(page,bValid);
00920     }
00921 }
00922 
00923 // ----------------------------------------------------------------------
00924 // Implementation of TimerThreadClient interface
00925 // ----------------------------------------------------------------------
00926 
00927 template <class PageT,class VictimPolicyT>
00928 uint CacheImpl<PageT,VictimPolicyT>
00929 ::getTimerIntervalMillis()
00930 {
00931     return idleFlushInterval;
00932 }
00933 
00934 template <class PageT,class VictimPolicyT>
00935 void CacheImpl<PageT,VictimPolicyT>
00936 ::onTimerInterval()
00937 {
00938     flushSomePages();
00939 }
00940 
00941 // ----------------------------------------------------------------------
00942 // Private implementation methods
00943 // ----------------------------------------------------------------------
00944 
00945 template <class PageT,class VictimPolicyT>
00946 void CacheImpl<PageT,VictimPolicyT>::closeImpl()
00947 {
00948     if (timerThread.isStarted()) {
00949         timerThread.stop();
00950     }
00951 
00952     if (pDeviceAccessScheduler) {
00953         pDeviceAccessScheduler->stop();
00954     }
00955 
00956     // unregister the null device
00957     if (getDevice(NULL_DEVICE_ID)) {
00958         unregisterDevice(NULL_DEVICE_ID);
00959     }
00960 
00961     // make sure all devices got unregistered
00962     for (uint i = 0; i < deviceTable.size(); i++) {
00963         assert(!deviceTable[i]);
00964     }
00965 
00966     deleteAndNullify(pDeviceAccessScheduler);
00967 
00968     // clean up page hash table
00969     for (uint i = 0; i < pageTable.size(); i++) {
00970         // all pages should already have been unmapped
00971         assert(!pageTable[i]->pageList.size());
00972         deleteAndNullify(pageTable[i]);
00973     }
00974 
00975     unmappedBucket.pageList.clear();
00976     unallocatedBucket.pageList.clear();
00977 
00978     // deallocate all pages
00979     for (uint i = 0; i < pages.size(); i++) {
00980         if (!pages[i]) {
00981             continue;
00982         }
00983         victimPolicy.unregisterPage(*(pages[i]));
00984         PBuffer pBuffer = pages[i]->pBuffer;
00985         if (pBuffer) {
00986             int errorCode;
00987             if (bufferAllocator.deallocate(pBuffer, &errorCode)) {
00988                 throw SysCallExcn("munmap failed", errorCode);
00989             }
00990         }
00991         deleteAndNullify(pages[i]);
00992     }
00993 }
00994 
00995 template <class PageT,class VictimPolicyT>
00996 PageT *CacheImpl<PageT,VictimPolicyT>
00997 ::lookupPage(PageBucketT &bucket,BlockId blockId, bool pin)
00998 {
00999     assertCorrectBucket(bucket,blockId);
01000     SXMutexSharedGuard bucketGuard(bucket.mutex);
01001     for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
01002         StrictMutexGuard pageGuard(iter->mutex);
01003         if (iter->getBlockId() == blockId) {
01004             victimPolicy.notifyPageAccess(*iter, pin);
01005             iter->nReferences++;
01006             while (iter->dataStatus == CachePage::DATA_READ) {
01007                 iter->waitForPendingIO(pageGuard);
01008             }
01009             return iter;
01010         }
01011     }
01012     return NULL;
01013 }
01014 
01015 template <class PageT,class VictimPolicyT>
01016 PageT *CacheImpl<PageT,VictimPolicyT>
01017 ::findFreePage()
01018 {
01019     // Check unmappedBucket first.  Note the use of the double-checked locking
01020     // idiom here; it's OK because perfect accuracy is not required.  Under
01021     // steady-state conditions, unmappedBucket will be empty, so avoiding
01022     // unnecessary locking is a worthwhile optimization.
01023     if (unmappedBucket.pageList.size()) {
01024         SXMutexExclusiveGuard unmappedBucketGuard(unmappedBucket.mutex);
01025         PageBucketMutator mutator(unmappedBucket.pageList);
01026         if (mutator) {
01027             assert(!mutator->hasBlockId());
01028             return mutator.detach();
01029         }
01030     }
01031     // search for a victimizable page, trying pages in the order recommended
01032     // by victimPolicy
01033     uint nToFlush = 10;
01034 
01035     VictimSharedGuard victimSharedGuard(victimPolicy.getMutex());
01036     std::pair<VictimPageIterator,VictimPageIterator> victimRange(
01037         victimPolicy.getVictimRange());
01038     for (; victimRange.first != victimRange.second; ++(victimRange.first)) {
01039         PageT &page = *(victimRange.first);
01040         // if page mutex is unavailable, just skip it
01041         StrictMutexGuard pageGuard(page.mutex, boost::try_to_lock);
01042         if (!pageGuard.owns_lock()) {
01043             continue;
01044         }
01045         if (canVictimizePage(page)) {
01046             if (page.isDirty()) {
01047                 // can't victimize a dirty page; kick off an async write
01048                 // and maybe later when we come back to try again it will
01049                 // be available
01050                 if (!nToFlush) {
01051                     continue;
01052                 }
01053                 if (page.pMappedPageListener &&
01054                     !page.pMappedPageListener->canFlushPage(page))
01055                 {
01056                     continue;
01057                 }
01058                 nToFlush--;
01059                 incrementStatsCounter(nVictimizationWrites);
01060                 // If the write request required retry, don't submit any
01061                 // additional write requests in this loop
01062                 if (!writePageAsync(page)) {
01063                     nToFlush = 0;
01064                 }
01065                 continue;
01066             }
01067             // NOTE:  have to do this early since unmapPage will
01068             // call back into victimPolicy, which could deadlock
01069             victimSharedGuard.unlock();
01070             unmapPage(page,pageGuard,false);
01071             incrementStatsCounter(nVictimizations);
01072             return &page;
01073         }
01074     }
01075     victimSharedGuard.unlock();
01076 
01077     // no free pages, so wait for one (with timeout just in case)
01078     StrictMutexGuard freePageGuard(freePageMutex);
01079     boost::xtime atv;
01080     convertTimeout(100,atv);
01081     freePageCondition.timed_wait(freePageGuard,atv);
01082     return NULL;
01083 }
01084 
01085 template <class PageT,class VictimPolicyT>
01086 void CacheImpl<PageT,VictimPolicyT>
01087 ::collectStats(CacheStats &stats)
01088 {
01089     stats.nHits = nCacheHits;
01090     stats.nRequests = nCacheRequests;
01091     stats.nVictimizations = nVictimizations;
01092     stats.nDirtyPages = nDirtyPages;
01093     stats.nPageReads = nPageReads;
01094     stats.nPageWrites = nPageWrites;
01095     stats.nRejectedPrefetches = nRejectedCachePrefetches;
01096     stats.nIoRetries = nIoRetries;
01097     stats.nSuccessfulPrefetches = nSuccessfulCachePrefetches;
01098     stats.nLazyWrites = nLazyWrites;
01099     stats.nLazyWriteCalls = nLazyWriteCalls;
01100     stats.nVictimizationWrites = nVictimizationWrites;
01101     stats.nCheckpointWrites = nCheckpointWrites;
01102     stats.nMemPagesAllocated = getAllocatedPageCount();
01103     stats.nMemPagesUnused = unmappedBucket.pageList.size();
01104     stats.nMemPagesMax = getMaxAllocatedPageCount();
01105 
01106     // NOTE:  nDirtyPages is not cumulative; don't clear it!
01107     nCacheHits.clear();
01108     nCacheRequests.clear();
01109     nVictimizations.clear();
01110     nPageReads.clear();
01111     nPageWrites.clear();
01112     nRejectedCachePrefetches.clear();
01113     nIoRetries.clear();
01114     nSuccessfulCachePrefetches.clear();
01115     nLazyWrites.clear();
01116     nLazyWriteCalls.clear();
01117     nVictimizationWrites.clear();
01118     nCheckpointWrites.clear();
01119 
01120     statsSinceInit.nHitsSinceInit += stats.nHits;
01121     statsSinceInit.nRequestsSinceInit += stats.nRequests;
01122     statsSinceInit.nVictimizationsSinceInit += stats.nVictimizations;
01123     statsSinceInit.nPageReadsSinceInit += stats.nPageReads;
01124     statsSinceInit.nPageWritesSinceInit += stats.nPageWrites;
01125     statsSinceInit.nRejectedPrefetchesSinceInit += stats.nRejectedPrefetches;
01126     statsSinceInit.nIoRetriesSinceInit += stats.nIoRetries;
01127     statsSinceInit.nSuccessfulPrefetchesSinceInit +=
01128         stats.nSuccessfulPrefetches;
01129     statsSinceInit.nLazyWritesSinceInit += stats.nLazyWrites;
01130     statsSinceInit.nLazyWriteCallsSinceInit += stats.nLazyWriteCalls;
01131     statsSinceInit.nVictimizationWritesSinceInit += stats.nVictimizationWrites;
01132     statsSinceInit.nCheckpointWritesSinceInit += stats.nCheckpointWrites;
01133 
01134     stats.nHitsSinceInit = statsSinceInit.nHitsSinceInit;
01135     stats.nRequestsSinceInit = statsSinceInit.nRequestsSinceInit;
01136     stats.nVictimizationsSinceInit = statsSinceInit.nVictimizationsSinceInit;
01137     stats.nPageReadsSinceInit = statsSinceInit.nPageReadsSinceInit;
01138     stats.nPageWritesSinceInit = statsSinceInit.nPageWritesSinceInit;
01139     stats.nRejectedPrefetchesSinceInit =
01140         statsSinceInit.nRejectedPrefetchesSinceInit;
01141     stats.nIoRetriesSinceInit =
01142         statsSinceInit.nIoRetriesSinceInit;
01143     stats.nSuccessfulPrefetchesSinceInit =
01144         statsSinceInit.nSuccessfulPrefetchesSinceInit;
01145     stats.nLazyWritesSinceInit = statsSinceInit.nLazyWritesSinceInit;
01146     stats.nLazyWriteCallsSinceInit = statsSinceInit.nLazyWriteCallsSinceInit;
01147     stats.nVictimizationWritesSinceInit =
01148         statsSinceInit.nVictimizationWritesSinceInit;
01149     stats.nCheckpointWritesSinceInit =
01150         statsSinceInit.nCheckpointWritesSinceInit;
01151 }
01152 
01153 template <class PageT,class VictimPolicyT>
01154 void CacheImpl<PageT,VictimPolicyT>
01155 ::flushSomePages()
01156 {
01157     // TODO:  parameterize
01158     uint nToFlush = std::min<uint>(5,nDirtyPages);
01159     if (!nToFlush) {
01160         // in case there aren't any dirty buffers to start with
01161         return;
01162     }
01163 
01164     // Only flush if we're within the dirty threshholds
01165     if (!inFlushMode) {
01166         if (nDirtyPages < dirtyHighWaterMark) {
01167             return;
01168         }
01169         inFlushMode = true;
01170     }
01171     if (nDirtyPages < dirtyLowWaterMark) {
01172         inFlushMode = false;
01173         return;
01174     }
01175 
01176     incrementStatsCounter(nLazyWriteCalls);
01177     uint nFlushed = 0;
01178     VictimSharedGuard victimSharedGuard(victimPolicy.getMutex());
01179     std::pair<DirtyVictimPageIterator,DirtyVictimPageIterator> victimRange(
01180         victimPolicy.getDirtyVictimRange());
01181     for (; victimRange.first != victimRange.second; ++(victimRange.first)) {
01182         PageT &page = *(victimRange.first);
01183         // if page mutex is unavailable, just skip it
01184         StrictMutexGuard pageGuard(page.mutex, boost::try_to_lock);
01185         if (!pageGuard.owns_lock()) {
01186             continue;
01187         }
01188         if (!page.isDirty()) {
01189             continue;
01190         }
01191         if (page.isScratchLocked()) {
01192             // someone has the page scratch-locked
01193             continue;
01194         }
01195         if (!page.lock.waitFor(LOCKMODE_S_NOWAIT)) {
01196             // someone has the page write-locked
01197             continue;
01198         } else {
01199             // release our test lock just acquired
01200             page.lock.release(LOCKMODE_S);
01201         }
01202         if (page.pMappedPageListener &&
01203             !page.pMappedPageListener->canFlushPage(page))
01204         {
01205             continue;
01206         }
01207         incrementStatsCounter(nLazyWrites);
01208         // If the write request required retry, don't submit any
01209         // additional write requests
01210         if (!writePageAsync(page)) {
01211             break;
01212         }
01213         nFlushed++;
01214         if (nFlushed >= nToFlush) {
01215             break;
01216         }
01217     }
01218 }
01219 
01220 template <class PageT,class VictimPolicyT>
01221 void CacheImpl<PageT,VictimPolicyT>
01222 ::unmapPage(PageT &page,StrictMutexGuard &pageGuard, bool discard)
01223 {
01224     assert(!page.nReferences);
01225     assert(pageGuard.owns_lock());
01226 
01227     victimPolicy.notifyPageUnmap(page, discard);
01228     if (page.pMappedPageListener) {
01229         page.pMappedPageListener->notifyPageUnmap(page);
01230         page.pMappedPageListener = NULL;
01231     }
01232     if (page.isDirty()) {
01233         decrementCounter(nDirtyPages);
01234     }
01235 
01236     // NOTE:  to get the locking sequence safe for deadlock avoidance,
01237     // we're going to have to release the page mutex.  To indicate that the
01238     // page is being unmapped (so that no one else tries to lock it or
01239     // victimize it), we first clear the BlockId, saving it for our own use.
01240     BlockId blockId = page.getBlockId();
01241     page.blockId = NULL_BLOCK_ID;
01242     page.dataStatus = CachePage::DATA_INVALID;
01243     pageGuard.unlock();
01244 
01245     PageBucketT &bucket = getHashBucket(blockId);
01246     SXMutexExclusiveGuard bucketGuard(bucket.mutex);
01247     bool bFound = bucket.pageList.remove(page);
01248     assert(bFound);
01249 }
01250 
01251 template <class PageT,class VictimPolicyT>
01252 void CacheImpl<PageT,VictimPolicyT>
01253 ::unmapAndFreeDiscardedPage(PageT &page,StrictMutexGuard &pageGuard)
01254 {
01255     while (page.isTransferInProgress()) {
01256         page.waitForPendingIO(pageGuard);
01257     }
01258     unmapPage(page,pageGuard,true);
01259     pageGuard.lock();
01260     assert(!page.nReferences);
01261     freePage(page);
01262 }
01263 
01264 template <class PageT,class VictimPolicyT>
01265 PageT &CacheImpl<PageT,VictimPolicyT>
01266 ::mapPage(
01267     PageBucketT &bucket,PageT &page,BlockId blockId,
01268     MappedPageListener *pMappedPageListener,
01269     bool bPendingRead,bool bIncRef)
01270 {
01271     assert(!page.hasBlockId());
01272     assert(!page.isDirty());
01273     assert(getDevice(CompoundId::getDeviceId(blockId)).get());
01274     assertCorrectBucket(bucket,blockId);
01275 
01276     // check existing pages in hash bucket in case someone else just mapped the
01277     // same page
01278     SXMutexExclusiveGuard bucketGuard(bucket.mutex);
01279     for (PageBucketIter iter(bucket.pageList); iter; ++iter) {
01280         StrictMutexGuard pageGuard(iter->mutex);
01281         if (iter->getBlockId() == blockId) {
01282             // blockId already mapped; discard new page and return existing page
01283             freePage(page);
01284             if (bIncRef) {
01285                 iter->nReferences++;
01286             }
01287             bucketGuard.unlock();
01288             assert(pMappedPageListener == iter->pMappedPageListener);
01289             victimPolicy.notifyPageAccess(*iter, bIncRef);
01290             return *iter;
01291         }
01292     }
01293 
01294     // not found:  add new page instead
01295     StrictMutexGuard pageGuard(page.mutex);
01296     page.blockId = blockId;
01297     assert(!page.pMappedPageListener);
01298     page.pMappedPageListener = pMappedPageListener;
01299     if (bIncRef) {
01300         page.nReferences++;
01301     }
01302     if (bPendingRead) {
01303         page.dataStatus = CachePage::DATA_READ;
01304     }
01305     bucket.pageList.push_back(page);
01306     bucketGuard.unlock();
01307     victimPolicy.notifyPageMap(page, bIncRef);
01308     if (pMappedPageListener) {
01309         pMappedPageListener->notifyPageMap(page);
01310     }
01311     return page;
01312 }
01313 
01314 template <class PageT,class VictimPolicyT>
01315 bool CacheImpl<PageT,VictimPolicyT>
01316 ::transferPageAsync(PageT &page)
01317 {
01318     SharedRandomAccessDevice &pDevice =
01319         getDevice(CompoundId::getDeviceId(page.getBlockId()));
01320     RandomAccessRequest request;
01321     request.pDevice = pDevice.get();
01322     request.cbOffset = getPageOffset(page.getBlockId());
01323     request.cbTransfer = getPageSize();
01324     if (page.dataStatus == CachePage::DATA_WRITE) {
01325         request.type = RandomAccessRequest::WRITE;
01326     } else {
01327         assert(page.dataStatus == CachePage::DATA_READ);
01328         request.type = RandomAccessRequest::READ;
01329     }
01330     request.bindingList.push_back(page);
01331     bool rc = getDeviceAccessScheduler(*pDevice).schedule(request);
01332     if (!rc) {
01333         ioRetry();
01334     }
01335     return rc;
01336 }
01337 
01338 template <class PageT,class VictimPolicyT>
01339 CacheAllocator &CacheImpl<PageT,VictimPolicyT>
01340 ::getAllocator() const
01341 {
01342     return bufferAllocator;
01343 }
01344 
01345 template <class PageT,class VictimPolicyT>
01346 inline bool CacheImpl<PageT,VictimPolicyT>
01347 ::readPageAsync(PageT &page)
01348 {
01349     page.dataStatus = CachePage::DATA_READ;
01350     incrementStatsCounter(nPageReads);
01351     return transferPageAsync(page);
01352 }
01353 
01354 template <class PageT,class VictimPolicyT>
01355 inline bool CacheImpl<PageT,VictimPolicyT>
01356 ::writePageAsync(PageT &page)
01357 {
01358     assert(page.isDirty());
01359     if (page.pMappedPageListener) {
01360         assert(page.pMappedPageListener->canFlushPage(page));
01361         page.pMappedPageListener->notifyBeforePageFlush(page);
01362     }
01363     page.dataStatus = CachePage::DATA_WRITE;
01364     incrementStatsCounter(nPageWrites);
01365     if (!transferPageAsync(page)) {
01366         return false;
01367     } else {
01368         return true;
01369     }
01370 }
01371 
01372 template <class PageT,class VictimPolicyT>
01373 inline FileSize CacheImpl<PageT,VictimPolicyT>
01374 ::getPageOffset(BlockId const &blockId)
01375 {
01376     return ((FileSize) CompoundId::getBlockNum(blockId))
01377         * (FileSize) cbPage;
01378 }
01379 
01380 template <class PageT,class VictimPolicyT>
01381 inline PageBucket<PageT> &CacheImpl<PageT,VictimPolicyT>
01382 ::getHashBucket(BlockId const &blockId)
01383 {
01384     std::hash<BlockId> hasher;
01385     size_t hashCode = hasher(blockId);
01386     return *(pageTable[hashCode % pageTable.size()]);
01387 }
01388 
01389 template <class PageT,class VictimPolicyT>
01390 inline void CacheImpl<PageT,VictimPolicyT>
01391 ::assertCorrectBucket(PageBucketT &bucket,BlockId const &blockId)
01392 {
01393     assert(&bucket == &(getHashBucket(blockId)));
01394 }
01395 
01396 template <class PageT,class VictimPolicyT>
01397 inline void CacheImpl<PageT,VictimPolicyT>
01398 ::freePage(PageT &page)
01399 {
01400     SXMutexExclusiveGuard unmappedBucketGuard(unmappedBucket.mutex);
01401     unmappedBucket.pageList.push_back(page);
01402 }
01403 
01404 template <class PageT,class VictimPolicyT>
01405 inline bool CacheImpl<PageT,VictimPolicyT>
01406 ::canVictimizePage(PageT &page)
01407 {
01408     // NOTE:  the hasBlockId() check is to prevent us from trying to
01409     // victimize a page that is in transit between the free list and
01410     // a mapping; maybe such pages should have nReferences
01411     // non-zero instead?
01412     return page.hasBlockId()
01413         && !page.nReferences
01414         && !page.isTransferInProgress();
01415 }
01416 
01417 template <class PageT,class VictimPolicyT>
01418 inline void CacheImpl<PageT,VictimPolicyT>
01419 ::incrementCounter(AtomicCounter &x)
01420 {
01421     ++x;
01422 }
01423 
01424 template <class PageT,class VictimPolicyT>
01425 inline void CacheImpl<PageT,VictimPolicyT>
01426 ::decrementCounter(AtomicCounter &x)
01427 {
01428     --x;
01429 }
01430 
01431 template <class PageT,class VictimPolicyT>
01432 inline void CacheImpl<PageT,VictimPolicyT>
01433 ::incrementStatsCounter(AtomicCounter &x)
01434 {
01435     incrementCounter(x);
01436 }
01437 
01438 template <class PageT,class VictimPolicyT>
01439 inline void CacheImpl<PageT,VictimPolicyT>
01440 ::decrementStatsCounter(AtomicCounter &x)
01441 {
01442     decrementCounter(x);
01443 }
01444 
01445 FENNEL_END_NAMESPACE
01446 
01447 #endif
01448 
01449 // End CacheMethodsImpl.h

Generated on Mon Jun 22 04:00:13 2009 for Fennel by  doxygen 1.5.1