VersionedSegment.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/VersionedSegment.h"
00026 #include "fennel/segment/WALSegment.h"
00027 #include "fennel/segment/SegPageLock.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 #include "fennel/cache/CachePage.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $");
00032 
00033 // NOTE:  read comments on struct StoredNode before modifying
00034 // VersionedPageFooter
00035 
00041 struct VersionedPageFooter
00042 {
00048     PageId dataPageId;
00049 
00053     SegVersionNum versionNumber;
00054 
00058     PseudoUuid onlineUuid;
00059 
00065     uint64_t checksum;
00066 };
00067 
00068 VersionedSegment::VersionedSegment(
00069     SharedSegment dataSegmentInit,
00070     SharedSegment logSegmentInit,
00071     PseudoUuid const &onlineUuidInit,
00072     SegVersionNum versionNumberInit)
00073     : DelegatingSegment(dataSegmentInit)
00074 {
00075     logSegment = logSegmentInit;
00076     pWALSegment = SegmentFactory::dynamicCast<WALSegment *>(logSegment);
00077     assert(pWALSegment);
00078 
00079     setUsablePageSize(
00080         DelegatingSegment::getUsablePageSize()
00081         - sizeof(VersionedPageFooter));
00082 
00083     onlineUuid = onlineUuidInit;
00084     versionNumber = versionNumberInit;
00085     oldestLogPageId = NULL_PAGE_ID;
00086     newestLogPageId = NULL_PAGE_ID;
00087     lastCheckpointLogPageId = NULL_PAGE_ID;
00088     inRecovery = false;
00089 }
00090 
00091 VersionedSegment::~VersionedSegment()
00092 {
00093     pWALSegment = NULL;
00094     assert(dataToLogMap.empty());
00095 }
00096 
00097 // TODO: comments on checkpoint concurrency requirements
00098 
00099 void VersionedSegment::delegatedCheckpoint(
00100     Segment &delegatingSegment,CheckpointType checkpointType)
00101 {
00102     if (checkpointType != CHECKPOINT_DISCARD) {
00103         // TODO:  for a fuzzy checkpoint, only need to force the log pages for
00104         // data pages that are going to be flushed
00105         logSegment->checkpoint(checkpointType);
00106         assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID);
00107     }
00108     if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00109         MappedPageListenerPredicate pagePredicate(delegatingSegment);
00110         fuzzyCheckpointSet.setDelegatePagePredicate(pagePredicate);
00111         pCache->checkpointPages(fuzzyCheckpointSet,checkpointType);
00112         fuzzyCheckpointSet.finishCheckpoint();
00113         if (lastCheckpointLogPageId != NULL_PAGE_ID) {
00114             oldestLogPageId = logSegment->getPageSuccessor(
00115                 lastCheckpointLogPageId);
00116         } else {
00117             oldestLogPageId = NULL_PAGE_ID;
00118         }
00119     } else {
00120         DelegatingSegment::delegatedCheckpoint(
00121             delegatingSegment,checkpointType);
00122         fuzzyCheckpointSet.clear();
00123         oldestLogPageId = NULL_PAGE_ID;
00124     }
00125 
00126     if (checkpointType == CHECKPOINT_DISCARD) {
00127         logSegment->checkpoint(checkpointType);
00128     }
00129 
00130     StrictMutexGuard mutexGuard(mutex);
00131     ++versionNumber;
00132     dataToLogMap.clear();
00133 }
00134 
00135 void VersionedSegment::deallocateCheckpointedLog(CheckpointType checkpointType)
00136 {
00137     if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00138         if (lastCheckpointLogPageId != NULL_PAGE_ID) {
00139             logSegment->deallocatePageRange(
00140                 NULL_PAGE_ID,lastCheckpointLogPageId);
00141             if (lastCheckpointLogPageId == newestLogPageId) {
00142                 newestLogPageId = NULL_PAGE_ID;
00143             }
00144         }
00145     } else {
00146         logSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID);
00147         newestLogPageId = NULL_PAGE_ID;
00148     }
00149     lastCheckpointLogPageId = newestLogPageId;
00150 }
00151 
00152 void VersionedSegment::deallocatePageRange(
00153     PageId startPageId,PageId endPageId)
00154 {
00155     // TODO:  support real truncations?
00156     assert(startPageId == endPageId);
00157     assert(startPageId != NULL_PAGE_ID);
00158 
00159     // TODO:  need to log copy of deallocated page
00160     DelegatingSegment::deallocatePageRange(startPageId,endPageId);
00161 }
00162 
00163 void VersionedSegment::notifyPageDirty(CachePage &page,bool bDataValid)
00164 {
00165     DelegatingSegment::notifyPageDirty(page,bDataValid);
00166 
00167     if (inRecovery) {
00168         // REVIEW jvs 8-Aug-2006: It would be nice to assert instead.  But we
00169         // can get here in online recovery when we replace pages which were
00170         // abandoned but not discarded.
00171         return;
00172     }
00173 
00174     VersionedPageFooter *pDataFooter = reinterpret_cast<VersionedPageFooter *>(
00175         getWritableFooter(page));
00176 
00177     if (!bDataValid) {
00178         // newly allocated page
00179         pDataFooter->dataPageId = NULL_PAGE_ID;
00180         pDataFooter->onlineUuid.generateInvalid();
00181         pDataFooter->versionNumber = versionNumber;
00182         pDataFooter->checksum = 0;
00183         return;
00184     }
00185 
00186     assert(pDataFooter->versionNumber <= versionNumber);
00187     if (pDataFooter->versionNumber == versionNumber) {
00188         // already logged this page
00189         return;
00190     }
00191 
00192     // write before-image to the log
00193     SegmentAccessor logSegmentAccessor(logSegment,pCache);
00194     SegPageLock logPageLock(logSegmentAccessor);
00195     PageId logPageId = logPageLock.allocatePage();
00196 
00197     // REVIEW:  what if there's other footer information to copy?
00198 
00199     // TODO:  remember logPageId in version map
00200     PBuffer pLogPageBuffer = logPageLock.getPage().getWritableData();
00201     memcpy(
00202         pLogPageBuffer,
00203         page.getReadableData(),
00204         getUsablePageSize());
00205     VersionedPageFooter *pLogFooter = reinterpret_cast<VersionedPageFooter *>(
00206         getWritableFooter(logPageLock.getPage()));
00207     pLogFooter->versionNumber = versionNumber - 1;
00208     pLogFooter->onlineUuid = onlineUuid;
00209     PageId dataPageId = DelegatingSegment::translateBlockId(
00210         page.getBlockId());
00211     pLogFooter->dataPageId = dataPageId;
00212 
00213     pLogFooter->checksum = computeChecksum(pLogPageBuffer);
00214 
00215     // record new version number for soon-to-be-modified data page
00216     pDataFooter->versionNumber = versionNumber;
00217 
00218     // tell the cache that the log page is a good candidate for victimization
00219     pCache->nicePage(logPageLock.getPage());
00220 
00221     StrictMutexGuard mutexGuard(mutex);
00222     dataToLogMap[dataPageId] = logPageId;
00223     if ((newestLogPageId == NULL_PAGE_ID) || (logPageId > newestLogPageId)) {
00224         newestLogPageId = logPageId;
00225     }
00226     if ((oldestLogPageId == NULL_PAGE_ID) || (logPageId < oldestLogPageId)) {
00227         oldestLogPageId = logPageId;
00228     }
00229 }
00230 
00231 SegVersionNum VersionedSegment::computeChecksum(void const *pPageData)
00232 {
00233     crcComputer.reset();
00234     crcComputer.process_bytes(pPageData,getUsablePageSize());
00235     return crcComputer.checksum();
00236 }
00237 
00238 bool VersionedSegment::canFlushPage(CachePage &page)
00239 {
00240     // this implements the WAL constraint
00241 
00242     PageId minLogPageId = pWALSegment->getMinDirtyPageId();
00243     if (minLogPageId == NULL_PAGE_ID) {
00244         return DelegatingSegment::canFlushPage(page);
00245     }
00246 
00247     StrictMutexGuard mutexGuard(mutex);
00248     PageId dataPageId = DelegatingSegment::translateBlockId(
00249         page.getBlockId());
00250     PageMapConstIter pLogPageId = dataToLogMap.find(dataPageId);
00251     if (pLogPageId == dataToLogMap.end()) {
00252         // newly allocated page
00253         return DelegatingSegment::canFlushPage(page);
00254     }
00255     PageId logPageId = pLogPageId->second;
00256     if (logPageId >= minLogPageId) {
00257         return false;
00258     }
00259     return DelegatingSegment::canFlushPage(page);
00260 }
00261 
00262 void VersionedSegment::prepareOnlineRecovery()
00263 {
00264     // For simplicity, force entire log out to disk first, but don't discard
00265     // it, since we're about to read it during recovery.
00266     logSegment->checkpoint(CHECKPOINT_FLUSH_ALL);
00267 
00268     StrictMutexGuard mutexGuard(mutex);
00269 
00270     dataToLogMap.clear();
00271     oldestLogPageId = NULL_PAGE_ID;
00272 }
00273 
00274 void VersionedSegment::recover(
00275     SharedSegment pDelegatingSegment,
00276     PageId firstLogPageId,
00277     SegVersionNum versionNumberInit,
00278     PseudoUuid const &onlineUuidInit)
00279 {
00280     onlineUuid = onlineUuidInit;
00281     recover(pDelegatingSegment, firstLogPageId, versionNumberInit);
00282 }
00283 
00284 void VersionedSegment::recover(
00285     SharedSegment pDelegatingSegment,
00286     PageId firstLogPageId,
00287     SegVersionNum versionNumberInit)
00288 {
00289     assert(dataToLogMap.empty());
00290     assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID);
00291 
00292     inRecovery = true;
00293 
00294     if (!isMAXU(versionNumberInit)) {
00295         versionNumber = versionNumberInit;
00296     }
00297 
00298     // The conventional thing to do is to scan forward to find the log end, and
00299     // then recover backwards, guaranteeing that earlier shadows replace later
00300     // shadows (in case of a fuzzy checkpoint).  Instead, we keep track of
00301     // which pages have already been recovered and skip them if they are
00302     // encountered again.
00303     std::hash_set<PageId> recoveredPageSet;
00304 
00305     // TODO:  use PageIters
00306 
00307     // TODO:  what about when one shadow log stores pages for multiple
00308     // VersionedSegments?
00309     SegmentAccessor logSegmentAccessor(logSegment,pCache);
00310     SegmentAccessor dataSegmentAccessor(pDelegatingSegment,pCache);
00311     for (; firstLogPageId != NULL_PAGE_ID;
00312          firstLogPageId = logSegment->getPageSuccessor(firstLogPageId))
00313     {
00314         SegPageLock logPageLock(logSegmentAccessor);
00315         logPageLock.lockShared(firstLogPageId);
00316         if (!logPageLock.getPage().isDataValid()) {
00317             break;
00318         }
00319         PConstBuffer pLogPageBuffer = logPageLock.getPage().getReadableData();
00320         VersionedPageFooter const *pLogFooter =
00321             reinterpret_cast<VersionedPageFooter const *>(
00322                 getReadableFooter(logPageLock.getPage()));
00323         if (pLogFooter->checksum != computeChecksum(pLogPageBuffer)) {
00324             break;
00325         }
00326         if (pLogFooter->onlineUuid != onlineUuid) {
00327             break;
00328         }
00329         assert(pLogFooter->versionNumber < (versionNumber + 2));
00330         if (pLogFooter->versionNumber < versionNumber) {
00331             continue;
00332         }
00333         if (recoveredPageSet.find(pLogFooter->dataPageId)
00334             != recoveredPageSet.end())
00335         {
00336             assert(pLogFooter->versionNumber > versionNumber);
00337             continue;
00338         }
00339 
00340         SegPageLock dataPageLock(dataSegmentAccessor);
00341         dataPageLock.lockExclusive(pLogFooter->dataPageId);
00342         memcpy(
00343             dataPageLock.getPage().getWritableData(),
00344             pLogPageBuffer,
00345             getFullPageSize());
00346         recoveredPageSet.insert(pLogFooter->dataPageId);
00347     }
00348 
00349     inRecovery = false;
00350 }
00351 
00352 SegVersionNum VersionedSegment::getPageVersion(CachePage &page)
00353 {
00354     VersionedPageFooter const *pFooter =
00355         reinterpret_cast<VersionedPageFooter const *>(
00356             getReadableFooter(page));
00357     return pFooter->versionNumber;
00358 }
00359 
00360 SegVersionNum VersionedSegment::getVersionNumber() const
00361 {
00362     return versionNumber;
00363 }
00364 
00365 SharedSegment VersionedSegment::getLogSegment() const
00366 {
00367     return logSegment;
00368 }
00369 
00370 PageId VersionedSegment::getOnlineRecoveryPageId() const
00371 {
00372     return oldestLogPageId;
00373 }
00374 
00375 PageId VersionedSegment::getRecoveryPageId() const
00376 {
00377     if (oldestLogPageId == NULL_PAGE_ID) {
00378         // if we've truncated the log, then recovery should start from the
00379         // first new shadow page after a crash
00380         return FIRST_LINEAR_PAGE_ID;
00381     } else {
00382         return oldestLogPageId;
00383     }
00384 }
00385 
00386 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $");
00387 
00388 // End VersionedSegment.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1