00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalRecoveryLog.h"
00026 #include "fennel/txn/LogicalRecoveryTxn.h"
00027 #include "fennel/txn/LogicalTxnStoredStructs.h"
00028 #include "fennel/txn/LogicalTxnLog.h"
00029 #include "fennel/segment/CrcSegInputStream.h"
00030 #include "fennel/device/DeviceMode.h"
00031 #include "fennel/txn/LogicalTxnParticipantFactory.h"
00032 #include "fennel/segment/SegmentFactory.h"
00033
00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalRecoveryLog.cpp#9 $");
00035
00036 LogicalRecoveryLog::LogicalRecoveryLog(
00037 LogicalTxnParticipantFactory &participantFactoryInit,
00038 SegmentAccessor const &logSegmentAccessorInit,
00039 PseudoUuid const &onlineUuid,
00040 SharedSegmentFactory pSegmentFactoryInit)
00041 : participantFactory(participantFactoryInit),
00042 pSegmentFactory(pSegmentFactoryInit),
00043 logSegmentAccessor(logSegmentAccessorInit)
00044 {
00045 pInputStream = CrcSegInputStream::newCrcSegInputStream(
00046 logSegmentAccessor,onlineUuid);
00047 }
00048
00049 SharedLogicalRecoveryLog LogicalRecoveryLog::newLogicalRecoveryLog(
00050 LogicalTxnParticipantFactory &participantFactory,
00051 SegmentAccessor const &logSegmentAccessor,
00052 PseudoUuid const &onlineUuid,
00053 SharedSegmentFactory pSegmentFactory)
00054 {
00055 return SharedLogicalRecoveryLog(
00056 new LogicalRecoveryLog(
00057 participantFactory,logSegmentAccessor,onlineUuid,pSegmentFactory));
00058 }
00059
00060 LogicalRecoveryLog::~LogicalRecoveryLog()
00061 {
00062 }
00063
00064 void LogicalRecoveryLog::recover(
00065 LogicalTxnLogCheckpointMemento const &logMemento)
00066 {
00067 LogicalTxnEventMemento txnMemento;
00068 pInputStream->seekSegPos(logMemento.logPosition);
00069 for (uint i = 0; i < logMemento.nUncommittedTxns; ++i) {
00070 uint cb = pInputStream->readValue(txnMemento);
00071 assert(cb == sizeof(txnMemento));
00072 assert(txnMemento.event == LogicalTxnEventMemento::EVENT_CHECKPOINT);
00073 checkpointTxnMap[txnMemento.txnId] = txnMemento;
00074 }
00075 for (;;) {
00076 uint cb = pInputStream->readValue(txnMemento);
00077 if (cb < sizeof(txnMemento)) {
00078 break;
00079 }
00080 TxnId txnId = txnMemento.txnId;
00081 TxnMapIter pTxnEntry = checkpointTxnMap.find(txnId);
00082 SharedSegInputStream pTxnInputStream;
00083 if (txnMemento.longLog) {
00084 pTxnInputStream = openLongLogStream(txnId);
00085 } else {
00086
00087
00088 pTxnInputStream = pInputStream;
00089 }
00090 switch (txnMemento.event) {
00091 case LogicalTxnEventMemento::EVENT_COMMIT:
00092 if (pTxnEntry == checkpointTxnMap.end()) {
00093 redoTxn(txnMemento,NULL,pTxnInputStream);
00094 } else {
00095 redoTxn(
00096 txnMemento,
00097 &(pTxnEntry->second),
00098 pTxnInputStream);
00099 checkpointTxnMap.erase(pTxnEntry);
00100 }
00101 break;
00102 case LogicalTxnEventMemento::EVENT_ROLLBACK:
00103 assert(pTxnEntry != checkpointTxnMap.end());
00104 undoTxn(pTxnEntry->second,pTxnInputStream);
00105 checkpointTxnMap.erase(txnId);
00106 break;
00107 case LogicalTxnEventMemento::EVENT_CHECKPOINT:
00108 break;
00109 default:
00110 permAssert(false);
00111 }
00112 }
00113 for (TxnMapIter pTxnEntry = checkpointTxnMap.begin();
00114 pTxnEntry != checkpointTxnMap.end(); ++pTxnEntry)
00115 {
00116 SharedSegInputStream pTxnInputStream =
00117 openLongLogStream(pTxnEntry->first);
00118 undoTxn(pTxnEntry->second,pTxnInputStream);
00119 }
00120 checkpointTxnMap.clear();
00121 }
00122
00123 SharedSegInputStream LogicalRecoveryLog::openLongLogStream(TxnId txnId)
00124 {
00125 DeviceMode openMode = DeviceMode::load;
00126 openMode.readOnly = true;
00127 SharedSegment pTxnLogSegment =
00128 pSegmentFactory->newTempDeviceSegment(
00129 logSegmentAccessor.pCacheAccessor->getCache(),
00130 openMode,
00131 getLongLogFileName(txnId));
00132 SegmentAccessor txnSegmentAccessor(
00133 pTxnLogSegment,
00134 logSegmentAccessor.pCacheAccessor);
00135 return SegInputStream::newSegInputStream(txnSegmentAccessor);
00136 }
00137
00138 void LogicalRecoveryLog::redoTxn(
00139 LogicalTxnEventMemento const &commitMemento,
00140 LogicalTxnEventMemento const *pCheckpointMemento,
00141 SharedSegInputStream pTxnInputStream)
00142 {
00143 LogicalRecoveryTxn recoveryTxn(
00144 pTxnInputStream,
00145 &participantFactory);
00146 FileSize cbRedo;
00147 if (pCheckpointMemento) {
00148 pTxnInputStream->seekSegPos(pCheckpointMemento->logPosition);
00149
00150 LogicalTxnSavepoint svpt;
00151 svpt.cbActionPrev = pCheckpointMemento->cbActionLast;
00152 svpt.cbLogged = pCheckpointMemento->logPosition.cbOffset;
00153 recoveryTxn.undoActions(
00154 svpt,
00155 pCheckpointMemento->nParticipants);
00156
00157 cbRedo = commitMemento.logPosition.cbOffset
00158 - pCheckpointMemento->logPosition.cbOffset;
00159 pTxnInputStream->seekSegPos(pCheckpointMemento->logPosition);
00160 } else {
00161 cbRedo = commitMemento.logPosition.cbOffset;
00162 }
00163 recoveryTxn.redoActions(cbRedo);
00164 }
00165
00166 void LogicalRecoveryLog::undoTxn(
00167 LogicalTxnEventMemento const &checkpointMemento,
00168 SharedSegInputStream pTxnInputStream)
00169 {
00170 LogicalTxnSavepoint svpt;
00171 svpt.cbActionPrev = checkpointMemento.cbActionLast;
00172 svpt.cbLogged = checkpointMemento.logPosition.cbOffset;
00173 pTxnInputStream->seekSegPos(checkpointMemento.logPosition);
00174 LogicalRecoveryTxn recoveryTxn(
00175 pTxnInputStream,
00176 &participantFactory);
00177 recoveryTxn.undoActions(svpt);
00178 }
00179
00180 std::string LogicalRecoveryLog::getLongLogFileName(TxnId txnId)
00181 {
00182 std::ostringstream oss;
00183 oss << "txn";
00184 oss << txnId;
00185 oss << ".dat";
00186 return oss.str();
00187 }
00188
00189 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalRecoveryLog.cpp#9 $");
00190
00191