00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/common/FennelResource.h"
00026 #include "fennel/common/FileSystem.h"
00027 #include "fennel/segment/SegPageBackupRestoreDevice.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $");
00031
00032 void BackupRestorePage::setParent(WeakSegPageBackupRestoreDevice pParentInit)
00033 {
00034 pParent = pParentInit;
00035 }
00036
00037 PBuffer BackupRestorePage::getBuffer() const
00038 {
00039 return pBuffer;
00040 }
00041
00042 void BackupRestorePage::setBufferSize(uint bufferSizeInit)
00043 {
00044 bufferSize = bufferSizeInit;
00045 }
00046
00047 uint BackupRestorePage::getBufferSize() const
00048 {
00049 return bufferSize;
00050 }
00051
00052 void BackupRestorePage::setBuffer(PBuffer pBufferInit)
00053 {
00054 pBuffer = pBufferInit;
00055 }
00056
00057 BlockNum BackupRestorePage::getPageCounter()
00058 {
00059 return pageCounter;
00060 }
00061
00062 void BackupRestorePage::setPageCounter(BlockNum counter)
00063 {
00064 pageCounter = counter;
00065 }
00066
00067 void BackupRestorePage::setReadRequest(bool isReadInit)
00068 {
00069 isRead = isReadInit;
00070 }
00071
00072 void BackupRestorePage::notifyTransferCompletion(bool bSuccess)
00073 {
00074 SharedSegPageBackupRestoreDevice sharedPtr = pParent.lock();
00075 StrictMutexGuard mutexGuard(sharedPtr->getMutex());
00076
00077 if (isRead) {
00078 sharedPtr->notifyReadTransferCompletion(*this, bSuccess);
00079 } else {
00080 sharedPtr->notifyWriteTransferCompletion(*this, bSuccess);
00081 }
00082
00083
00084
00085 sharedPtr.reset();
00086 }
00087
00088 SharedSegPageBackupRestoreDevice
00089 SegPageBackupRestoreDevice::newSegPageBackupRestoreDevice(
00090 const std::string &backupFilePathname,
00091 const char *mode,
00092 const std::string &compressionProgram,
00093 uint nScratchPages,
00094 uint nReservedPages,
00095 SegmentAccessor &scratchAccessor,
00096 DeviceAccessScheduler &scheduler,
00097 SharedRandomAccessDevice pDataDevice)
00098 {
00099 SharedSegPageBackupRestoreDevice pDevice =
00100 SharedSegPageBackupRestoreDevice(
00101 new SegPageBackupRestoreDevice(
00102 backupFilePathname,
00103 mode,
00104 compressionProgram,
00105 nScratchPages,
00106 nReservedPages,
00107 scratchAccessor,
00108 scheduler,
00109 pDataDevice),
00110 ClosableObjectDestructor());
00111 pDevice->init();
00112 return pDevice;
00113 }
00114
00115 SegPageBackupRestoreDevice::SegPageBackupRestoreDevice(
00116 const std::string &backupFilePathnameInit,
00117 const char *modeInit,
00118 const std::string &compressionProgramInit,
00119 uint nScratchPagesInit,
00120 uint nReservedPagesInit,
00121 SegmentAccessor &scratchAccessorInit,
00122 DeviceAccessScheduler &schedulerInit,
00123 SharedRandomAccessDevice pDataDeviceInit) :
00124 backupFilePathname(backupFilePathnameInit),
00125 nReservedPages(nReservedPagesInit),
00126 scratchAccessor(scratchAccessorInit),
00127 scheduler(schedulerInit),
00128 pDataDevice(pDataDeviceInit)
00129 {
00130 mode = (char *) modeInit;
00131 setCompressionProgramPathname(compressionProgramInit);
00132 nScratchPages = nScratchPagesInit;
00133 currPageReadCount = 0;
00134 currPageWriteCount = 0;
00135 backupFile = NULL;
00136 }
00137
00138 void SegPageBackupRestoreDevice::init()
00139 {
00140 if (compressionProgram.length() == 0) {
00141 backupFile = fopen(backupFilePathname.c_str(), mode);
00142 isCompressed = false;
00143 } else {
00144 #ifdef __MSVC__
00145 throw FennelExcn(
00146 FennelResource::instance().unsupportedOperation("popen"));
00147 #else
00148 std::ostringstream cmd;
00149 if (mode[0] == 'r') {
00150 cmd << compressionProgram.c_str() << " -dc "
00151 << backupFilePathname.c_str();
00152 } else {
00153 cmd << compressionProgram.c_str() << " > "
00154 << backupFilePathname.c_str();
00155 }
00156 backupFile = popen(cmd.str().c_str(), mode);
00157 isCompressed = true;
00158 #endif
00159 }
00160
00161 if (backupFile == NULL) {
00162 throw SysCallExcn(
00163 FennelResource::instance().openBackupFileFailed(
00164 backupFilePathname));
00165 }
00166
00167 scratchLock.accessSegment(scratchAccessor);
00168 pageSize = scratchAccessor.pSegment->getUsablePageSize();
00169 initScratchPages(scratchLock, nScratchPages, nReservedPages, pageSize);
00170 }
00171
00172 void SegPageBackupRestoreDevice::setCompressionProgramPathname(
00173 const std::string &programName)
00174 {
00175 if (programName.length() == 0) {
00176 compressionProgram = "";
00177 } else {
00178 std::string path = "/bin/";
00179 compressionProgram = path + programName;
00180 if (!FileSystem::doesFileExist(compressionProgram.c_str())) {
00181 path = "/usr/bin/";
00182 compressionProgram = path + programName;
00183 if (!FileSystem::doesFileExist(compressionProgram.c_str())) {
00184 compressionProgram = programName;
00185 }
00186 }
00187 }
00188 }
00189
00190 void SegPageBackupRestoreDevice::initScratchPages(
00191 SegPageLock &scratchLock,
00192 uint nScratchPages,
00193 uint nReservedPages,
00194 uint bufferSize)
00195 {
00196 StrictMutexGuard mutexGuard(mutex);
00197
00198
00199
00200 backupRestorePages.reset(new BackupRestorePage[nScratchPages]);
00201 for (uint i = 0; i < nScratchPages; i++) {
00202 scratchLock.allocatePage();
00203 backupRestorePages[i].setParent(
00204 WeakSegPageBackupRestoreDevice(shared_from_this()));
00205 backupRestorePages[i].setBufferSize(pageSize);
00206 backupRestorePages[i].setBuffer(
00207 scratchLock.getPage().getWritableData());
00208 freeScratchPageQueue.push_back(&backupRestorePages[i]);
00209 }
00210
00211 reservedPages.reset(new PBuffer[nReservedPages]);
00212 for (uint i = 0; i < nReservedPages; i++) {
00213 scratchLock.allocatePage();
00214 reservedPages[i] = scratchLock.getPage().getWritableData();
00215 }
00216 }
00217
00218 PBuffer SegPageBackupRestoreDevice::getReservedBufferPage()
00219 {
00220 if (nReservedPages == 0) {
00221 assert(false);
00222 }
00223 return reservedPages[--nReservedPages];
00224 }
00225
00226 void SegPageBackupRestoreDevice::writeBackupPage(PConstBuffer pageBuffer)
00227 {
00228 writeBackupPage(pageBuffer, false);
00229 }
00230
00231 void SegPageBackupRestoreDevice::writeBackupPage(
00232 PConstBuffer pageBuffer,
00233 bool scheduledWrite)
00234 {
00235 size_t pagesWritten = fwrite(pageBuffer, pageSize, 1, backupFile);
00236 if (pagesWritten < 1) {
00237 if (!scheduledWrite) {
00238 throw SysCallExcn(
00239 FennelResource::instance().writeBackupFileFailed(
00240 backupFilePathname));
00241 } else if (!pPendingExcn) {
00242
00243
00244
00245 pPendingExcn.reset(
00246 new SysCallExcn(
00247 FennelResource::instance().writeBackupFileFailed(
00248 backupFilePathname)));
00249 }
00250 }
00251 }
00252
00253 void SegPageBackupRestoreDevice::backupPage(BlockId blockId)
00254 {
00255
00256
00257
00258
00259 BackupRestorePage *pScratchPage = getFreeScratchPage();
00260
00261 pScratchPage->setPageCounter(currPageReadCount++);
00262 RandomAccessRequest request;
00263 request.pDevice = pDataDevice.get();
00264 request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId);
00265 request.cbTransfer = pageSize;
00266 request.type = RandomAccessRequest::READ;
00267 pScratchPage->setReadRequest(true);
00268 request.bindingList.push_back(*pScratchPage);
00269 scheduler.schedule(request);
00270 }
00271
00272 BackupRestorePage *SegPageBackupRestoreDevice::getFreeScratchPage()
00273 {
00274 while (true) {
00275 StrictMutexGuard mutexGuard(mutex);
00276 if (!freeScratchPageQueue.empty()) {
00277 BackupRestorePage *freePage = freeScratchPageQueue.back();
00278 freeScratchPageQueue.pop_back();
00279 return freePage;
00280 }
00281
00282
00283
00284 boost::xtime atv;
00285 convertTimeout(100, atv);
00286 freeScratchPageCondition.timed_wait(mutexGuard, atv);
00287 checkPendingException();
00288 }
00289 }
00290
00291 void SegPageBackupRestoreDevice::notifyReadTransferCompletion(
00292 BackupRestorePage &scratchPage,
00293 bool bSuccess)
00294 {
00295 if (!bSuccess) {
00296 if (!pPendingExcn) {
00297 pPendingExcn.reset(
00298 new SysCallExcn(
00299 FennelResource::instance().readDataPageFailed()));
00300 }
00301 return;
00302 }
00303
00304
00305
00306
00307
00308 if (scratchPage.getPageCounter() == currPageWriteCount) {
00309 writeBackupPage(scratchPage.getBuffer(), true);
00310 freeScratchPage(scratchPage);
00311 } else {
00312 pendingWriteMap[scratchPage.getPageCounter()] = &scratchPage;
00313 }
00314
00315
00316
00317 while (true) {
00318 PendingWriteMapIter iter = pendingWriteMap.find(currPageWriteCount);
00319 if (iter == pendingWriteMap.end()) {
00320 break;
00321 }
00322 BackupRestorePage *nextPage = iter->second;
00323 pendingWriteMap.erase(currPageWriteCount);
00324 writeBackupPage(nextPage->getBuffer(), true);
00325 freeScratchPage(*nextPage);
00326
00327
00328 }
00329 }
00330
00331 void SegPageBackupRestoreDevice::freeScratchPage(BackupRestorePage &scratchPage)
00332 {
00333 ++currPageWriteCount;
00334 freeScratchPageQueue.push_back(&scratchPage);
00335 freeScratchPageCondition.notify_all();
00336 }
00337
00338 void SegPageBackupRestoreDevice::waitForPendingWrites()
00339 {
00340 StrictMutexGuard mutexGuard(mutex);
00341 while (currPageWriteCount < currPageReadCount) {
00342 freeScratchPageCondition.wait(mutexGuard);
00343 checkPendingException();
00344 }
00345
00346 checkPendingException();
00347 }
00348
00349 void SegPageBackupRestoreDevice::restorePage(BlockId blockId)
00350 {
00351
00352 BackupRestorePage *pScratchPage = getFreeScratchPage();
00353
00354 size_t pagesRead =
00355 fread(pScratchPage->getBuffer(), pageSize, 1, backupFile);
00356 if (pagesRead < 1) {
00357 throw SysCallExcn(
00358 FennelResource::instance().readBackupFileFailed(
00359 backupFilePathname));
00360 }
00361
00362
00363 pScratchPage->setPageCounter(currPageReadCount++);
00364 RandomAccessRequest request;
00365 request.pDevice = pDataDevice.get();
00366 request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId);
00367 request.cbTransfer = pageSize;
00368 request.type = RandomAccessRequest::WRITE;
00369 pScratchPage->setReadRequest(false);
00370 request.bindingList.push_back(*pScratchPage);
00371 scheduler.schedule(request);
00372 }
00373
00374 void SegPageBackupRestoreDevice::notifyWriteTransferCompletion(
00375 BackupRestorePage &scratchPage,
00376 bool bSuccess)
00377 {
00378 if (!bSuccess) {
00379 if (!pPendingExcn) {
00380 pPendingExcn.reset(
00381 new SysCallExcn(
00382 FennelResource::instance().writeDataPageFailed()));
00383 }
00384 return;
00385 }
00386
00387 freeScratchPage(scratchPage);
00388 }
00389
00390 void SegPageBackupRestoreDevice::checkPendingException()
00391 {
00392 if (pPendingExcn) {
00393 pPendingExcn->throwSelf();
00394 }
00395 }
00396
00397 void SegPageBackupRestoreDevice::closeImpl()
00398 {
00399
00400
00401
00402 waitForPendingWrites();
00403
00404 StrictMutexGuard mutexGuard(mutex);
00405 if (scratchAccessor.pSegment) {
00406 scratchAccessor.pSegment->deallocatePageRange(
00407 NULL_PAGE_ID, NULL_PAGE_ID);
00408 }
00409 backupRestorePages.reset();
00410 reservedPages.reset();
00411 freeScratchPageQueue.clear();
00412 if (backupFile != NULL) {
00413 if (isCompressed) {
00414 #ifdef __MSVC__
00415 throw FennelExcn(
00416 FennelResource::instance().unsupportedOperation("pclose"));
00417 #else
00418 pclose(backupFile);
00419 #endif
00420 } else {
00421 fclose(backupFile);
00422 }
00423 backupFile = NULL;
00424 }
00425 }
00426
00427 StrictMutex &SegPageBackupRestoreDevice::getMutex()
00428 {
00429 return mutex;
00430 }
00431
00432 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $");
00433
00434