LogicalTxnLog.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/txn/LogicalTxnLog.cpp#18 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalTxnLog.h"
00026 #include "fennel/txn/LogicalTxn.h"
00027 #include "fennel/txn/LogicalTxnStoredStructs.h"
00028 #include "fennel/segment/CrcSegOutputStream.h"
00029 #include "fennel/segment/SpillOutputStream.h"
00030 #include "fennel/segment/SegmentFactory.h"
00031 #include "fennel/common/ByteInputStream.h"
00032 #include "fennel/cache/QuotaCacheAccessor.h"
00033 
00034 #include <boost/bind.hpp>
00035 #include <sstream>
00036 
00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxnLog.cpp#18 $");
00038 
00039 // TODO:  factor out the management of long log segments via a separate
00040 // extensibility interface
00041 
00042 LogicalTxnLog::LogicalTxnLog(
00043     SegmentAccessor const &logSegmentAccessorInit,
00044     PseudoUuid const &onlineUuid,
00045     SharedSegmentFactory pSegmentFactoryInit)
00046     : pSegmentFactory(pSegmentFactoryInit),
00047       logSegmentAccessor(logSegmentAccessorInit)
00048 {
00049     // Set up cache accessor so that all page locks will be taken out
00050     // with a reserved TxnId.  Just for sanity-checking, set up a quota to make
00051     // sure logging never locks more than two pages at a time.
00052     logSegmentAccessor.pCacheAccessor = SharedCacheAccessor(
00053         new QuotaCacheAccessor(
00054             SharedQuotaCacheAccessor(),
00055             logSegmentAccessor.pCacheAccessor,
00056             2));
00057 
00058     // TODO: Support an option to skip CRC's for optimized non-durable logging.
00059     // Also support a paranoid option for recording CRC's for long logs.
00060     pOutputStream = CrcSegOutputStream::newCrcSegOutputStream(
00061         logSegmentAccessor,onlineUuid);
00062 
00063     // NOTE:  We only write one page at a time to the main log, and we always
00064     // need to wait after each page.  So request synchronous writes.
00065     pOutputStream->setWriteLatency(WRITE_EAGER_SYNC);
00066 
00067     pOutputStream->getSegPos(lastCheckpointMemento.logPosition);
00068     lastCheckpointMemento.nUncommittedTxns = 0;
00069     nCommittedBeforeLastCheckpoint = 0;
00070 
00071     groupCommitInterval = pSegmentFactory->getConfigMap().getIntParam(
00072         "groupCommitInterval", 30);
00073 }
00074 
00075 void LogicalTxnLog::setNextTxnId(TxnId nextTxnIdInit)
00076 {
00077     nextTxnId = nextTxnIdInit;
00078     logSegmentAccessor.pCacheAccessor->setTxnId(nextTxnId);
00079 }
00080 
00081 SharedLogicalTxnLog LogicalTxnLog::newLogicalTxnLog(
00082     SegmentAccessor const &logSegmentAccessor,
00083     PseudoUuid const &onlineUuid,
00084     SharedSegmentFactory pSegmentFactory)
00085 {
00086     return SharedLogicalTxnLog(
00087         new LogicalTxnLog(logSegmentAccessor,onlineUuid,pSegmentFactory));
00088 }
00089 
/**
 * Destroys the log.  By this point every txn must have been committed or
 * rolled back (removing it from uncommittedTxns), and every committed
 * long-log segment must have been released (by deallocateCheckpointedLog or
 * a CHECKPOINT_DISCARD checkpoint).  Both conditions are only checked in
 * debug builds.
 */
LogicalTxnLog::~LogicalTxnLog()
{
    assert(uncommittedTxns.empty());
    assert(committedLongLogSegments.empty());
}
00095 
00096 SharedLogicalTxn LogicalTxnLog::newLogicalTxn(
00097     SharedCacheAccessor pCacheAccessor)
00098 {
00099     StrictMutexGuard mutexGuard(mutex);
00100     // Set up cache accessor so that all page locks will be taken out
00101     // with the new TxnId.  Just for sanity-checking, set up a quota to make
00102     // sure logging never locks more than two pages at a time.
00103     pCacheAccessor = SharedCacheAccessor(
00104         new QuotaCacheAccessor(
00105             SharedQuotaCacheAccessor(),
00106             pCacheAccessor,
00107             2));
00108     pCacheAccessor->setTxnId(nextTxnId);
00109     SharedLogicalTxn pTxn(
00110         new LogicalTxn(nextTxnId,shared_from_this(),pCacheAccessor));
00111     uncommittedTxns.push_back(pTxn);
00112     ++nextTxnId;
00113     return pTxn;
00114 }
00115 
00116 void LogicalTxnLog::removeTxn(SharedLogicalTxn pTxn)
00117 {
00118     TxnListIter pFound = std::find(
00119         uncommittedTxns.begin(),
00120         uncommittedTxns.end(),
00121         pTxn);
00122     assert(pFound != uncommittedTxns.end());
00123     uncommittedTxns.erase(pFound);
00124 }
00125 
/**
 * Commits a txn.  Writes an EVENT_COMMIT record to the main log, waits in
 * commitTxnWithGroup until the main log has been flushed past that record,
 * and finally removes the txn from uncommittedTxns.
 *
 * Two layouts are handled, depending on whether the txn's own output stream
 * already has a backing segment (getSegment() non-null):
 * - long log: the txn's segment is page-broken, flushed and unmapped, and
 *   remembered in committedLongLogSegments.  The commit record points at the
 *   end position in that segment.
 * - short log: the record's position carries NULL_PAGE_ID plus the logged
 *   byte count, and the txn's log bytes are copied inline into the main log
 *   right after the record.
 *
 * A txn with no segment and nothing logged ("empty commit") is just removed;
 * nothing is written.
 *
 * @param pTxn txn to commit; must be present in uncommittedTxns
 */
void LogicalTxnLog::commitTxn(SharedLogicalTxn pTxn)
{
    LogicalTxnEventMemento memento;
    memento.event = LogicalTxnEventMemento::EVENT_COMMIT;
    memento.txnId = pTxn->txnId;
    memento.cbActionLast = pTxn->svpt.cbActionPrev;
    memento.nParticipants = pTxn->participants.size();
    SharedSegment pSegment = pTxn->pOutputStream->getSegment();
    if (pSegment) {
        // Long log: nobody else may still be writing through this stream.
        assert(pTxn->pOutputStream.unique());
        pTxn->pOutputStream->hardPageBreak();
        pTxn->pOutputStream->getSegOutputStream()->getSegPos(
            memento.logPosition);
        pTxn->pOutputStream.reset();
        // Flush the txn's segment before taking the mutex (presumably so
        // other committers are not blocked behind this I/O).
        pSegment->checkpoint(CHECKPOINT_FLUSH_AND_UNMAP);
        StrictMutexGuard mutexGuard(mutex);
        committedLongLogSegments.push_back(pSegment);
    } else {
        if (!pTxn->svpt.cbLogged) {
            // NOTE jvs 27-Feb-2006: "empty commit" is an important
            // optimization for queries in autocommit mode, where JDBC
            // specifies a commit whenever a cursor is closed.
            StrictMutexGuard mutexGuard(mutex);
            removeTxn(pTxn);
            return;
        }
        // Short log: encode "inline, cbLogged bytes follow" in the position.
        CompoundId::setPageId(memento.logPosition.segByteId,NULL_PAGE_ID);
        CompoundId::setByteOffset(
            memento.logPosition.segByteId,
            pTxn->svpt.cbLogged);
        memento.logPosition.cbOffset = pTxn->svpt.cbLogged;
    }
    memento.longLog = pSegment ? true : false;
    StrictMutexGuard mutexGuard(mutex);
    pOutputStream->writeValue(memento);
    if (!pSegment) {
        // Append the short log's bytes directly after the commit record.
        // NOTE(review): copies only the first contiguous buffer returned by
        // getReadPointer; assumes the unspilled log fits in one buffer.
        SharedByteInputStream pInputStream =
            pTxn->pOutputStream->getInputStream();
        uint cbActual;
        PConstBuffer pBuffer = pInputStream->getReadPointer(1,&cbActual);
        pOutputStream->writeBytes(pBuffer,cbActual);
    }

    // Block (mutex held, released while waiting) until the record is durable.
    commitTxnWithGroup(mutexGuard);
    removeTxn(pTxn);
}
00172 
/**
 * Group commit: returns once the main log has been flushed past everything
 * written before this call.  It must be called with mutex held through
 * mutexGuard; condition.timed_wait releases and reacquires it while waiting.
 *
 * If groupCommitInterval is zero, the caller flushes immediately.
 * Otherwise the caller waits until an absolute deadline computed once by
 * convertTimeout(groupCommitInterval, ...), giving other committers a chance
 * to flush for it.  A flush by someone else is detected by the main log's
 * current page having moved past the page it was on at entry.  If the
 * deadline passes with no such flush, this caller flushes with
 * hardPageBreak and wakes every other waiter.
 *
 * @param mutexGuard guard currently holding mutex
 */
void LogicalTxnLog::commitTxnWithGroup(StrictMutexGuard &mutexGuard)
{
    boost::xtime groupCommitExpiration;
    if (groupCommitInterval) {
        // Absolute deadline: spurious wakeups below do not extend it.
        convertTimeout(groupCommitInterval,groupCommitExpiration);
    }
    SegStreamPosition logPos;
    pOutputStream->getSegPos(logPos);
    PageId startPageId = CompoundId::getPageId(logPos.segByteId);
    for (;;) {
        // With group commit disabled, behave as if the wait timed out.
        bool timeout = true;
        if (groupCommitInterval) {
            timeout = !condition.timed_wait(mutexGuard,groupCommitExpiration);

            pOutputStream->getSegPos(logPos);
            PageId lastPageId = CompoundId::getPageId(logPos.segByteId);
            if (lastPageId != startPageId) {
                // someone else has flushed for us
                break;
            }
        }

        if (timeout) {
            // timeout:  we're in charge of flushing

            // NOTE:  Since we're using synchronous writes, there's no need to
            // checkpoint (assuming the underlying device has been correctly
            // initialized to write through).
            pOutputStream->hardPageBreak();
            condition.notify_all();
            break;
        } else {
            // spurious wakeup:  go 'round again
        }
    }
}
00209 
00210 void LogicalTxnLog::rollbackTxn(SharedLogicalTxn pTxn)
00211 {
00212     if (!pTxn->checkpointed) {
00213         // we never stored a checkpoint record for this txn, so during recovery
00214         // it can be ignored entirely
00215         StrictMutexGuard mutexGuard(mutex);
00216         removeTxn(pTxn);
00217         return;
00218     }
00219     // otherwise, write an EVENT_ROLLBACK so that the txn's fate is known
00220     // during recovery (eliminating the need for multiple passes over the log)
00221     LogicalTxnEventMemento memento;
00222     memento.event = LogicalTxnEventMemento::EVENT_ROLLBACK;
00223     memento.txnId = pTxn->txnId;
00224     memento.cbActionLast = 0;
00225     memento.nParticipants = 0;
00226     CompoundId::setPageId(memento.logPosition.segByteId,NULL_PAGE_ID);
00227     CompoundId::setByteOffset(memento.logPosition.segByteId,0);
00228     memento.logPosition.cbOffset = 0;
00229     memento.longLog = true;
00230     StrictMutexGuard mutexGuard(mutex);
00231     pOutputStream->writeValue(memento);
00232     // no need for group commit since caller doesn't need to wait for
00233     // commit confirmation
00234     removeTxn(pTxn);
00235 }
00236 
00237 void LogicalTxnLog::checkpoint(
00238     LogicalTxnLogCheckpointMemento &memento,
00239     CheckpointType checkpointType)
00240 {
00241     StrictMutexGuard mutexGuard(mutex);
00242     if (checkpointType == CHECKPOINT_DISCARD) {
00243         uncommittedTxns.clear();
00244         committedLongLogSegments.clear();
00245         return;
00246     }
00247     pOutputStream->getSegPos(memento.logPosition);
00248     memento.nUncommittedTxns = uncommittedTxns.size();
00249     memento.nextTxnId = nextTxnId;
00250     std::for_each(
00251         uncommittedTxns.begin(),
00252         uncommittedTxns.end(),
00253         boost::bind(&LogicalTxnLog::checkpointTxn,this,_1));
00254     pOutputStream->hardPageBreak();
00255     logSegmentAccessor.pSegment->checkpoint(checkpointType);
00256     if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00257         // memento gets lastCheckpointMemento, and lastCheckpointMemento gets
00258         // new memento just created above
00259         std::swap(memento,lastCheckpointMemento);
00260     }
00261 }
00262 
00263 void LogicalTxnLog::deallocateCheckpointedLog(
00264     LogicalTxnLogCheckpointMemento const &memento,
00265     CheckpointType checkpointType)
00266 {
00267     PageId lastObsoletePageId =
00268         CompoundId::getPageId(memento.logPosition.segByteId);
00269     if (lastObsoletePageId != FIRST_LINEAR_PAGE_ID) {
00270         assert(lastObsoletePageId != NULL_PAGE_ID);
00271         // Segment::deallocatePageRange is inclusive, so decrement to
00272         // exclude the checkpoint page
00273         CompoundId::decBlockNum(lastObsoletePageId);
00274         if (logSegmentAccessor.pSegment->isPageIdAllocated(
00275                 lastObsoletePageId))
00276         {
00277             logSegmentAccessor.pSegment->deallocatePageRange(
00278                 NULL_PAGE_ID,lastObsoletePageId);
00279         }
00280     }
00281 
00282     if (checkpointType == CHECKPOINT_FLUSH_FUZZY) {
00283         committedLongLogSegments.erase(
00284             committedLongLogSegments.begin(),
00285             committedLongLogSegments.begin() + nCommittedBeforeLastCheckpoint);
00286     } else {
00287         committedLongLogSegments.clear();
00288     }
00289     nCommittedBeforeLastCheckpoint = committedLongLogSegments.size();
00290 }
00291 
/**
 * Writes a checkpoint record for one still-active txn.  It is invoked by
 * checkpoint() for each entry of uncommittedTxns while mutex is held.
 *
 * Calls describeAllParticipants on the txn (presumably so that recovery can
 * reconstruct its participants -- confirm in LogicalTxn).  It then
 * page-breaks the txn's own log and checkpoints that log's segment, appends
 * an EVENT_CHECKPOINT record pointing at the resulting position to the main
 * log, and marks the txn as checkpointed.  That flag makes rollbackTxn log
 * an EVENT_ROLLBACK for the txn.
 *
 * @param pTxn active txn to checkpoint
 */
void LogicalTxnLog::checkpointTxn(SharedLogicalTxn pTxn)
{
    // NOTE: hardPageBreak will automatically convert small txns into large
    // ones.  It would probably be better to record their incomplete state in
    // the main log instead.
    LogicalTxnEventMemento memento;
    pTxn->describeAllParticipants();
    pTxn->pOutputStream->hardPageBreak();
    pTxn->pOutputStream->getSegOutputStream()->getSegPos(
        memento.logPosition);
    // TODO:  the comment this originally referred to (on
    // pLogSegment->checkpoint()) no longer exists; checkpoint() is called
    // here with no explicit CheckpointType -- confirm that is intended.
    pTxn->pOutputStream->getSegment()->checkpoint();
    // Fields are read only after the calls above, which may have extended
    // the txn's log.
    memento.event = LogicalTxnEventMemento::EVENT_CHECKPOINT;
    memento.txnId = pTxn->txnId;
    memento.cbActionLast = pTxn->svpt.cbActionPrev;
    memento.longLog = true;
    memento.nParticipants = pTxn->participants.size();
    pOutputStream->writeValue(memento);
    pTxn->checkpointed = true;
}
00312 
00313 TxnId LogicalTxnLog::getOldestActiveTxnId()
00314 {
00315     StrictMutexGuard mutexGuard(mutex);
00316     TxnId oldestTxnId = NULL_TXN_ID;
00317     for (TxnListIter ppTxn = uncommittedTxns.begin();
00318         ppTxn != uncommittedTxns.end();
00319         ++ppTxn)
00320     {
00321         SharedLogicalTxn pTxn = *ppTxn;
00322         if (oldestTxnId == NULL_TXN_ID || pTxn->getTxnId() < oldestTxnId) {
00323             oldestTxnId = pTxn->getTxnId();
00324         }
00325     }
00326 
00327     // If there are no active txns, return the txnId that will be assigned to
00328     // the next, new txn
00329     if (oldestTxnId == NULL_TXN_ID) {
00330         return nextTxnId;
00331     } else {
00332         return oldestTxnId;
00333     }
00334 }
00335 
00336 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxnLog.cpp#18 $");
00337 
00338 // End LogicalTxnLog.cpp

Generated on Mon Jun 22 04:00:21 2009 for Fennel by  doxygen 1.5.1