#include <LogicalTxn.h>
Public Member Functions | |
virtual | ~LogicalTxn () |
void | addParticipant (SharedLogicalTxnParticipant pParticipant) |
Registers a participant which is joining the transaction. | |
ByteOutputStream & | beginLogicalAction (LogicalTxnParticipant &participant, LogicalActionType actionType) |
Begins an action description log entry. | |
void | endLogicalAction () |
Ends the log entry for an action description. | |
SavepointId | createSavepoint () |
Creates a savepoint representing the current transaction state. | |
void | commitSavepoint (SavepointId svptId) |
Commits a given savepoint and any later savepoints. | |
void | rollback (SavepointId const *pSvptId=NULL) |
Aborts the transaction. | |
void | commit () |
Commits the transaction. | |
SharedLogicalTxnLog | getLog () |
| |
TxnId | getTxnId () const |
| |
bool | isEnded () const |
| |
Private Types | |
enum | State { STATE_LOGGING_TXN, STATE_LOGGING_ACTION, STATE_ROLLING_BACK, STATE_ROLLED_BACK, STATE_COMMITTED } |
Valid transaction states. More... | |
Private Member Functions | |
LogicalTxn (TxnId txnId, SharedLogicalTxnLog pLog, SharedCacheAccessor pCacheAccessor) | |
void | describeParticipant (SharedLogicalTxnParticipant) |
void | describeAllParticipants () |
void | forgetAllParticipants () |
void | rollbackToSavepoint (LogicalTxnSavepoint &oldSvpt) |
ByteOutputStream & | beginLogicalAction (LogicalTxnParticipant *pParticipant, LogicalActionType actionType) |
Private Attributes | |
TxnId | txnId |
Identifier for this transaction. | |
SharedLogicalTxnLog | pLog |
Log containing this txn. | |
SharedCacheAccessor | pCacheAccessor |
CacheAccessor to use for writing to log. | |
SharedSpillOutputStream | pOutputStream |
Stream for writing to this transaction's log. | |
State | state |
Current txn state. | |
bool | checkpointed |
Was this txn active at the last checkpoint? | |
LogicalTxnSavepoint | svpt |
Potential savepoint representing current log position. | |
std::vector< LogicalTxnSavepoint > | savepoints |
Savepoints previously returned via createSavepoint. | |
std::vector< SharedLogicalTxnParticipant > | participants |
Collection of LogicalTxnParticipants which have joined this txn. | |
Friends | |
class | LogicalTxnLog |
Definition at line 41 of file LogicalTxn.h.
enum LogicalTxn::State [private] |
Valid transaction states.
STATE_LOGGING_TXN | |
STATE_LOGGING_ACTION | |
STATE_ROLLING_BACK | |
STATE_ROLLED_BACK | |
STATE_COMMITTED |
Definition at line 50 of file LogicalTxn.h.
00051 { 00052 STATE_LOGGING_TXN, 00053 00054 STATE_LOGGING_ACTION, 00055 00056 STATE_ROLLING_BACK, 00057 00058 STATE_ROLLED_BACK, 00059 00060 STATE_COMMITTED 00061 };
LogicalTxn::LogicalTxn | ( | TxnId | txnId, | |
SharedLogicalTxnLog | pLog, | |||
SharedCacheAccessor | pCacheAccessor | |||
) | [explicit, private] |
Definition at line 38 of file LogicalTxn.cpp.
References LogicalTxnSavepoint::cbActionPrev, LogicalTxnSavepoint::cbLogged, checkpointed, LogicalRecoveryLog::getLongLogFileName(), SpillOutputStream::newSpillOutputStream(), pCacheAccessor, pLog, pOutputStream, state, STATE_LOGGING_TXN, svpt, txnId, and WRITE_EAGER_ASYNC.
00042 : txnId(txnIdInit), 00043 pLog(pLogInit), 00044 pCacheAccessor(pCacheAccessorInit) 00045 { 00046 pOutputStream = SpillOutputStream::newSpillOutputStream( 00047 pLog->pSegmentFactory, 00048 pCacheAccessor, 00049 LogicalRecoveryLog::getLongLogFileName(txnId)); 00050 00051 // REVIEW: We could use something like a WALSegment to keep track of page 00052 // states, eliminating the overhead of a full-cache checkpoint when log is 00053 // committed. 00054 pOutputStream->setWriteLatency(WRITE_EAGER_ASYNC); 00055 00056 state = STATE_LOGGING_TXN; 00057 svpt.cbActionPrev = 0; 00058 svpt.cbLogged = 0; 00059 checkpointed = false; 00060 }
LogicalTxn::~LogicalTxn | ( | ) | [virtual] |
Definition at line 62 of file LogicalTxn.cpp.
References isEnded(), and participants.
00063 { 00064 assert(isEnded()); 00065 assert(participants.empty()); 00066 }
void LogicalTxn::describeParticipant | ( | SharedLogicalTxnParticipant | ) | [private] |
Definition at line 187 of file LogicalTxn.cpp.
References ACTION_TXN_DESCRIBE_PARTICIPANT, beginLogicalAction(), endLogicalAction(), and pOutputStream.
Referenced by addParticipant(), and describeAllParticipants().
00188 { 00189 beginLogicalAction(*pParticipant,ACTION_TXN_DESCRIBE_PARTICIPANT); 00190 LogicalTxnClassId classId = 00191 pParticipant->getParticipantClassId(); 00192 pOutputStream->writeValue(classId); 00193 pParticipant->describeParticipant(*pOutputStream); 00194 endLogicalAction(); 00195 }
void LogicalTxn::describeAllParticipants | ( | ) | [private] |
Definition at line 179 of file LogicalTxn.cpp.
References describeParticipant(), and participants.
00180 { 00181 std::for_each( 00182 participants.begin(), 00183 participants.end(), 00184 boost::bind(&LogicalTxn::describeParticipant,this,_1)); 00185 }
void LogicalTxn::forgetAllParticipants | ( | ) | [private] |
Definition at line 197 of file LogicalTxn.cpp.
References LogicalTxnParticipant::clearLogicalTxn(), and participants.
Referenced by commit(), and rollback().
00198 { 00199 std::for_each( 00200 participants.begin(), 00201 participants.end(), 00202 boost::bind(&LogicalTxnParticipant::clearLogicalTxn,_1)); 00203 participants.clear(); 00204 }
void LogicalTxn::rollbackToSavepoint | ( | LogicalTxnSavepoint & | oldSvpt | ) | [private] |
Definition at line 206 of file LogicalTxn.cpp.
References ACTION_TXN_ROLLBACK_TO_SAVEPOINT, beginLogicalAction(), LogicalTxnSavepoint::cbLogged, LogicalTxnParticipant::enableLogging(), endLogicalAction(), MAXU, participants, pOutputStream, SEEK_STREAM_END, state, STATE_LOGGING_TXN, STATE_ROLLING_BACK, svpt, and LogicalRecoveryTxn::undoActions().
Referenced by rollback().
00207 { 00208 assert(oldSvpt.cbLogged <= svpt.cbLogged); 00209 // disable logging for all participants during rollback 00210 std::for_each( 00211 participants.begin(), 00212 participants.end(), 00213 boost::bind(&LogicalTxnParticipant::enableLogging,_1,false)); 00214 00215 // TODO: for short logs, could just reuse memory 00216 00217 SharedByteInputStream pInputStream = 00218 pOutputStream->getInputStream(SEEK_STREAM_END); 00219 assert(svpt.cbLogged == pInputStream->getOffset()); 00220 { 00221 state = STATE_ROLLING_BACK; 00222 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL); 00223 recoveryTxn.undoActions(svpt,MAXU,oldSvpt.cbLogged); 00224 state = STATE_LOGGING_TXN; 00225 } 00226 pInputStream.reset(); 00227 00228 // re-enable logging for all participants 00229 std::for_each( 00230 participants.begin(), 00231 participants.end(), 00232 boost::bind(&LogicalTxnParticipant::enableLogging,_1,true)); 00233 00234 // write log entry noting the partial rollback 00235 beginLogicalAction(NULL,ACTION_TXN_ROLLBACK_TO_SAVEPOINT); 00236 pOutputStream->writeValue(oldSvpt); 00237 endLogicalAction(); 00238 }
ByteOutputStream & LogicalTxn::beginLogicalAction | ( | LogicalTxnParticipant * | pParticipant, | |
LogicalActionType | actionType | |||
) | [private] |
Definition at line 88 of file LogicalTxn.cpp.
References LogicalTxnActionHeader::actionType, LogicalTxnSavepoint::cbActionPrev, LogicalTxnActionHeader::cbActionPrev, pOutputStream, LogicalTxnActionHeader::pParticipant, state, STATE_LOGGING_ACTION, STATE_LOGGING_TXN, and svpt.
Referenced by beginLogicalAction(), BTreeWriter::deleteCurrent(), describeParticipant(), FtrsTableWriter::execute(), DatabaseTest::executeIncrementAction(), BTreeWriter::insertTupleFromBuffer(), and rollbackToSavepoint().
00091 { 00092 assert(state == STATE_LOGGING_TXN); 00093 LogicalTxnActionHeader actionHeader; 00094 actionHeader.pParticipant = pParticipant; 00095 actionHeader.actionType = actionType; 00096 actionHeader.cbActionPrev = svpt.cbActionPrev; 00097 pOutputStream->writeValue(actionHeader); 00098 state = STATE_LOGGING_ACTION; 00099 return *pOutputStream; 00100 }
void LogicalTxn::addParticipant | ( | SharedLogicalTxnParticipant | pParticipant | ) |
Registers a participant which is joining the transaction.
Must be called before the participant can log any actions.
pParticipant | the participant to join; reference will be retained for the duration of the txn |
Definition at line 68 of file LogicalTxn.cpp.
References describeParticipant(), and participants.
00069 { 00070 if (pParticipant->pTxn) { 00071 assert(pParticipant->pTxn == this); 00072 return; 00073 } 00074 participants.push_back(pParticipant); 00075 pParticipant->pTxn = this; 00076 pParticipant->enableLogging(true); 00077 describeParticipant(pParticipant); 00078 }
ByteOutputStream & LogicalTxn::beginLogicalAction | ( | LogicalTxnParticipant & | participant, | |
LogicalActionType | actionType | |||
) |
Begins an action description log entry.
After this, the participant must write the action description to the txn's output stream.
participant | the LogicalTxnParticipant initiating the action | |
actionType | participant-defined LogicalActionType |
Definition at line 80 of file LogicalTxn.cpp.
References beginLogicalAction(), and LogicalTxnParticipant::pTxn.
00083 { 00084 assert(participant.pTxn == this); 00085 return beginLogicalAction(&participant,actionType); 00086 }
void LogicalTxn::endLogicalAction | ( | ) |
Ends the log entry for an action description.
After this, the ByteOutputStream returned from beginLogicalAction is no longer valid.
Definition at line 102 of file LogicalTxn.cpp.
References LogicalTxnSavepoint::cbActionPrev, LogicalTxnSavepoint::cbLogged, pOutputStream, state, STATE_LOGGING_ACTION, STATE_LOGGING_TXN, and svpt.
Referenced by BTreeWriter::deleteCurrent(), describeParticipant(), FtrsTableWriter::execute(), DatabaseTest::executeIncrementAction(), BTreeWriter::insertTupleFromBuffer(), rollbackToSavepoint(), LogicalTxnTest::testActions(), and LogicalTxnTest::testTxn().
00103 { 00104 assert(state == STATE_LOGGING_ACTION); 00105 state = STATE_LOGGING_TXN; 00106 svpt.cbActionPrev = 00107 pOutputStream->getOffset() - svpt.cbLogged; 00108 svpt.cbLogged = pOutputStream->getOffset(); 00109 }
SavepointId LogicalTxn::createSavepoint | ( | ) |
Creates a savepoint representing the current transaction state.
Definition at line 111 of file LogicalTxn.cpp.
References savepoints, state, STATE_LOGGING_TXN, and svpt.
Referenced by LogicalTxnTest::testTxn().
00112 { 00113 assert(state == STATE_LOGGING_TXN); 00114 SavepointId svptId = SavepointId(savepoints.size()); 00115 savepoints.push_back(svpt); 00116 return svptId; 00117 }
void LogicalTxn::commitSavepoint | ( | SavepointId | svptId | ) |
Commits a given savepoint and any later savepoints.
Note that committing a savepoint does not make its actions durable; it just releases any information required to rollback to that savepoint.
svptId | savepoint to commit |
Definition at line 119 of file LogicalTxn.cpp.
References opaqueToInt(), savepoints, state, and STATE_LOGGING_TXN.
00120 { 00121 assert(state == STATE_LOGGING_TXN); 00122 uint iSvpt = opaqueToInt(svptId); 00123 assert(iSvpt < savepoints.size()); 00124 savepoints.resize(iSvpt); 00125 }
void LogicalTxn::rollback | ( | SavepointId const * | pSvptId = NULL |
) |
Aborts the transaction.
pSvptId | Id of the savepoint to which to rollback (or default NULL to rollback and end the entire transaction) |
Definition at line 127 of file LogicalTxn.cpp.
References LogicalTxnSavepoint::cbLogged, CHECKPOINT_DISCARD, forgetAllParticipants(), opaqueToInt(), pLog, pOutputStream, rollbackToSavepoint(), savepoints, SEEK_STREAM_END, state, STATE_LOGGING_TXN, STATE_ROLLED_BACK, STATE_ROLLING_BACK, svpt, and LogicalRecoveryTxn::undoActions().
Referenced by LogicalTxnTest::rollbackFull(), LogicalTxnTest::testCheckpointCommitSavepoint(), and LogicalTxnTest::testRollbackSavepoint().
00128 { 00129 assert(state == STATE_LOGGING_TXN); 00130 if (pSvptId) { 00131 uint iSvpt = opaqueToInt(*pSvptId); 00132 assert(iSvpt < savepoints.size()); 00133 savepoints.resize(iSvpt + 1); 00134 rollbackToSavepoint(savepoints[iSvpt]); 00135 return; 00136 } 00137 00138 // NOTE: this protects against implicit self-delete until end-of-method 00139 SharedLogicalTxn pThis = shared_from_this(); 00140 00141 // do this now so that participants don't try to write to log during 00142 // rollback 00143 forgetAllParticipants(); 00144 00145 SharedSegment pLongLogSegment = pOutputStream->getSegment(); 00146 SharedByteInputStream pInputStream = 00147 pOutputStream->getInputStream(SEEK_STREAM_END); 00148 pOutputStream->close(); 00149 assert(svpt.cbLogged == pInputStream->getOffset()); 00150 00151 { 00152 state = STATE_ROLLING_BACK; 00153 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL); 00154 recoveryTxn.undoActions(svpt); 00155 } 00156 00157 svpt.cbLogged = pInputStream->getOffset(); 00158 state = STATE_ROLLED_BACK; 00159 pInputStream.reset(); 00160 if (pLongLogSegment) { 00161 pLongLogSegment->checkpoint(CHECKPOINT_DISCARD); 00162 pLongLogSegment.reset(); 00163 } 00164 pLog->rollbackTxn(pThis); 00165 pLog.reset(); 00166 pOutputStream.reset(); 00167 }
void LogicalTxn::commit | ( | ) |
Commits the transaction.
Definition at line 169 of file LogicalTxn.cpp.
References forgetAllParticipants(), pLog, state, and STATE_COMMITTED.
Referenced by LogicalTxnTest::commit().
00170 { 00171 // NOTE: this protects against implicit self-delete until end-of-method 00172 SharedLogicalTxn pThis = shared_from_this(); 00173 pLog->commitTxn(pThis); 00174 pLog.reset(); 00175 state = STATE_COMMITTED; 00176 forgetAllParticipants(); 00177 }
SharedLogicalTxnLog LogicalTxn::getLog | ( | ) |
Definition at line 240 of file LogicalTxn.cpp.
References pLog.
00241 { 00242 return pLog; 00243 }
TxnId LogicalTxn::getTxnId | ( | ) | const |
Definition at line 250 of file LogicalTxn.cpp.
References txnId.
Referenced by LogicalTxnTest::testTxnIdSequence().
00251 { 00252 return txnId; 00253 }
bool LogicalTxn::isEnded | ( | ) | const |
Definition at line 245 of file LogicalTxn.cpp.
References state, STATE_COMMITTED, and STATE_ROLLED_BACK.
Referenced by ~LogicalTxn().
00246 { 00247 return state == STATE_ROLLED_BACK || state == STATE_COMMITTED; 00248 }
friend class LogicalTxnLog [friend] |
Definition at line 45 of file LogicalTxn.h.
TxnId LogicalTxn::txnId [private] |
Identifier for this transaction.
Note that this is assigned when the txn is created, so it does not reflect commit order.
Definition at line 67 of file LogicalTxn.h.
Referenced by getTxnId(), and LogicalTxn().
SharedLogicalTxnLog LogicalTxn::pLog [private] |
Log containing this txn.
Definition at line 72 of file LogicalTxn.h.
Referenced by commit(), getLog(), LogicalTxn(), and rollback().
CacheAccessor to use for writing to log.
Definition at line 77 of file LogicalTxn.h.
Referenced by LogicalTxn().
Stream for writing to this transaction's log.
Definition at line 82 of file LogicalTxn.h.
Referenced by beginLogicalAction(), describeParticipant(), endLogicalAction(), LogicalTxn(), rollback(), and rollbackToSavepoint().
State LogicalTxn::state [private] |
Current txn state.
Definition at line 87 of file LogicalTxn.h.
Referenced by beginLogicalAction(), commit(), commitSavepoint(), createSavepoint(), endLogicalAction(), isEnded(), LogicalTxn(), rollback(), and rollbackToSavepoint().
bool LogicalTxn::checkpointed [private] |
Was this txn active at the last checkpoint?
Definition at line 92 of file LogicalTxn.h.
Referenced by LogicalTxn().
LogicalTxnSavepoint LogicalTxn::svpt [private] |
Potential savepoint representing current log position.
Definition at line 97 of file LogicalTxn.h.
Referenced by beginLogicalAction(), createSavepoint(), endLogicalAction(), LogicalTxn(), rollback(), and rollbackToSavepoint().
std::vector<LogicalTxnSavepoint> LogicalTxn::savepoints [private] |
Savepoints previously returned via createSavepoint.
SavepointId is an index into this vector.
Definition at line 103 of file LogicalTxn.h.
Referenced by commitSavepoint(), createSavepoint(), and rollback().
std::vector<SharedLogicalTxnParticipant> LogicalTxn::participants [private] |
Collection of LogicalTxnParticipants which have joined this txn.
Definition at line 108 of file LogicalTxn.h.
Referenced by addParticipant(), describeAllParticipants(), forgetAllParticipants(), rollbackToSavepoint(), and ~LogicalTxn().