00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/VersionedSegment.h"
00026 #include "fennel/segment/WALSegment.h"
00027 #include "fennel/segment/SegPageLock.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 #include "fennel/cache/CachePage.h"
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $");
00032
00033
00034
00035
00041 struct VersionedPageFooter
00042 {
00048 PageId dataPageId;
00049
00053 SegVersionNum versionNumber;
00054
00058 PseudoUuid onlineUuid;
00059
00065 uint64_t checksum;
00066 };
00067
00068 VersionedSegment::VersionedSegment(
00069 SharedSegment dataSegmentInit,
00070 SharedSegment logSegmentInit,
00071 PseudoUuid const &onlineUuidInit,
00072 SegVersionNum versionNumberInit)
00073 : DelegatingSegment(dataSegmentInit)
00074 {
00075 logSegment = logSegmentInit;
00076 pWALSegment = SegmentFactory::dynamicCast<WALSegment *>(logSegment);
00077 assert(pWALSegment);
00078
00079 setUsablePageSize(
00080 DelegatingSegment::getUsablePageSize()
00081 - sizeof(VersionedPageFooter));
00082
00083 onlineUuid = onlineUuidInit;
00084 versionNumber = versionNumberInit;
00085 oldestLogPageId = NULL_PAGE_ID;
00086 newestLogPageId = NULL_PAGE_ID;
00087 lastCheckpointLogPageId = NULL_PAGE_ID;
00088 inRecovery = false;
00089 }
00090
00091 VersionedSegment::~VersionedSegment()
00092 {
00093 pWALSegment = NULL;
00094 assert(dataToLogMap.empty());
00095 }
00096
00097
00098
00099 void VersionedSegment::delegatedCheckpoint(
00100 Segment &delegatingSegment,CheckpointType checkpointType)
00101 {
00102 if (checkpointType != CHECKPOINT_DISCARD) {
00103
00104
00105 logSegment->checkpoint(checkpointType);
00106 assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID);
00107 }
00108 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00109 MappedPageListenerPredicate pagePredicate(delegatingSegment);
00110 fuzzyCheckpointSet.setDelegatePagePredicate(pagePredicate);
00111 pCache->checkpointPages(fuzzyCheckpointSet,checkpointType);
00112 fuzzyCheckpointSet.finishCheckpoint();
00113 if (lastCheckpointLogPageId != NULL_PAGE_ID) {
00114 oldestLogPageId = logSegment->getPageSuccessor(
00115 lastCheckpointLogPageId);
00116 } else {
00117 oldestLogPageId = NULL_PAGE_ID;
00118 }
00119 } else {
00120 DelegatingSegment::delegatedCheckpoint(
00121 delegatingSegment,checkpointType);
00122 fuzzyCheckpointSet.clear();
00123 oldestLogPageId = NULL_PAGE_ID;
00124 }
00125
00126 if (checkpointType == CHECKPOINT_DISCARD) {
00127 logSegment->checkpoint(checkpointType);
00128 }
00129
00130 StrictMutexGuard mutexGuard(mutex);
00131 ++versionNumber;
00132 dataToLogMap.clear();
00133 }
00134
00135 void VersionedSegment::deallocateCheckpointedLog(CheckpointType checkpointType)
00136 {
00137 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00138 if (lastCheckpointLogPageId != NULL_PAGE_ID) {
00139 logSegment->deallocatePageRange(
00140 NULL_PAGE_ID,lastCheckpointLogPageId);
00141 if (lastCheckpointLogPageId == newestLogPageId) {
00142 newestLogPageId = NULL_PAGE_ID;
00143 }
00144 }
00145 } else {
00146 logSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID);
00147 newestLogPageId = NULL_PAGE_ID;
00148 }
00149 lastCheckpointLogPageId = newestLogPageId;
00150 }
00151
00152 void VersionedSegment::deallocatePageRange(
00153 PageId startPageId,PageId endPageId)
00154 {
00155
00156 assert(startPageId == endPageId);
00157 assert(startPageId != NULL_PAGE_ID);
00158
00159
00160 DelegatingSegment::deallocatePageRange(startPageId,endPageId);
00161 }
00162
00163 void VersionedSegment::notifyPageDirty(CachePage &page,bool bDataValid)
00164 {
00165 DelegatingSegment::notifyPageDirty(page,bDataValid);
00166
00167 if (inRecovery) {
00168
00169
00170
00171 return;
00172 }
00173
00174 VersionedPageFooter *pDataFooter = reinterpret_cast<VersionedPageFooter *>(
00175 getWritableFooter(page));
00176
00177 if (!bDataValid) {
00178
00179 pDataFooter->dataPageId = NULL_PAGE_ID;
00180 pDataFooter->onlineUuid.generateInvalid();
00181 pDataFooter->versionNumber = versionNumber;
00182 pDataFooter->checksum = 0;
00183 return;
00184 }
00185
00186 assert(pDataFooter->versionNumber <= versionNumber);
00187 if (pDataFooter->versionNumber == versionNumber) {
00188
00189 return;
00190 }
00191
00192
00193 SegmentAccessor logSegmentAccessor(logSegment,pCache);
00194 SegPageLock logPageLock(logSegmentAccessor);
00195 PageId logPageId = logPageLock.allocatePage();
00196
00197
00198
00199
00200 PBuffer pLogPageBuffer = logPageLock.getPage().getWritableData();
00201 memcpy(
00202 pLogPageBuffer,
00203 page.getReadableData(),
00204 getUsablePageSize());
00205 VersionedPageFooter *pLogFooter = reinterpret_cast<VersionedPageFooter *>(
00206 getWritableFooter(logPageLock.getPage()));
00207 pLogFooter->versionNumber = versionNumber - 1;
00208 pLogFooter->onlineUuid = onlineUuid;
00209 PageId dataPageId = DelegatingSegment::translateBlockId(
00210 page.getBlockId());
00211 pLogFooter->dataPageId = dataPageId;
00212
00213 pLogFooter->checksum = computeChecksum(pLogPageBuffer);
00214
00215
00216 pDataFooter->versionNumber = versionNumber;
00217
00218
00219 pCache->nicePage(logPageLock.getPage());
00220
00221 StrictMutexGuard mutexGuard(mutex);
00222 dataToLogMap[dataPageId] = logPageId;
00223 if ((newestLogPageId == NULL_PAGE_ID) || (logPageId > newestLogPageId)) {
00224 newestLogPageId = logPageId;
00225 }
00226 if ((oldestLogPageId == NULL_PAGE_ID) || (logPageId < oldestLogPageId)) {
00227 oldestLogPageId = logPageId;
00228 }
00229 }
00230
00231 SegVersionNum VersionedSegment::computeChecksum(void const *pPageData)
00232 {
00233 crcComputer.reset();
00234 crcComputer.process_bytes(pPageData,getUsablePageSize());
00235 return crcComputer.checksum();
00236 }
00237
00238 bool VersionedSegment::canFlushPage(CachePage &page)
00239 {
00240
00241
00242 PageId minLogPageId = pWALSegment->getMinDirtyPageId();
00243 if (minLogPageId == NULL_PAGE_ID) {
00244 return DelegatingSegment::canFlushPage(page);
00245 }
00246
00247 StrictMutexGuard mutexGuard(mutex);
00248 PageId dataPageId = DelegatingSegment::translateBlockId(
00249 page.getBlockId());
00250 PageMapConstIter pLogPageId = dataToLogMap.find(dataPageId);
00251 if (pLogPageId == dataToLogMap.end()) {
00252
00253 return DelegatingSegment::canFlushPage(page);
00254 }
00255 PageId logPageId = pLogPageId->second;
00256 if (logPageId >= minLogPageId) {
00257 return false;
00258 }
00259 return DelegatingSegment::canFlushPage(page);
00260 }
00261
00262 void VersionedSegment::prepareOnlineRecovery()
00263 {
00264
00265
00266 logSegment->checkpoint(CHECKPOINT_FLUSH_ALL);
00267
00268 StrictMutexGuard mutexGuard(mutex);
00269
00270 dataToLogMap.clear();
00271 oldestLogPageId = NULL_PAGE_ID;
00272 }
00273
00274 void VersionedSegment::recover(
00275 SharedSegment pDelegatingSegment,
00276 PageId firstLogPageId,
00277 SegVersionNum versionNumberInit,
00278 PseudoUuid const &onlineUuidInit)
00279 {
00280 onlineUuid = onlineUuidInit;
00281 recover(pDelegatingSegment, firstLogPageId, versionNumberInit);
00282 }
00283
00284 void VersionedSegment::recover(
00285 SharedSegment pDelegatingSegment,
00286 PageId firstLogPageId,
00287 SegVersionNum versionNumberInit)
00288 {
00289 assert(dataToLogMap.empty());
00290 assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID);
00291
00292 inRecovery = true;
00293
00294 if (!isMAXU(versionNumberInit)) {
00295 versionNumber = versionNumberInit;
00296 }
00297
00298
00299
00300
00301
00302
00303 std::hash_set<PageId> recoveredPageSet;
00304
00305
00306
00307
00308
00309 SegmentAccessor logSegmentAccessor(logSegment,pCache);
00310 SegmentAccessor dataSegmentAccessor(pDelegatingSegment,pCache);
00311 for (; firstLogPageId != NULL_PAGE_ID;
00312 firstLogPageId = logSegment->getPageSuccessor(firstLogPageId))
00313 {
00314 SegPageLock logPageLock(logSegmentAccessor);
00315 logPageLock.lockShared(firstLogPageId);
00316 if (!logPageLock.getPage().isDataValid()) {
00317 break;
00318 }
00319 PConstBuffer pLogPageBuffer = logPageLock.getPage().getReadableData();
00320 VersionedPageFooter const *pLogFooter =
00321 reinterpret_cast<VersionedPageFooter const *>(
00322 getReadableFooter(logPageLock.getPage()));
00323 if (pLogFooter->checksum != computeChecksum(pLogPageBuffer)) {
00324 break;
00325 }
00326 if (pLogFooter->onlineUuid != onlineUuid) {
00327 break;
00328 }
00329 assert(pLogFooter->versionNumber < (versionNumber + 2));
00330 if (pLogFooter->versionNumber < versionNumber) {
00331 continue;
00332 }
00333 if (recoveredPageSet.find(pLogFooter->dataPageId)
00334 != recoveredPageSet.end())
00335 {
00336 assert(pLogFooter->versionNumber > versionNumber);
00337 continue;
00338 }
00339
00340 SegPageLock dataPageLock(dataSegmentAccessor);
00341 dataPageLock.lockExclusive(pLogFooter->dataPageId);
00342 memcpy(
00343 dataPageLock.getPage().getWritableData(),
00344 pLogPageBuffer,
00345 getFullPageSize());
00346 recoveredPageSet.insert(pLogFooter->dataPageId);
00347 }
00348
00349 inRecovery = false;
00350 }
00351
00352 SegVersionNum VersionedSegment::getPageVersion(CachePage &page)
00353 {
00354 VersionedPageFooter const *pFooter =
00355 reinterpret_cast<VersionedPageFooter const *>(
00356 getReadableFooter(page));
00357 return pFooter->versionNumber;
00358 }
00359
00360 SegVersionNum VersionedSegment::getVersionNumber() const
00361 {
00362 return versionNumber;
00363 }
00364
00365 SharedSegment VersionedSegment::getLogSegment() const
00366 {
00367 return logSegment;
00368 }
00369
00370 PageId VersionedSegment::getOnlineRecoveryPageId() const
00371 {
00372 return oldestLogPageId;
00373 }
00374
00375 PageId VersionedSegment::getRecoveryPageId() const
00376 {
00377 if (oldestLogPageId == NULL_PAGE_ID) {
00378
00379
00380 return FIRST_LINEAR_PAGE_ID;
00381 } else {
00382 return oldestLogPageId;
00383 }
00384 }
00385
00386 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $");
00387
00388