SegPageBackupRestoreDevice.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/common/FennelResource.h"
00026 #include "fennel/common/FileSystem.h"
00027 #include "fennel/segment/SegPageBackupRestoreDevice.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $");
00031 
00032 void BackupRestorePage::setParent(WeakSegPageBackupRestoreDevice pParentInit)
00033 {
00034     pParent = pParentInit;
00035 }
00036 
00037 PBuffer BackupRestorePage::getBuffer() const
00038 {
00039     return pBuffer;
00040 }
00041 
00042 void BackupRestorePage::setBufferSize(uint bufferSizeInit)
00043 {
00044     bufferSize = bufferSizeInit;
00045 }
00046 
00047 uint BackupRestorePage::getBufferSize() const
00048 {
00049     return bufferSize;
00050 }
00051 
00052 void BackupRestorePage::setBuffer(PBuffer pBufferInit)
00053 {
00054     pBuffer = pBufferInit;
00055 }
00056 
00057 BlockNum BackupRestorePage::getPageCounter()
00058 {
00059     return pageCounter;
00060 }
00061 
00062 void BackupRestorePage::setPageCounter(BlockNum counter)
00063 {
00064     pageCounter = counter;
00065 }
00066 
00067 void BackupRestorePage::setReadRequest(bool isReadInit)
00068 {
00069     isRead = isReadInit;
00070 }
00071 
00072 void BackupRestorePage::notifyTransferCompletion(bool bSuccess)
00073 {
00074     SharedSegPageBackupRestoreDevice sharedPtr = pParent.lock();
00075     StrictMutexGuard mutexGuard(sharedPtr->getMutex());
00076 
00077     if (isRead) {
00078         sharedPtr->notifyReadTransferCompletion(*this, bSuccess);
00079     } else {
00080         sharedPtr->notifyWriteTransferCompletion(*this, bSuccess);
00081     }
00082 
00083     // release the reference to the shared pointer while we're still holding
00084     // the parent mutex
00085     sharedPtr.reset();
00086 }
00087 
00088 SharedSegPageBackupRestoreDevice
00089 SegPageBackupRestoreDevice::newSegPageBackupRestoreDevice(
00090     const std::string &backupFilePathname,
00091     const char *mode,
00092     const std::string &compressionProgram,
00093     uint nScratchPages,
00094     uint nReservedPages,
00095     SegmentAccessor &scratchAccessor,
00096     DeviceAccessScheduler &scheduler,
00097     SharedRandomAccessDevice pDataDevice)
00098 {
00099     SharedSegPageBackupRestoreDevice pDevice =
00100         SharedSegPageBackupRestoreDevice(
00101             new SegPageBackupRestoreDevice(
00102                 backupFilePathname,
00103                 mode,
00104                 compressionProgram,
00105                 nScratchPages,
00106                 nReservedPages,
00107                 scratchAccessor,
00108                 scheduler,
00109                 pDataDevice),
00110             ClosableObjectDestructor());
00111     pDevice->init();
00112     return pDevice;
00113 }
00114 
00115 SegPageBackupRestoreDevice::SegPageBackupRestoreDevice(
00116     const std::string &backupFilePathnameInit,
00117     const char *modeInit,
00118     const std::string &compressionProgramInit,
00119     uint nScratchPagesInit,
00120     uint nReservedPagesInit,
00121     SegmentAccessor &scratchAccessorInit,
00122     DeviceAccessScheduler &schedulerInit,
00123     SharedRandomAccessDevice pDataDeviceInit) :
00124     backupFilePathname(backupFilePathnameInit),
00125     nReservedPages(nReservedPagesInit),
00126     scratchAccessor(scratchAccessorInit),
00127     scheduler(schedulerInit),
00128     pDataDevice(pDataDeviceInit)
00129 {
00130     mode = (char *) modeInit;
00131     setCompressionProgramPathname(compressionProgramInit);
00132     nScratchPages = nScratchPagesInit;
00133     currPageReadCount = 0;
00134     currPageWriteCount = 0;
00135     backupFile = NULL;
00136 }
00137 
00138 void SegPageBackupRestoreDevice::init()
00139 {
00140     if (compressionProgram.length() == 0) {
00141         backupFile = fopen(backupFilePathname.c_str(), mode);
00142         isCompressed = false;
00143     } else {
00144 #ifdef __MSVC__
00145         throw FennelExcn(
00146             FennelResource::instance().unsupportedOperation("popen"));
00147 #else
00148         std::ostringstream cmd;
00149         if (mode[0] == 'r') {
00150             cmd << compressionProgram.c_str() << " -dc "
00151                 << backupFilePathname.c_str();
00152         } else {
00153             cmd << compressionProgram.c_str() << " > "
00154                 << backupFilePathname.c_str();
00155         }
00156         backupFile = popen(cmd.str().c_str(), mode);
00157         isCompressed = true;
00158 #endif
00159     }
00160 
00161     if (backupFile == NULL) {
00162         throw SysCallExcn(
00163             FennelResource::instance().openBackupFileFailed(
00164                 backupFilePathname));
00165     }
00166 
00167     scratchLock.accessSegment(scratchAccessor);
00168     pageSize = scratchAccessor.pSegment->getUsablePageSize();
00169     initScratchPages(scratchLock, nScratchPages, nReservedPages, pageSize);
00170 }
00171 
00172 void SegPageBackupRestoreDevice::setCompressionProgramPathname(
00173     const std::string &programName)
00174 {
00175     if (programName.length() == 0) {
00176         compressionProgram = "";
00177     } else {
00178         std::string path = "/bin/";
00179         compressionProgram = path + programName;
00180         if (!FileSystem::doesFileExist(compressionProgram.c_str())) {
00181             path = "/usr/bin/";
00182             compressionProgram = path + programName;
00183             if (!FileSystem::doesFileExist(compressionProgram.c_str())) {
00184                 compressionProgram = programName;
00185             }
00186         }
00187     }
00188 }
00189 
00190 void SegPageBackupRestoreDevice::initScratchPages(
00191     SegPageLock &scratchLock,
00192     uint nScratchPages,
00193     uint nReservedPages,
00194     uint bufferSize)
00195 {
00196     StrictMutexGuard mutexGuard(mutex);
00197 
00198     // Allocate an array of requests and associate a scratch page with each
00199     // one.  Initialize the free scratch page queue with these requests.
00200     backupRestorePages.reset(new BackupRestorePage[nScratchPages]);
00201     for (uint i = 0; i < nScratchPages; i++) {
00202         scratchLock.allocatePage();
00203         backupRestorePages[i].setParent(
00204             WeakSegPageBackupRestoreDevice(shared_from_this()));
00205         backupRestorePages[i].setBufferSize(pageSize);
00206         backupRestorePages[i].setBuffer(
00207             scratchLock.getPage().getWritableData());
00208         freeScratchPageQueue.push_back(&backupRestorePages[i]);
00209     }
00210 
00211     reservedPages.reset(new PBuffer[nReservedPages]);
00212     for (uint i = 0; i < nReservedPages; i++) {
00213         scratchLock.allocatePage();
00214         reservedPages[i] = scratchLock.getPage().getWritableData();
00215     }
00216 }
00217 
00218 PBuffer SegPageBackupRestoreDevice::getReservedBufferPage()
00219 {
00220     if (nReservedPages == 0) {
00221         assert(false);
00222     }
00223     return reservedPages[--nReservedPages];
00224 }
00225 
00226 void SegPageBackupRestoreDevice::writeBackupPage(PConstBuffer pageBuffer)
00227 {
00228     writeBackupPage(pageBuffer, false);
00229 }
00230 
00231 void SegPageBackupRestoreDevice::writeBackupPage(
00232     PConstBuffer pageBuffer,
00233     bool scheduledWrite)
00234 {
00235     size_t pagesWritten = fwrite(pageBuffer, pageSize, 1, backupFile);
00236     if (pagesWritten < 1) {
00237         if (!scheduledWrite) {
00238             throw SysCallExcn(
00239                 FennelResource::instance().writeBackupFileFailed(
00240                     backupFilePathname));
00241         } else if (!pPendingExcn) {
00242             // For scheduled writes, indicate that an exception has occurred
00243             // so the calling thread can return the exception, unless there
00244             // already is a pending exception.
00245             pPendingExcn.reset(
00246                 new SysCallExcn(
00247                     FennelResource::instance().writeBackupFileFailed(
00248                         backupFilePathname)));
00249         }
00250     }
00251 }
00252 
00253 void SegPageBackupRestoreDevice::backupPage(BlockId blockId)
00254 {
00255     // This only schedules the read request.  Later when the read request
00256     // has been met, the write will be done in notifyReadTransferCompletion.
00257 
00258     // First, find a free scratch page
00259     BackupRestorePage *pScratchPage = getFreeScratchPage();
00260 
00261     pScratchPage->setPageCounter(currPageReadCount++);
00262     RandomAccessRequest request;
00263     request.pDevice = pDataDevice.get();
00264     request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId);
00265     request.cbTransfer = pageSize;
00266     request.type = RandomAccessRequest::READ;
00267     pScratchPage->setReadRequest(true);
00268     request.bindingList.push_back(*pScratchPage);
00269     scheduler.schedule(request);
00270 }
00271 
00272 BackupRestorePage *SegPageBackupRestoreDevice::getFreeScratchPage()
00273 {
00274     while (true) {
00275         StrictMutexGuard mutexGuard(mutex);
00276         if (!freeScratchPageQueue.empty()) {
00277             BackupRestorePage *freePage = freeScratchPageQueue.back();
00278             freeScratchPageQueue.pop_back();
00279             return freePage;
00280         }
00281 
00282         // If no page is available, wait for one to become available (with
00283         // timeout just in case)
00284         boost::xtime atv;
00285         convertTimeout(100, atv);
00286         freeScratchPageCondition.timed_wait(mutexGuard, atv);
00287         checkPendingException();
00288     }
00289 }
00290 
00291 void SegPageBackupRestoreDevice::notifyReadTransferCompletion(
00292     BackupRestorePage &scratchPage,
00293     bool bSuccess)
00294 {
00295     if (!bSuccess) {
00296         if (!pPendingExcn) {
00297             pPendingExcn.reset(
00298                 new SysCallExcn(
00299                     FennelResource::instance().readDataPageFailed()));
00300         }
00301         return;
00302     }
00303 
00304 
00305     // If the page read is the next one to be written, write it out to the
00306     // backup file.  Otherwise, add it to the map of pending pages to
00307     // be written out.
00308     if (scratchPage.getPageCounter() == currPageWriteCount) {
00309         writeBackupPage(scratchPage.getBuffer(), true);
00310         freeScratchPage(scratchPage);
00311     } else {
00312         pendingWriteMap[scratchPage.getPageCounter()] = &scratchPage;
00313     }
00314 
00315     // Go back and see if the next set of pages that need to be written are
00316     // already in the pending write queue.  If they are, write them out.
00317     while (true) {
00318         PendingWriteMapIter iter = pendingWriteMap.find(currPageWriteCount);
00319         if (iter == pendingWriteMap.end()) {
00320             break;
00321         }
00322         BackupRestorePage *nextPage = iter->second;
00323         pendingWriteMap.erase(currPageWriteCount);
00324         writeBackupPage(nextPage->getBuffer(), true);
00325         freeScratchPage(*nextPage);
00326         // freeScratchPage increments currPageWriteCount, so loop to see
00327         // if that next page is available to be written
00328     }
00329 }
00330 
00331 void SegPageBackupRestoreDevice::freeScratchPage(BackupRestorePage &scratchPage)
00332 {
00333     ++currPageWriteCount;
00334     freeScratchPageQueue.push_back(&scratchPage);
00335     freeScratchPageCondition.notify_all();
00336 }
00337 
00338 void SegPageBackupRestoreDevice::waitForPendingWrites()
00339 {
00340     StrictMutexGuard mutexGuard(mutex);
00341     while (currPageWriteCount < currPageReadCount) {
00342         freeScratchPageCondition.wait(mutexGuard);
00343         checkPendingException();
00344     }
00345 
00346     checkPendingException();
00347 }
00348 
00349 void SegPageBackupRestoreDevice::restorePage(BlockId blockId)
00350 {
00351     // Get a free scratch buffer to read the page from the backup file
00352     BackupRestorePage *pScratchPage = getFreeScratchPage();
00353 
00354     size_t pagesRead =
00355         fread(pScratchPage->getBuffer(), pageSize, 1, backupFile);
00356     if (pagesRead < 1) {
00357         throw SysCallExcn(
00358             FennelResource::instance().readBackupFileFailed(
00359                 backupFilePathname));
00360     }
00361 
00362     // Schedule a request to write it
00363     pScratchPage->setPageCounter(currPageReadCount++);
00364     RandomAccessRequest request;
00365     request.pDevice = pDataDevice.get();
00366     request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId);
00367     request.cbTransfer = pageSize;
00368     request.type = RandomAccessRequest::WRITE;
00369     pScratchPage->setReadRequest(false);
00370     request.bindingList.push_back(*pScratchPage);
00371     scheduler.schedule(request);
00372 }
00373 
00374 void SegPageBackupRestoreDevice::notifyWriteTransferCompletion(
00375     BackupRestorePage &scratchPage,
00376     bool bSuccess)
00377 {
00378     if (!bSuccess) {
00379         if (!pPendingExcn) {
00380             pPendingExcn.reset(
00381                 new SysCallExcn(
00382                     FennelResource::instance().writeDataPageFailed()));
00383         }
00384         return;
00385     }
00386 
00387     freeScratchPage(scratchPage);
00388 }
00389 
00390 void SegPageBackupRestoreDevice::checkPendingException()
00391 {
00392     if (pPendingExcn) {
00393         pPendingExcn->throwSelf();
00394     }
00395 }
00396 
00397 void SegPageBackupRestoreDevice::closeImpl()
00398 {
00399     // Wait for any pending writes to complete before freeing up structures
00400     // because those writes were issued as async requests and their
00401     // notifications will need to come back to this object.
00402     waitForPendingWrites();
00403 
00404     StrictMutexGuard mutexGuard(mutex);
00405     if (scratchAccessor.pSegment) {
00406         scratchAccessor.pSegment->deallocatePageRange(
00407             NULL_PAGE_ID, NULL_PAGE_ID);
00408     }
00409     backupRestorePages.reset();
00410     reservedPages.reset();
00411     freeScratchPageQueue.clear();
00412     if (backupFile != NULL) {
00413         if (isCompressed) {
00414 #ifdef __MSVC__
00415         throw FennelExcn(
00416             FennelResource::instance().unsupportedOperation("pclose"));
00417 #else
00418             pclose(backupFile);
00419 #endif
00420         } else {
00421             fclose(backupFile);
00422         }
00423         backupFile = NULL;
00424     }
00425 }
00426 
00427 StrictMutex &SegPageBackupRestoreDevice::getMutex()
00428 {
00429     return mutex;
00430 }
00431 
00432 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $");
00433 
00434 // End SegPageBackupRestoreDevice.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1