LogicalTxn.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/txn/LogicalTxn.h"
00026 #include "fennel/txn/LogicalTxnParticipant.h"
00027 #include "fennel/segment/SegOutputStream.h"
00028 #include "fennel/segment/SegInputStream.h"
00029 #include "fennel/segment/SpillOutputStream.h"
00030 #include "fennel/txn/LogicalTxnLog.h"
00031 #include "fennel/txn/LogicalRecoveryLog.h"
00032 #include "fennel/txn/LogicalRecoveryTxn.h"
00033 
00034 #include <boost/bind.hpp>
00035 
00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $");
00037 
00038 LogicalTxn::LogicalTxn(
00039     TxnId txnIdInit,
00040     SharedLogicalTxnLog pLogInit,
00041     SharedCacheAccessor pCacheAccessorInit)
00042     : txnId(txnIdInit),
00043       pLog(pLogInit),
00044       pCacheAccessor(pCacheAccessorInit)
00045 {
00046     pOutputStream = SpillOutputStream::newSpillOutputStream(
00047         pLog->pSegmentFactory,
00048         pCacheAccessor,
00049         LogicalRecoveryLog::getLongLogFileName(txnId));
00050 
00051     // REVIEW: We could use something like a WALSegment to keep track of page
00052     // states, eliminating the overhead of a full-cache checkpoint when log is
00053     // committed.
00054     pOutputStream->setWriteLatency(WRITE_EAGER_ASYNC);
00055 
00056     state = STATE_LOGGING_TXN;
00057     svpt.cbActionPrev = 0;
00058     svpt.cbLogged = 0;
00059     checkpointed = false;
00060 }
00061 
00062 LogicalTxn::~LogicalTxn()
00063 {
00064     assert(isEnded());
00065     assert(participants.empty());
00066 }
00067 
00068 void LogicalTxn::addParticipant(SharedLogicalTxnParticipant pParticipant)
00069 {
00070     if (pParticipant->pTxn) {
00071         assert(pParticipant->pTxn == this);
00072         return;
00073     }
00074     participants.push_back(pParticipant);
00075     pParticipant->pTxn = this;
00076     pParticipant->enableLogging(true);
00077     describeParticipant(pParticipant);
00078 }
00079 
00080 ByteOutputStream &LogicalTxn::beginLogicalAction(
00081     LogicalTxnParticipant &participant,
00082     LogicalActionType actionType)
00083 {
00084     assert(participant.pTxn == this);
00085     return beginLogicalAction(&participant,actionType);
00086 }
00087 
00088 ByteOutputStream &LogicalTxn::beginLogicalAction(
00089     LogicalTxnParticipant *pParticipant,
00090     LogicalActionType actionType)
00091 {
00092     assert(state == STATE_LOGGING_TXN);
00093     LogicalTxnActionHeader actionHeader;
00094     actionHeader.pParticipant = pParticipant;
00095     actionHeader.actionType = actionType;
00096     actionHeader.cbActionPrev = svpt.cbActionPrev;
00097     pOutputStream->writeValue(actionHeader);
00098     state = STATE_LOGGING_ACTION;
00099     return *pOutputStream;
00100 }
00101 
00102 void LogicalTxn::endLogicalAction()
00103 {
00104     assert(state == STATE_LOGGING_ACTION);
00105     state = STATE_LOGGING_TXN;
00106     svpt.cbActionPrev =
00107         pOutputStream->getOffset() - svpt.cbLogged;
00108     svpt.cbLogged = pOutputStream->getOffset();
00109 }
00110 
00111 SavepointId LogicalTxn::createSavepoint()
00112 {
00113     assert(state == STATE_LOGGING_TXN);
00114     SavepointId svptId = SavepointId(savepoints.size());
00115     savepoints.push_back(svpt);
00116     return svptId;
00117 }
00118 
00119 void LogicalTxn::commitSavepoint(SavepointId svptId)
00120 {
00121     assert(state == STATE_LOGGING_TXN);
00122     uint iSvpt = opaqueToInt(svptId);
00123     assert(iSvpt < savepoints.size());
00124     savepoints.resize(iSvpt);
00125 }
00126 
00127 void LogicalTxn::rollback(SavepointId const *pSvptId)
00128 {
00129     assert(state == STATE_LOGGING_TXN);
00130     if (pSvptId) {
00131         uint iSvpt = opaqueToInt(*pSvptId);
00132         assert(iSvpt < savepoints.size());
00133         savepoints.resize(iSvpt + 1);
00134         rollbackToSavepoint(savepoints[iSvpt]);
00135         return;
00136     }
00137 
00138     // NOTE:  this protects against implicit self-delete until end-of-method
00139     SharedLogicalTxn pThis = shared_from_this();
00140 
00141     // do this now so that participants don't try to write to log during
00142     // rollback
00143     forgetAllParticipants();
00144 
00145     SharedSegment pLongLogSegment = pOutputStream->getSegment();
00146     SharedByteInputStream pInputStream =
00147         pOutputStream->getInputStream(SEEK_STREAM_END);
00148     pOutputStream->close();
00149     assert(svpt.cbLogged == pInputStream->getOffset());
00150 
00151     {
00152         state = STATE_ROLLING_BACK;
00153         LogicalRecoveryTxn recoveryTxn(pInputStream,NULL);
00154         recoveryTxn.undoActions(svpt);
00155     }
00156 
00157     svpt.cbLogged = pInputStream->getOffset();
00158     state = STATE_ROLLED_BACK;
00159     pInputStream.reset();
00160     if (pLongLogSegment) {
00161         pLongLogSegment->checkpoint(CHECKPOINT_DISCARD);
00162         pLongLogSegment.reset();
00163     }
00164     pLog->rollbackTxn(pThis);
00165     pLog.reset();
00166     pOutputStream.reset();
00167 }
00168 
00169 void LogicalTxn::commit()
00170 {
00171     // NOTE:  this protects against implicit self-delete until end-of-method
00172     SharedLogicalTxn pThis = shared_from_this();
00173     pLog->commitTxn(pThis);
00174     pLog.reset();
00175     state = STATE_COMMITTED;
00176     forgetAllParticipants();
00177 }
00178 
00179 void LogicalTxn::describeAllParticipants()
00180 {
00181     std::for_each(
00182         participants.begin(),
00183         participants.end(),
00184         boost::bind(&LogicalTxn::describeParticipant,this,_1));
00185 }
00186 
00187 void LogicalTxn::describeParticipant(SharedLogicalTxnParticipant pParticipant)
00188 {
00189     beginLogicalAction(*pParticipant,ACTION_TXN_DESCRIBE_PARTICIPANT);
00190     LogicalTxnClassId classId =
00191         pParticipant->getParticipantClassId();
00192     pOutputStream->writeValue(classId);
00193     pParticipant->describeParticipant(*pOutputStream);
00194     endLogicalAction();
00195 }
00196 
00197 void LogicalTxn::forgetAllParticipants()
00198 {
00199     std::for_each(
00200         participants.begin(),
00201         participants.end(),
00202         boost::bind(&LogicalTxnParticipant::clearLogicalTxn,_1));
00203     participants.clear();
00204 }
00205 
00206 void LogicalTxn::rollbackToSavepoint(LogicalTxnSavepoint &oldSvpt)
00207 {
00208     assert(oldSvpt.cbLogged <= svpt.cbLogged);
00209     // disable logging for all participants during rollback
00210     std::for_each(
00211         participants.begin(),
00212         participants.end(),
00213         boost::bind(&LogicalTxnParticipant::enableLogging,_1,false));
00214 
00215     // TODO:  for short logs, could just reuse memory
00216 
00217     SharedByteInputStream pInputStream =
00218         pOutputStream->getInputStream(SEEK_STREAM_END);
00219     assert(svpt.cbLogged == pInputStream->getOffset());
00220     {
00221         state = STATE_ROLLING_BACK;
00222         LogicalRecoveryTxn recoveryTxn(pInputStream,NULL);
00223         recoveryTxn.undoActions(svpt,MAXU,oldSvpt.cbLogged);
00224         state = STATE_LOGGING_TXN;
00225     }
00226     pInputStream.reset();
00227 
00228     // re-enable logging for all participants
00229     std::for_each(
00230         participants.begin(),
00231         participants.end(),
00232         boost::bind(&LogicalTxnParticipant::enableLogging,_1,true));
00233 
00234     // write log entry noting the partial rollback
00235     beginLogicalAction(NULL,ACTION_TXN_ROLLBACK_TO_SAVEPOINT);
00236     pOutputStream->writeValue(oldSvpt);
00237     endLogicalAction();
00238 }
00239 
00240 SharedLogicalTxnLog LogicalTxn::getLog()
00241 {
00242     return pLog;
00243 }
00244 
00245 bool LogicalTxn::isEnded() const
00246 {
00247     return state == STATE_ROLLED_BACK || state == STATE_COMMITTED;
00248 }
00249 
00250 TxnId LogicalTxn::getTxnId() const
00251 {
00252     return txnId;
00253 }
00254 
00255 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $");
00256 
00257 // End LogicalTxn.cpp

Generated on Mon Jun 22 04:00:21 2009 for Fennel by  doxygen 1.5.1