LogicalRecoveryTxn.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/txn/LogicalRecoveryTxn.cpp#9 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalRecoveryTxn.h"
00026 #include "fennel/txn/LogicalTxnStoredStructs.h"
00027 #include "fennel/common/ByteInputStream.h"
00028 #include "fennel/txn/LogicalTxnParticipant.h"
00029 #include "fennel/txn/LogicalTxnParticipantFactory.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalRecoveryTxn.cpp#9 $");
00032 
00033 LogicalRecoveryTxn::LogicalRecoveryTxn(
00034     SharedByteInputStream pTxnInputStreamInit,
00035     LogicalTxnParticipantFactory *pParticipantFactoryInit)
00036     : pTxnInputStream(pTxnInputStreamInit),
00037       pParticipantFactory(pParticipantFactoryInit)
00038 {
00039 }
00040 
00041 LogicalRecoveryTxn::~LogicalRecoveryTxn()
00042 {
00043 }
00044 
00045 void LogicalRecoveryTxn::redoActions(
00046     uint cbRedo)
00047 {
00048     FileSize cbStart = pTxnInputStream->getOffset();
00049     while (pTxnInputStream->getOffset() < (cbStart + cbRedo)) {
00050         LogicalTxnActionHeader actionHeader;
00051         pTxnInputStream->readValue(actionHeader);
00052 
00053         switch (actionHeader.actionType) {
00054         case ACTION_TXN_DESCRIBE_PARTICIPANT:
00055             recoverParticipant(actionHeader.pParticipant);
00056             break;
00057         case ACTION_TXN_ROLLBACK_TO_SAVEPOINT:
00058             {
00059                 LogicalTxnSavepoint oldSvpt;
00060                 pTxnInputStream->readValue(oldSvpt);
00061                 // remember current position
00062                 FileSize offset = pTxnInputStream->getOffset();
00063                 // rewind this action
00064                 pTxnInputStream->seekBackward(
00065                     sizeof(oldSvpt) + sizeof(actionHeader));
00066                 LogicalTxnSavepoint svptEnd;
00067                 svptEnd.cbLogged = pTxnInputStream->getOffset();
00068                 svptEnd.cbActionPrev = actionHeader.cbActionPrev;
00069                 // redo rollback
00070                 undoActions(svptEnd,MAXU,oldSvpt.cbLogged);
00071                 // restore position
00072                 pTxnInputStream->seekForward(
00073                     offset - pTxnInputStream->getOffset());
00074             }
00075             break;
00076         default:
00077             {
00078                 LogicalTxnParticipant *pParticipant = swizzleParticipant(
00079                     actionHeader.pParticipant);
00080                 pParticipant->redoLogicalAction(
00081                     actionHeader.actionType,
00082                     *pTxnInputStream);
00083             }
00084             break;
00085         }
00086     }
00087     assert(pTxnInputStream->getOffset() == (cbStart + cbRedo));
00088 }
00089 
00090 void LogicalRecoveryTxn::recoverParticipant(
00091     LogicalTxnParticipant *pLoggedParticipant)
00092 {
00093     if (isOnline()) {
00094         return;
00095     }
00096     LogicalTxnClassId classId;
00097     pTxnInputStream->readValue(classId);
00098     SharedLogicalTxnParticipant pRecoveredParticipant =
00099         pParticipantFactory->loadParticipant(classId,*pTxnInputStream);
00100     participantMap[pLoggedParticipant] = pRecoveredParticipant;
00101 }
00102 
00103 void LogicalRecoveryTxn::undoActions(
00104     LogicalTxnSavepoint const &svptEnd,
00105     uint nActionsMax,
00106     FileSize minActionOffset)
00107 {
00108     assert(isMAXU(nActionsMax) || !minActionOffset);
00109 
00110     uint nActions = 0;
00111     uint cbActionExpected = svptEnd.cbActionPrev;
00112     uint seekDist = 0;
00113     while (cbActionExpected && (nActions < nActionsMax)) {
00114         seekDist += cbActionExpected;
00115         pTxnInputStream->seekBackward(seekDist);
00116         FileSize actionOffset = pTxnInputStream->getOffset();
00117         if (actionOffset < minActionOffset) {
00118             break;
00119         }
00120         LogicalTxnActionHeader actionHeader;
00121         pTxnInputStream->readValue(actionHeader);
00122 
00123         switch (actionHeader.actionType) {
00124         case ACTION_TXN_DESCRIBE_PARTICIPANT:
00125             if (swizzleParticipant(actionHeader.pParticipant)) {
00126                 // ignore log data since the participant is already available
00127                 seekDist = sizeof(actionHeader);
00128             } else {
00129                 recoverParticipant(actionHeader.pParticipant);
00130                 assert(pTxnInputStream->getOffset() ==
00131                        actionOffset + cbActionExpected);
00132                 seekDist = cbActionExpected;
00133             }
00134             break;
00135         case ACTION_TXN_ROLLBACK_TO_SAVEPOINT:
00136             {
00137                 // skip everything back to savepoint, since it was already
00138                 // undone
00139                 LogicalTxnSavepoint oldSvpt;
00140                 pTxnInputStream->readValue(oldSvpt);
00141                 assert(oldSvpt.cbLogged < pTxnInputStream->getOffset());
00142                 actionHeader.cbActionPrev = oldSvpt.cbActionPrev;
00143                 seekDist = pTxnInputStream->getOffset() - oldSvpt.cbLogged;
00144             }
00145             break;
00146         default:
00147             {
00148                 LogicalTxnParticipant *pParticipant = swizzleParticipant(
00149                     actionHeader.pParticipant);
00150                 pParticipant->undoLogicalAction(
00151                     actionHeader.actionType,
00152                     *pTxnInputStream);
00153                 assert(pTxnInputStream->getOffset() ==
00154                        actionOffset + cbActionExpected);
00155                 seekDist = cbActionExpected;
00156             }
00157             break;
00158         }
00159         cbActionExpected = actionHeader.cbActionPrev;
00160         ++nActions;
00161     }
00162 }
00163 
00164 LogicalTxnParticipant *LogicalRecoveryTxn::swizzleParticipant(
00165     LogicalTxnParticipant *pParticipant)
00166 {
00167     if (isOnline()) {
00168         return pParticipant;
00169     }
00170     ParticipantMapIter pParticipantEntry = participantMap.find(pParticipant);
00171     if (pParticipantEntry != participantMap.end()) {
00172         return pParticipantEntry->second.get();
00173     } else {
00174         return NULL;
00175     }
00176 }
00177 
00178 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalRecoveryTxn.cpp#9 $");
00179 
00180 // End LogicalRecoveryTxn.cpp

Generated on Mon Jun 22 04:00:21 2009 for Fennel by  doxygen 1.5.1