00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalTxnLog.h"
00026 #include "fennel/txn/LogicalTxn.h"
00027 #include "fennel/txn/LogicalTxnStoredStructs.h"
00028 #include "fennel/segment/CrcSegOutputStream.h"
00029 #include "fennel/segment/SpillOutputStream.h"
00030 #include "fennel/segment/SegmentFactory.h"
00031 #include "fennel/common/ByteInputStream.h"
00032 #include "fennel/cache/QuotaCacheAccessor.h"
00033
00034 #include <boost/bind.hpp>
00035 #include <sstream>
00036
00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxnLog.cpp#18 $");
00038
00039
00040
00041
00042 LogicalTxnLog::LogicalTxnLog(
00043 SegmentAccessor const &logSegmentAccessorInit,
00044 PseudoUuid const &onlineUuid,
00045 SharedSegmentFactory pSegmentFactoryInit)
00046 : pSegmentFactory(pSegmentFactoryInit),
00047 logSegmentAccessor(logSegmentAccessorInit)
00048 {
00049
00050
00051
00052 logSegmentAccessor.pCacheAccessor = SharedCacheAccessor(
00053 new QuotaCacheAccessor(
00054 SharedQuotaCacheAccessor(),
00055 logSegmentAccessor.pCacheAccessor,
00056 2));
00057
00058
00059
00060 pOutputStream = CrcSegOutputStream::newCrcSegOutputStream(
00061 logSegmentAccessor,onlineUuid);
00062
00063
00064
00065 pOutputStream->setWriteLatency(WRITE_EAGER_SYNC);
00066
00067 pOutputStream->getSegPos(lastCheckpointMemento.logPosition);
00068 lastCheckpointMemento.nUncommittedTxns = 0;
00069 nCommittedBeforeLastCheckpoint = 0;
00070
00071 groupCommitInterval = pSegmentFactory->getConfigMap().getIntParam(
00072 "groupCommitInterval", 30);
00073 }
00074
00075 void LogicalTxnLog::setNextTxnId(TxnId nextTxnIdInit)
00076 {
00077 nextTxnId = nextTxnIdInit;
00078 logSegmentAccessor.pCacheAccessor->setTxnId(nextTxnId);
00079 }
00080
00081 SharedLogicalTxnLog LogicalTxnLog::newLogicalTxnLog(
00082 SegmentAccessor const &logSegmentAccessor,
00083 PseudoUuid const &onlineUuid,
00084 SharedSegmentFactory pSegmentFactory)
00085 {
00086 return SharedLogicalTxnLog(
00087 new LogicalTxnLog(logSegmentAccessor,onlineUuid,pSegmentFactory));
00088 }
00089
00090 LogicalTxnLog::~LogicalTxnLog()
00091 {
00092 assert(uncommittedTxns.empty());
00093 assert(committedLongLogSegments.empty());
00094 }
00095
00096 SharedLogicalTxn LogicalTxnLog::newLogicalTxn(
00097 SharedCacheAccessor pCacheAccessor)
00098 {
00099 StrictMutexGuard mutexGuard(mutex);
00100
00101
00102
00103 pCacheAccessor = SharedCacheAccessor(
00104 new QuotaCacheAccessor(
00105 SharedQuotaCacheAccessor(),
00106 pCacheAccessor,
00107 2));
00108 pCacheAccessor->setTxnId(nextTxnId);
00109 SharedLogicalTxn pTxn(
00110 new LogicalTxn(nextTxnId,shared_from_this(),pCacheAccessor));
00111 uncommittedTxns.push_back(pTxn);
00112 ++nextTxnId;
00113 return pTxn;
00114 }
00115
00116 void LogicalTxnLog::removeTxn(SharedLogicalTxn pTxn)
00117 {
00118 TxnListIter pFound = std::find(
00119 uncommittedTxns.begin(),
00120 uncommittedTxns.end(),
00121 pTxn);
00122 assert(pFound != uncommittedTxns.end());
00123 uncommittedTxns.erase(pFound);
00124 }
00125
00126 void LogicalTxnLog::commitTxn(SharedLogicalTxn pTxn)
00127 {
00128 LogicalTxnEventMemento memento;
00129 memento.event = LogicalTxnEventMemento::EVENT_COMMIT;
00130 memento.txnId = pTxn->txnId;
00131 memento.cbActionLast = pTxn->svpt.cbActionPrev;
00132 memento.nParticipants = pTxn->participants.size();
00133 SharedSegment pSegment = pTxn->pOutputStream->getSegment();
00134 if (pSegment) {
00135 assert(pTxn->pOutputStream.unique());
00136 pTxn->pOutputStream->hardPageBreak();
00137 pTxn->pOutputStream->getSegOutputStream()->getSegPos(
00138 memento.logPosition);
00139 pTxn->pOutputStream.reset();
00140 pSegment->checkpoint(CHECKPOINT_FLUSH_AND_UNMAP);
00141 StrictMutexGuard mutexGuard(mutex);
00142 committedLongLogSegments.push_back(pSegment);
00143 } else {
00144 if (!pTxn->svpt.cbLogged) {
00145
00146
00147
00148 StrictMutexGuard mutexGuard(mutex);
00149 removeTxn(pTxn);
00150 return;
00151 }
00152 CompoundId::setPageId(memento.logPosition.segByteId,NULL_PAGE_ID);
00153 CompoundId::setByteOffset(
00154 memento.logPosition.segByteId,
00155 pTxn->svpt.cbLogged);
00156 memento.logPosition.cbOffset = pTxn->svpt.cbLogged;
00157 }
00158 memento.longLog = pSegment ? true : false;
00159 StrictMutexGuard mutexGuard(mutex);
00160 pOutputStream->writeValue(memento);
00161 if (!pSegment) {
00162 SharedByteInputStream pInputStream =
00163 pTxn->pOutputStream->getInputStream();
00164 uint cbActual;
00165 PConstBuffer pBuffer = pInputStream->getReadPointer(1,&cbActual);
00166 pOutputStream->writeBytes(pBuffer,cbActual);
00167 }
00168
00169 commitTxnWithGroup(mutexGuard);
00170 removeTxn(pTxn);
00171 }
00172
00173 void LogicalTxnLog::commitTxnWithGroup(StrictMutexGuard &mutexGuard)
00174 {
00175 boost::xtime groupCommitExpiration;
00176 if (groupCommitInterval) {
00177 convertTimeout(groupCommitInterval,groupCommitExpiration);
00178 }
00179 SegStreamPosition logPos;
00180 pOutputStream->getSegPos(logPos);
00181 PageId startPageId = CompoundId::getPageId(logPos.segByteId);
00182 for (;;) {
00183 bool timeout = true;
00184 if (groupCommitInterval) {
00185 timeout = !condition.timed_wait(mutexGuard,groupCommitExpiration);
00186
00187 pOutputStream->getSegPos(logPos);
00188 PageId lastPageId = CompoundId::getPageId(logPos.segByteId);
00189 if (lastPageId != startPageId) {
00190
00191 break;
00192 }
00193 }
00194
00195 if (timeout) {
00196
00197
00198
00199
00200
00201 pOutputStream->hardPageBreak();
00202 condition.notify_all();
00203 break;
00204 } else {
00205
00206 }
00207 }
00208 }
00209
00210 void LogicalTxnLog::rollbackTxn(SharedLogicalTxn pTxn)
00211 {
00212 if (!pTxn->checkpointed) {
00213
00214
00215 StrictMutexGuard mutexGuard(mutex);
00216 removeTxn(pTxn);
00217 return;
00218 }
00219
00220
00221 LogicalTxnEventMemento memento;
00222 memento.event = LogicalTxnEventMemento::EVENT_ROLLBACK;
00223 memento.txnId = pTxn->txnId;
00224 memento.cbActionLast = 0;
00225 memento.nParticipants = 0;
00226 CompoundId::setPageId(memento.logPosition.segByteId,NULL_PAGE_ID);
00227 CompoundId::setByteOffset(memento.logPosition.segByteId,0);
00228 memento.logPosition.cbOffset = 0;
00229 memento.longLog = true;
00230 StrictMutexGuard mutexGuard(mutex);
00231 pOutputStream->writeValue(memento);
00232
00233
00234 removeTxn(pTxn);
00235 }
00236
00237 void LogicalTxnLog::checkpoint(
00238 LogicalTxnLogCheckpointMemento &memento,
00239 CheckpointType checkpointType)
00240 {
00241 StrictMutexGuard mutexGuard(mutex);
00242 if (checkpointType == CHECKPOINT_DISCARD) {
00243 uncommittedTxns.clear();
00244 committedLongLogSegments.clear();
00245 return;
00246 }
00247 pOutputStream->getSegPos(memento.logPosition);
00248 memento.nUncommittedTxns = uncommittedTxns.size();
00249 memento.nextTxnId = nextTxnId;
00250 std::for_each(
00251 uncommittedTxns.begin(),
00252 uncommittedTxns.end(),
00253 boost::bind(&LogicalTxnLog::checkpointTxn,this,_1));
00254 pOutputStream->hardPageBreak();
00255 logSegmentAccessor.pSegment->checkpoint(checkpointType);
00256 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00257
00258
00259 std::swap(memento,lastCheckpointMemento);
00260 }
00261 }
00262
00263 void LogicalTxnLog::deallocateCheckpointedLog(
00264 LogicalTxnLogCheckpointMemento const &memento,
00265 CheckpointType checkpointType)
00266 {
00267 PageId lastObsoletePageId =
00268 CompoundId::getPageId(memento.logPosition.segByteId);
00269 if (lastObsoletePageId != FIRST_LINEAR_PAGE_ID) {
00270 assert(lastObsoletePageId != NULL_PAGE_ID);
00271
00272
00273 CompoundId::decBlockNum(lastObsoletePageId);
00274 if (logSegmentAccessor.pSegment->isPageIdAllocated(
00275 lastObsoletePageId))
00276 {
00277 logSegmentAccessor.pSegment->deallocatePageRange(
00278 NULL_PAGE_ID,lastObsoletePageId);
00279 }
00280 }
00281
00282 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00283 committedLongLogSegments.erase(
00284 committedLongLogSegments.begin(),
00285 committedLongLogSegments.begin() + nCommittedBeforeLastCheckpoint);
00286 } else {
00287 committedLongLogSegments.clear();
00288 }
00289 nCommittedBeforeLastCheckpoint = committedLongLogSegments.size();
00290 }
00291
00292 void LogicalTxnLog::checkpointTxn(SharedLogicalTxn pTxn)
00293 {
00294
00295
00296
00297 LogicalTxnEventMemento memento;
00298 pTxn->describeAllParticipants();
00299 pTxn->pOutputStream->hardPageBreak();
00300 pTxn->pOutputStream->getSegOutputStream()->getSegPos(
00301 memento.logPosition);
00302
00303 pTxn->pOutputStream->getSegment()->checkpoint();
00304 memento.event = LogicalTxnEventMemento::EVENT_CHECKPOINT;
00305 memento.txnId = pTxn->txnId;
00306 memento.cbActionLast = pTxn->svpt.cbActionPrev;
00307 memento.longLog = true;
00308 memento.nParticipants = pTxn->participants.size();
00309 pOutputStream->writeValue(memento);
00310 pTxn->checkpointed = true;
00311 }
00312
00313 TxnId LogicalTxnLog::getOldestActiveTxnId()
00314 {
00315 StrictMutexGuard mutexGuard(mutex);
00316 TxnId oldestTxnId = NULL_TXN_ID;
00317 for (TxnListIter ppTxn = uncommittedTxns.begin();
00318 ppTxn != uncommittedTxns.end();
00319 ++ppTxn)
00320 {
00321 SharedLogicalTxn pTxn = *ppTxn;
00322 if (oldestTxnId == NULL_TXN_ID || pTxn->getTxnId() < oldestTxnId) {
00323 oldestTxnId = pTxn->getTxnId();
00324 }
00325 }
00326
00327
00328
00329 if (oldestTxnId == NULL_TXN_ID) {
00330 return nextTxnId;
00331 } else {
00332 return oldestTxnId;
00333 }
00334 }
00335
00336 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxnLog.cpp#18 $");
00337
00338