00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalTxn.h"
00026 #include "fennel/txn/LogicalTxnParticipant.h"
00027 #include "fennel/segment/SegOutputStream.h"
00028 #include "fennel/segment/SegInputStream.h"
00029 #include "fennel/segment/SpillOutputStream.h"
00030 #include "fennel/txn/LogicalTxnLog.h"
00031 #include "fennel/txn/LogicalRecoveryLog.h"
00032 #include "fennel/txn/LogicalRecoveryTxn.h"
00033
00034 #include <boost/bind.hpp>
00035
00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $");
00037
00038 LogicalTxn::LogicalTxn(
00039 TxnId txnIdInit,
00040 SharedLogicalTxnLog pLogInit,
00041 SharedCacheAccessor pCacheAccessorInit)
00042 : txnId(txnIdInit),
00043 pLog(pLogInit),
00044 pCacheAccessor(pCacheAccessorInit)
00045 {
00046 pOutputStream = SpillOutputStream::newSpillOutputStream(
00047 pLog->pSegmentFactory,
00048 pCacheAccessor,
00049 LogicalRecoveryLog::getLongLogFileName(txnId));
00050
00051
00052
00053
00054 pOutputStream->setWriteLatency(WRITE_EAGER_ASYNC);
00055
00056 state = STATE_LOGGING_TXN;
00057 svpt.cbActionPrev = 0;
00058 svpt.cbLogged = 0;
00059 checkpointed = false;
00060 }
00061
00062 LogicalTxn::~LogicalTxn()
00063 {
00064 assert(isEnded());
00065 assert(participants.empty());
00066 }
00067
00068 void LogicalTxn::addParticipant(SharedLogicalTxnParticipant pParticipant)
00069 {
00070 if (pParticipant->pTxn) {
00071 assert(pParticipant->pTxn == this);
00072 return;
00073 }
00074 participants.push_back(pParticipant);
00075 pParticipant->pTxn = this;
00076 pParticipant->enableLogging(true);
00077 describeParticipant(pParticipant);
00078 }
00079
00080 ByteOutputStream &LogicalTxn::beginLogicalAction(
00081 LogicalTxnParticipant &participant,
00082 LogicalActionType actionType)
00083 {
00084 assert(participant.pTxn == this);
00085 return beginLogicalAction(&participant,actionType);
00086 }
00087
00088 ByteOutputStream &LogicalTxn::beginLogicalAction(
00089 LogicalTxnParticipant *pParticipant,
00090 LogicalActionType actionType)
00091 {
00092 assert(state == STATE_LOGGING_TXN);
00093 LogicalTxnActionHeader actionHeader;
00094 actionHeader.pParticipant = pParticipant;
00095 actionHeader.actionType = actionType;
00096 actionHeader.cbActionPrev = svpt.cbActionPrev;
00097 pOutputStream->writeValue(actionHeader);
00098 state = STATE_LOGGING_ACTION;
00099 return *pOutputStream;
00100 }
00101
00102 void LogicalTxn::endLogicalAction()
00103 {
00104 assert(state == STATE_LOGGING_ACTION);
00105 state = STATE_LOGGING_TXN;
00106 svpt.cbActionPrev =
00107 pOutputStream->getOffset() - svpt.cbLogged;
00108 svpt.cbLogged = pOutputStream->getOffset();
00109 }
00110
00111 SavepointId LogicalTxn::createSavepoint()
00112 {
00113 assert(state == STATE_LOGGING_TXN);
00114 SavepointId svptId = SavepointId(savepoints.size());
00115 savepoints.push_back(svpt);
00116 return svptId;
00117 }
00118
00119 void LogicalTxn::commitSavepoint(SavepointId svptId)
00120 {
00121 assert(state == STATE_LOGGING_TXN);
00122 uint iSvpt = opaqueToInt(svptId);
00123 assert(iSvpt < savepoints.size());
00124 savepoints.resize(iSvpt);
00125 }
00126
00127 void LogicalTxn::rollback(SavepointId const *pSvptId)
00128 {
00129 assert(state == STATE_LOGGING_TXN);
00130 if (pSvptId) {
00131 uint iSvpt = opaqueToInt(*pSvptId);
00132 assert(iSvpt < savepoints.size());
00133 savepoints.resize(iSvpt + 1);
00134 rollbackToSavepoint(savepoints[iSvpt]);
00135 return;
00136 }
00137
00138
00139 SharedLogicalTxn pThis = shared_from_this();
00140
00141
00142
00143 forgetAllParticipants();
00144
00145 SharedSegment pLongLogSegment = pOutputStream->getSegment();
00146 SharedByteInputStream pInputStream =
00147 pOutputStream->getInputStream(SEEK_STREAM_END);
00148 pOutputStream->close();
00149 assert(svpt.cbLogged == pInputStream->getOffset());
00150
00151 {
00152 state = STATE_ROLLING_BACK;
00153 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL);
00154 recoveryTxn.undoActions(svpt);
00155 }
00156
00157 svpt.cbLogged = pInputStream->getOffset();
00158 state = STATE_ROLLED_BACK;
00159 pInputStream.reset();
00160 if (pLongLogSegment) {
00161 pLongLogSegment->checkpoint(CHECKPOINT_DISCARD);
00162 pLongLogSegment.reset();
00163 }
00164 pLog->rollbackTxn(pThis);
00165 pLog.reset();
00166 pOutputStream.reset();
00167 }
00168
00169 void LogicalTxn::commit()
00170 {
00171
00172 SharedLogicalTxn pThis = shared_from_this();
00173 pLog->commitTxn(pThis);
00174 pLog.reset();
00175 state = STATE_COMMITTED;
00176 forgetAllParticipants();
00177 }
00178
00179 void LogicalTxn::describeAllParticipants()
00180 {
00181 std::for_each(
00182 participants.begin(),
00183 participants.end(),
00184 boost::bind(&LogicalTxn::describeParticipant,this,_1));
00185 }
00186
00187 void LogicalTxn::describeParticipant(SharedLogicalTxnParticipant pParticipant)
00188 {
00189 beginLogicalAction(*pParticipant,ACTION_TXN_DESCRIBE_PARTICIPANT);
00190 LogicalTxnClassId classId =
00191 pParticipant->getParticipantClassId();
00192 pOutputStream->writeValue(classId);
00193 pParticipant->describeParticipant(*pOutputStream);
00194 endLogicalAction();
00195 }
00196
00197 void LogicalTxn::forgetAllParticipants()
00198 {
00199 std::for_each(
00200 participants.begin(),
00201 participants.end(),
00202 boost::bind(&LogicalTxnParticipant::clearLogicalTxn,_1));
00203 participants.clear();
00204 }
00205
00206 void LogicalTxn::rollbackToSavepoint(LogicalTxnSavepoint &oldSvpt)
00207 {
00208 assert(oldSvpt.cbLogged <= svpt.cbLogged);
00209
00210 std::for_each(
00211 participants.begin(),
00212 participants.end(),
00213 boost::bind(&LogicalTxnParticipant::enableLogging,_1,false));
00214
00215
00216
00217 SharedByteInputStream pInputStream =
00218 pOutputStream->getInputStream(SEEK_STREAM_END);
00219 assert(svpt.cbLogged == pInputStream->getOffset());
00220 {
00221 state = STATE_ROLLING_BACK;
00222 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL);
00223 recoveryTxn.undoActions(svpt,MAXU,oldSvpt.cbLogged);
00224 state = STATE_LOGGING_TXN;
00225 }
00226 pInputStream.reset();
00227
00228
00229 std::for_each(
00230 participants.begin(),
00231 participants.end(),
00232 boost::bind(&LogicalTxnParticipant::enableLogging,_1,true));
00233
00234
00235 beginLogicalAction(NULL,ACTION_TXN_ROLLBACK_TO_SAVEPOINT);
00236 pOutputStream->writeValue(oldSvpt);
00237 endLogicalAction();
00238 }
00239
00240 SharedLogicalTxnLog LogicalTxn::getLog()
00241 {
00242 return pLog;
00243 }
00244
00245 bool LogicalTxn::isEnded() const
00246 {
00247 return state == STATE_ROLLED_BACK || state == STATE_COMMITTED;
00248 }
00249
00250 TxnId LogicalTxn::getTxnId() const
00251 {
00252 return txnId;
00253 }
00254
00255 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $");
00256
00257