CmdInterpreter.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2003-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/CmdInterpreter.h"
00026 #include "fennel/farrago/JavaErrorTarget.h"
00027 #include "fennel/farrago/JavaTraceTarget.h"
00028 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00029 #include "fennel/exec/SimpleExecStreamGovernor.h"
00030 #include "fennel/farrago/ExecStreamBuilder.h"
00031 #include "fennel/cache/CacheParams.h"
00032 #include "fennel/common/ConfigMap.h"
00033 #include "fennel/common/FennelExcn.h"
00034 #include "fennel/common/FennelResource.h"
00035 #include "fennel/common/InvalidParamExcn.h"
00036 #include "fennel/common/Backtrace.h"
00037 #include "fennel/btree/BTreeBuilder.h"
00038 #include "fennel/db/Database.h"
00039 #include "fennel/db/CheckpointThread.h"
00040 #include "fennel/txn/LogicalTxn.h"
00041 #include "fennel/txn/LogicalTxnLog.h"
00042 #include "fennel/tuple/StoredTypeDescriptorFactory.h"
00043 #include "fennel/segment/SegmentFactory.h"
00044 #include "fennel/segment/SnapshotRandomAllocationSegment.h"
00045 #include "fennel/exec/ParallelExecStreamScheduler.h"
00046 #include "fennel/exec/DfsTreeExecStreamScheduler.h"
00047 #include "fennel/exec/ExecStreamGraph.h"
00048 #include "fennel/farrago/ExecStreamFactory.h"
00049 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00050 #include "fennel/btree/BTreeVerifier.h"
00051 
00052 #include <boost/lexical_cast.hpp>
00053 
00054 #include <malloc.h>
00055 
00056 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $");
00057 
00058 int64_t CmdInterpreter::executeCommand(
00059     ProxyCmd &cmd)
00060 {
00061     resultHandle = 0;
00062     // dispatch based on polymorphic command type
00063     FemVisitor::visitTbl.accept(*this,cmd);
00064     return resultHandle;
00065 }
00066 
00067 CmdInterpreter::DbHandle *CmdInterpreter::getDbHandle(
00068     SharedProxyDbHandle pHandle)
00069 {
00070     return reinterpret_cast<DbHandle *>(pHandle->getLongHandle());
00071 }
00072 
00073 CmdInterpreter::TxnHandle *CmdInterpreter::getTxnHandle(
00074     SharedProxyTxnHandle pHandle)
00075 {
00076     return reinterpret_cast<TxnHandle *>(pHandle->getLongHandle());
00077 }
00078 
00079 CmdInterpreter::StreamGraphHandle *CmdInterpreter::getStreamGraphHandle(
00080     SharedProxyStreamGraphHandle pHandle)
00081 {
00082     return reinterpret_cast<StreamGraphHandle *>(pHandle->getLongHandle());
00083 }
00084 
00085 SavepointId CmdInterpreter::getSavepointId(SharedProxySvptHandle pHandle)
00086 {
00087     return SavepointId(pHandle->getLongHandle());
00088 }
00089 
00090 TxnId CmdInterpreter::getCsn(SharedProxyCsnHandle pHandle)
00091 {
00092     return TxnId(pHandle->getLongHandle());
00093 }
00094 
00095 void CmdInterpreter::setDbHandle(
00096     SharedProxyDbHandle,DbHandle *pHandle)
00097 {
00098     resultHandle = reinterpret_cast<int64_t>(pHandle);
00099 }
00100 
00101 void CmdInterpreter::setTxnHandle(
00102     SharedProxyTxnHandle,TxnHandle *pHandle)
00103 {
00104     resultHandle = reinterpret_cast<int64_t>(pHandle);
00105 }
00106 
00107 void CmdInterpreter::setStreamGraphHandle(
00108     SharedProxyStreamGraphHandle,StreamGraphHandle *pHandle)
00109 {
00110     resultHandle = reinterpret_cast<int64_t>(pHandle);
00111 }
00112 
00113 void CmdInterpreter::setExecStreamHandle(
00114     SharedProxyStreamHandle,ExecStream *pStream)
00115 {
00116     resultHandle = reinterpret_cast<int64_t>(pStream);
00117 }
00118 
00119 void CmdInterpreter::setSvptHandle(
00120     SharedProxySvptHandle,SavepointId svptId)
00121 {
00122     resultHandle = opaqueToInt(svptId);
00123 }
00124 
00125 void CmdInterpreter::setCsnHandle(
00126     SharedProxyCsnHandle, TxnId csnId)
00127 {
00128     resultHandle = opaqueToInt(csnId);
00129 }
00130 
00131 CmdInterpreter::DbHandle* CmdInterpreter::newDbHandle()
00132 {
00133     return new DbHandle();
00134 }
00135 
00136 CmdInterpreter::TxnHandle* CmdInterpreter::newTxnHandle()
00137 {
00138     return new TxnHandle();
00139 }
00140 
00141 CmdInterpreter::DbHandle::~DbHandle()
00142 {
00143     statsTimer.stop();
00144 
00145     // close database before trace
00146     if (pDb) {
00147         pDb->close();
00148     }
00149     JniUtil::decrementHandleCount(DBHANDLE_TRACE_TYPE_STR, this);
00150 
00151     JniUtil::shutdown();
00152 }
00153 
00154 CmdInterpreter::TxnHandle::~TxnHandle()
00155 {
00156     JniUtil::decrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, this);
00157 }
00158 
00159 CmdInterpreter::StreamGraphHandle::~StreamGraphHandle()
00160 {
00161     if (javaRuntimeContext) {
00162         JniEnvAutoRef pEnv;
00163         pEnv->DeleteGlobalRef(javaRuntimeContext);
00164     }
00165     JniUtil::decrementHandleCount(STREAMGRAPHHANDLE_TRACE_TYPE_STR, this);
00166 }
00167 
00168 JavaTraceTarget *CmdInterpreter::newTraceTarget()
00169 {
00170     return new JavaTraceTarget();
00171 }
00172 
00173 SharedErrorTarget CmdInterpreter::newErrorTarget(
00174     jobject fennelJavaErrorTarget)
00175 {
00176     SharedErrorTarget errorTarget;
00177     errorTarget.reset(new JavaErrorTarget(fennelJavaErrorTarget));
00178     return errorTarget;
00179 }
00180 
00181 void CmdInterpreter::visit(ProxyCmdOpenDatabase &cmd)
00182 {
00183     ConfigMap configMap;
00184 
00185     SharedProxyDatabaseParam pParam = cmd.getParams();
00186     for (; pParam; ++pParam) {
00187         configMap.setStringParam(pParam->getName(),pParam->getValue());
00188     }
00189 
00190     CacheParams cacheParams;
00191     cacheParams.readConfig(configMap);
00192     SharedCache pCache = Cache::newCache(cacheParams);
00193 
00194     JniUtilParams jniUtilParams;
00195     jniUtilParams.readConfig(configMap);
00196     JniUtil::configure(jniUtilParams);
00197 
00198     DeviceMode openMode = cmd.isCreateDatabase()
00199         ? DeviceMode::createNew
00200         : DeviceMode::load;
00201 
00202     std::auto_ptr<DbHandle> pDbHandle(newDbHandle());
00203     JniUtil::incrementHandleCount(DBHANDLE_TRACE_TYPE_STR, pDbHandle.get());
00204 
00205     JavaTraceTarget *pJavaTraceTarget = newTraceTarget();
00206     pDbHandle->pTraceTarget.reset(pJavaTraceTarget);
00207     // on a fatal error, echo the backtrace to the log file:
00208     AutoBacktrace::setTraceTarget(pDbHandle->pTraceTarget);
00209 
00210     SharedDatabase pDb;
00211     try {
00212         pDb = Database::newDatabase(
00213             pCache,
00214             configMap,
00215             openMode,
00216             pDbHandle->pTraceTarget,
00217             SharedPseudoUuidGenerator(new JniPseudoUuidGenerator()));
00218     } catch (...) {
00219         AutoBacktrace::setTraceTarget();
00220         throw;
00221     }
00222 
00223     pDbHandle->pDb = pDb;
00224 
00225     ExecStreamResourceKnobs knobSettings;
00226     knobSettings.cacheReservePercentage =
00227         configMap.getIntParam("cacheReservePercentage");
00228     knobSettings.expectedConcurrentStatements =
00229         configMap.getIntParam("expectedConcurrentStatements");
00230 
00231     ExecStreamResourceQuantity resourcesAvailable;
00232     resourcesAvailable.nCachePages = pCache->getMaxLockedPages();
00233 
00234     pDbHandle->pResourceGovernor =
00235         SharedExecStreamGovernor(
00236             new SimpleExecStreamGovernor(
00237                 knobSettings, resourcesAvailable,
00238                 pDbHandle->pTraceTarget,
00239                 "xo.resourceGovernor"));
00240 
00241     if (pDb->isRecoveryRequired()) {
00242         SegmentAccessor scratchAccessor =
00243             pDb->getSegmentFactory()->newScratchSegment(pDb->getCache());
00244         FtrsTableWriterFactory recoveryFactory(
00245             pDb,
00246             pDb->getCache(),
00247             pDb->getTypeFactory(),
00248             scratchAccessor);
00249         pDb->recover(recoveryFactory);
00250         cmd.setResultRecoveryRequired(true);
00251     } else {
00252         cmd.setResultRecoveryRequired(false);
00253     }
00254     pDbHandle->statsTimer.setTarget(*pJavaTraceTarget);
00255     pDbHandle->statsTimer.addSource(pDb);
00256     pDbHandle->statsTimer.addSource(pDbHandle->pResourceGovernor);
00257     pDbHandle->statsTimer.start();
00258 
00259     // Cache initialization may have been unable to allocate the requested
00260     // number of pages -- check for this case and report it in the log.
00261     if (pCache->getMaxAllocatedPageCount() != cacheParams.nMemPagesMax ||
00262         pCache->getAllocatedPageCount() != cacheParams.nMemPagesInit)
00263     {
00264         FENNEL_DELEGATE_TRACE(
00265             TRACE_WARNING,
00266             pDb,
00267             "Unable to allocate "
00268             << cacheParams.nMemPagesInit
00269             << " (of "
00270             << cacheParams.nMemPagesMax
00271             << " max) cache pages; allocated "
00272             << pCache->getAllocatedPageCount()
00273             << " cache pages.");
00274     }
00275 
00276     setDbHandle(cmd.getResultHandle(),pDbHandle.release());
00277 }
00278 
00279 void CmdInterpreter::visit(ProxyCmdCloseDatabase &cmd)
00280 {
00281     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00282     pDbHandle->pResourceGovernor.reset();
00283     AutoBacktrace::setTraceTarget();
00284     deleteAndNullify(pDbHandle);
00285 }
00286 
00287 void CmdInterpreter::visit(ProxyCmdCheckpoint &cmd)
00288 {
00289     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00290 
00291     pDbHandle->pDb->requestCheckpoint(
00292         cmd.isFuzzy() ? CHECKPOINT_FLUSH_FUZZY : CHECKPOINT_FLUSH_ALL,
00293         cmd.isAsync());
00294 }
00295 
00296 void CmdInterpreter::visit(ProxyCmdSetParam &cmd)
00297 {
00298     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00299     SharedProxyDatabaseParam pParam = cmd.getParam();
00300 
00301     std::string paramName = pParam->getName();
00302 
00303     if (paramName.compare("cachePagesInit") == 0) {
00304         int pageCount = boost::lexical_cast<int>(pParam->getValue());
00305         SharedCache pCache = pDbHandle->pDb->getCache();
00306         if (pageCount <= 0 || pageCount > pCache->getMaxAllocatedPageCount()) {
00307             throw InvalidParamExcn("1", "'cachePagesMax'");
00308         }
00309 
00310         bool decreasingPageCount = pageCount < pCache->getAllocatedPageCount();
00311         if (decreasingPageCount) {
00312             // Let governor veto a page count decrease
00313             ExecStreamResourceQuantity available;
00314             available.nCachePages = pageCount;
00315             if (!pDbHandle->pResourceGovernor->setResourceAvailability(
00316                     available, EXEC_RESOURCE_CACHE_PAGES))
00317             {
00318                 throw InvalidParamExcn(
00319                     "the number of pages currently assigned (plus reserve)",
00320                     "'cachePagesMax'");
00321             }
00322         }
00323 
00324         pCache->setAllocatedPageCount(pageCount);
00325 
00326         if (!decreasingPageCount) {
00327             // Notify governor of increased page count
00328             ExecStreamResourceQuantity available;
00329             available.nCachePages = pageCount;
00330             bool result =
00331                 pDbHandle->pResourceGovernor->setResourceAvailability(
00332                     available, EXEC_RESOURCE_CACHE_PAGES);
00333             assert(result);
00334         }
00335     } else if (paramName.compare("expectedConcurrentStatements") == 0) {
00336         int nStatements = boost::lexical_cast<int>(pParam->getValue());
00337         SharedCache pCache = pDbHandle->pDb->getCache();
00338         // need to set aside at least 5 pages per statement
00339         if (nStatements <= 0 ||
00340             nStatements > pCache->getMaxLockedPages() / 5)
00341         {
00342             throw InvalidParamExcn("1", "'cachePagesInit/5'");
00343         }
00344         ExecStreamResourceKnobs knob;
00345         knob.expectedConcurrentStatements = nStatements;
00346         pDbHandle->pResourceGovernor->setResourceKnob(
00347             knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00348 
00349     } else if (paramName.compare("cacheReservePercentage") == 0) {
00350         int percent = boost::lexical_cast<int>(pParam->getValue());
00351         if (percent <= 0 || percent > 99) {
00352             throw InvalidParamExcn("1", "99");
00353         }
00354         ExecStreamResourceKnobs knob;
00355         knob.cacheReservePercentage = percent;
00356         if (!pDbHandle->pResourceGovernor->setResourceKnob(
00357             knob, EXEC_KNOB_CACHE_RESERVE_PERCENTAGE))
00358         {
00359             throw InvalidParamExcn(
00360                 "1",
00361                 "a percentage that sets aside fewer pages, to allow for pages already assigned");
00362         }
00363     }
00364 }
00365 
00366 void CmdInterpreter::getBTreeForIndexCmd(
00367     ProxyIndexCmd &cmd,PageId rootPageId,BTreeDescriptor &treeDescriptor)
00368 {
00369     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00370 
00371     readTupleDescriptor(
00372         treeDescriptor.tupleDescriptor,
00373         *(cmd.getTupleDesc()),pTxnHandle->pDb->getTypeFactory());
00374 
00375     CmdInterpreter::readTupleProjection(
00376         treeDescriptor.keyProjection,cmd.getKeyProj());
00377 
00378     treeDescriptor.pageOwnerId = PageOwnerId(cmd.getIndexId());
00379     treeDescriptor.segmentId = SegmentId(cmd.getSegmentId());
00380     treeDescriptor.segmentAccessor.pSegment =
00381         pTxnHandle->pDb->getSegmentById(
00382             treeDescriptor.segmentId,
00383             pTxnHandle->pSnapshotSegment);
00384     treeDescriptor.segmentAccessor.pCacheAccessor = pTxnHandle->pDb->getCache();
00385     treeDescriptor.rootPageId = rootPageId;
00386 }
00387 
00388 void CmdInterpreter::visit(ProxyCmdCreateIndex &cmd)
00389 {
00390     // block checkpoints during this method
00391     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00392     SXMutexSharedGuard actionMutexGuard(
00393         pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00394 
00395     BTreeDescriptor treeDescriptor;
00396     getBTreeForIndexCmd(cmd,NULL_PAGE_ID,treeDescriptor);
00397     BTreeBuilder builder(treeDescriptor);
00398     builder.createEmptyRoot();
00399     resultHandle = opaqueToInt(builder.getRootPageId());
00400 }
00401 
00402 void CmdInterpreter::visit(ProxyCmdTruncateIndex &cmd)
00403 {
00404     dropOrTruncateIndex(cmd, false);
00405 }
00406 
00407 void CmdInterpreter::visit(ProxyCmdDropIndex &cmd)
00408 {
00409     dropOrTruncateIndex(cmd, true);
00410 }
00411 
00412 void CmdInterpreter::visit(ProxyCmdVerifyIndex &cmd)
00413 {
00414     // block checkpoints during this method
00415     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00416     SXMutexSharedGuard actionMutexGuard(
00417         pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00418 
00419     BTreeDescriptor treeDescriptor;
00420     getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor);
00421     TupleProjection leafPageIdProj;
00422     if (cmd.getLeafPageIdProj()) {
00423         CmdInterpreter::readTupleProjection(
00424             leafPageIdProj, cmd.getLeafPageIdProj());
00425     }
00426     bool estimate = cmd.isEstimate();
00427     bool includeTuples = cmd.isIncludeTuples();
00428     bool keys = (!estimate);
00429     bool leaf = ((!estimate) || includeTuples);
00430     BTreeVerifier verifier(treeDescriptor);
00431     verifier.verify(true, keys, leaf);
00432     BTreeStatistics statistics = verifier.getStatistics();
00433     long pageCount = statistics.nNonLeafNodes + statistics.nLeafNodes;
00434     if (includeTuples) {
00435         pageCount += statistics.nTuples;
00436     }
00437     cmd.setResultPageCount(pageCount);
00438 
00439     if (keys) {
00440         cmd.setResultUniqueKeyCount(statistics.nUniqueKeys);
00441     } else {
00442         cmd.clearResultUniqueKeyCount();
00443     }
00444 }
00445 
00446 void CmdInterpreter::dropOrTruncateIndex(
00447     ProxyCmdDropIndex &cmd, bool drop)
00448 {
00449     // block checkpoints during this method
00450     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00451     SXMutexSharedGuard actionMutexGuard(
00452         pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00453 
00454     BTreeDescriptor treeDescriptor;
00455     getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor);
00456     TupleProjection leafPageIdProj;
00457     if (cmd.getLeafPageIdProj()) {
00458         CmdInterpreter::readTupleProjection(
00459             leafPageIdProj, cmd.getLeafPageIdProj());
00460     }
00461     BTreeBuilder builder(treeDescriptor);
00462     builder.truncate(drop, leafPageIdProj.size() ? &leafPageIdProj : NULL);
00463 }
00464 
00465 void CmdInterpreter::visit(ProxyCmdBeginTxn &cmd)
00466 {
00467     beginTxn(cmd, cmd.isReadOnly(), NULL_TXN_ID);
00468 }
00469 
00470 void CmdInterpreter::beginTxn(ProxyBeginTxnCmd &cmd, bool readOnly, TxnId csn)
00471 {
00472     assert(readOnly || csn == NULL_TXN_ID);
00473 
00474     // block checkpoints during this method
00475     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00476     SharedDatabase pDb = pDbHandle->pDb;
00477 
00478     SXMutexSharedGuard actionMutexGuard(
00479         pDb->getCheckpointThread()->getActionMutex());
00480 
00481     std::auto_ptr<TxnHandle> pTxnHandle(newTxnHandle());
00482     JniUtil::incrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, pTxnHandle.get());
00483     pTxnHandle->pDb = pDb;
00484     pTxnHandle->readOnly = readOnly;
00485     // TODO:  CacheAccessor factory
00486     pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache());
00487     pTxnHandle->pResourceGovernor = pDbHandle->pResourceGovernor;
00488 
00489     // NOTE:  use a null scratchAccessor; individual ExecStreamGraphs
00490     // will have their own
00491     SegmentAccessor scratchAccessor;
00492 
00493     pTxnHandle->pFtrsTableWriterFactory = SharedFtrsTableWriterFactory(
00494         new FtrsTableWriterFactory(
00495             pDb,
00496             pDb->getCache(),
00497             pDb->getTypeFactory(),
00498             scratchAccessor));
00499 
00500     // If snapshots are enabled, set up 2 snapshot segments -- one of which
00501     // only reads committed data.  This will be used for streams that need to
00502     // read a snapshot of the data before other portions of the stream graph
00503     // have modified the segment.
00504     if (pDb->areSnapshotsEnabled()) {
00505         if (csn == NULL_TXN_ID) {
00506             csn = pTxnHandle->pTxn->getTxnId();
00507         }
00508         pTxnHandle->pSnapshotSegment =
00509             pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment(
00510                 pDb->getDataSegment(),
00511                 pDb->getDataSegment(),
00512                 csn,
00513                 false);
00514         pTxnHandle->pReadCommittedSnapshotSegment =
00515             pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment(
00516                 pDb->getDataSegment(),
00517                 pDb->getDataSegment(),
00518                 csn,
00519                 true);
00520     } else {
00521         assert(csn == NULL_TXN_ID);
00522     }
00523 
00524     setTxnHandle(cmd.getResultHandle(),pTxnHandle.release());
00525 }
00526 
00527 void CmdInterpreter::visit(ProxyCmdBeginTxnWithCsn &cmd)
00528 {
00529     beginTxn(cmd, true, getCsn(cmd.getCsnHandle()));
00530 }
00531 
00532 void CmdInterpreter::visit(ProxyCmdSavepoint &cmd)
00533 {
00534     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00535 
00536     // block checkpoints during this method
00537     SXMutexSharedGuard actionMutexGuard(
00538         pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00539 
00540     setSvptHandle(
00541         cmd.getResultHandle(),
00542         pTxnHandle->pTxn->createSavepoint());
00543 }
00544 
00545 void CmdInterpreter::visit(ProxyCmdCommit &cmd)
00546 {
00547     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00548     SharedDatabase pDb = pTxnHandle->pDb;
00549 
00550     // block checkpoints during this method
00551     bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns();
00552     SXMutexSharedGuard actionMutexGuard(
00553         pDb->getCheckpointThread()->getActionMutex());
00554 
00555     if (pDb->areSnapshotsEnabled()) {
00556         // Commit the current txn, and start a new one so the versioned
00557         // pages that we're now going to commit will be marked with a txnId
00558         // corresponding to the time of the commit.  At present, those pages
00559         // are marked with a txnId corresponding to the start of the txn.
00560         pTxnHandle->pTxn->commit();
00561         pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache());
00562         SnapshotRandomAllocationSegment *pSnapshotSegment =
00563             SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00564                 pTxnHandle->pSnapshotSegment);
00565         TxnId commitTxnId = pTxnHandle->pTxn->getTxnId();
00566         pSnapshotSegment->commitChanges(commitTxnId);
00567 
00568         // Flush pages associated with the snapshot segment.  Note that we
00569         // don't need to flush the underlying versioned segment first since
00570         // the snapshot pages are all new and therefore, are never logged.
00571         // Pages in the underlying versioned segment will be flushed in the
00572         // requestCheckpoint call further below.  Also note that the
00573         // checkpoint is not initiated through the dynamically cast segment
00574         // to ensure that the command is traced if tracing is turned on.
00575         if (txnBlocksCheckpoint) {
00576             pTxnHandle->pSnapshotSegment->checkpoint(CHECKPOINT_FLUSH_ALL);
00577         }
00578     }
00579 
00580     if (cmd.getSvptHandle()) {
00581         SavepointId svptId = getSavepointId(cmd.getSvptHandle());
00582         pTxnHandle->pTxn->commitSavepoint(svptId);
00583     } else {
00584         pTxnHandle->pTxn->commit();
00585         deleteAndNullify(pTxnHandle);
00586         if (txnBlocksCheckpoint) {
00587             // release the checkpoint lock acquired above
00588             actionMutexGuard.unlock();
00589             // force a checkpoint now to flush all data modified by transaction
00590             // to disk; wait for it to complete before reporting the
00591             // transaction as committed
00592             pDb->requestCheckpoint(CHECKPOINT_FLUSH_ALL, false);
00593         }
00594     }
00595 }
00596 
00597 void CmdInterpreter::visit(ProxyCmdRollback &cmd)
00598 {
00599     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00600     SharedDatabase pDb = pTxnHandle->pDb;
00601 
00602     // block checkpoints during this method
00603     bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns();
00604     SXMutexSharedGuard actionMutexGuard(
00605         pDb->getCheckpointThread()->getActionMutex());
00606 
00607     if (pDb->areSnapshotsEnabled()) {
00608         SnapshotRandomAllocationSegment *pSegment =
00609             SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00610                 pTxnHandle->pSnapshotSegment);
00611         pSegment->rollbackChanges();
00612     }
00613 
00614     if (cmd.getSvptHandle()) {
00615         SavepointId svptId = getSavepointId(cmd.getSvptHandle());
00616         pTxnHandle->pTxn->rollback(&svptId);
00617     } else {
00618         pTxnHandle->pTxn->rollback();
00619         deleteAndNullify(pTxnHandle);
00620         if (txnBlocksCheckpoint && !pDb->areSnapshotsEnabled()) {
00621             // Implement rollback by simulating crash recovery,
00622             // reverting all pages modified by transaction.  No need
00623             // to do this when snapshots are in use because no permanent
00624             // pages were modified.
00625             pDb->recoverOnline();
00626         }
00627     }
00628 }
00629 
00630 void CmdInterpreter::visit(ProxyCmdGetTxnCsn &cmd)
00631 {
00632     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00633     SharedDatabase pDb = pTxnHandle->pDb;
00634     assert(pDb->areSnapshotsEnabled());
00635     SnapshotRandomAllocationSegment *pSegment =
00636         SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00637             pTxnHandle->pSnapshotSegment);
00638     setCsnHandle(cmd.getResultHandle(), pSegment->getSnapshotCsn());
00639 }
00640 
00641 void CmdInterpreter::visit(ProxyCmdGetLastCommittedTxnId &cmd)
00642 {
00643     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00644     SharedDatabase pDb = pDbHandle->pDb;
00645     setCsnHandle(cmd.getResultHandle(), pDb->getLastCommittedTxnId());
00646 }
00647 
00648 void CmdInterpreter::visit(ProxyCmdCreateExecutionStreamGraph &cmd)
00649 {
00650 #if 0
00651     struct mallinfo minfo = mallinfo();
00652     std::cout << "Number of allocated bytes before stream graph construction = "
00653         << minfo.uordblks << " bytes" << std::endl;
00654 #endif
00655     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00656     SharedDatabase pDb = pTxnHandle->pDb;
00657     SharedExecStreamGraph pGraph =
00658         ExecStreamGraph::newExecStreamGraph();
00659     pGraph->setTxn(pTxnHandle->pTxn);
00660     pGraph->setResourceGovernor(pTxnHandle->pResourceGovernor);
00661     std::auto_ptr<StreamGraphHandle> pStreamGraphHandle(
00662         new StreamGraphHandle());
00663     JniUtil::incrementHandleCount(
00664         STREAMGRAPHHANDLE_TRACE_TYPE_STR, pStreamGraphHandle.get());
00665     pStreamGraphHandle->javaRuntimeContext = NULL;
00666     pStreamGraphHandle->pTxnHandle = pTxnHandle;
00667     pStreamGraphHandle->pExecStreamGraph = pGraph;
00668     pStreamGraphHandle->pExecStreamFactory.reset(
00669         new ExecStreamFactory(
00670             pDb,
00671             pTxnHandle->pFtrsTableWriterFactory,
00672             pStreamGraphHandle.get()));
00673     // When snapshots are enabled, allocate DynamicDelegatingSegments for the
00674     // stream graph so if the stream graph is executed in different txns,
00675     // we can reset the delegate to whatever is the snapshot segment associated
00676     // with the current txn.
00677     if (pDb->areSnapshotsEnabled()) {
00678         pStreamGraphHandle->pSegment =
00679             pDb->getSegmentFactory()->newDynamicDelegatingSegment(
00680                 pTxnHandle->pSnapshotSegment);
00681         pStreamGraphHandle->pReadCommittedSegment =
00682             pDb->getSegmentFactory()->newDynamicDelegatingSegment(
00683                 pTxnHandle->pReadCommittedSnapshotSegment);
00684     }
00685     setStreamGraphHandle(
00686         cmd.getResultHandle(),
00687         pStreamGraphHandle.release());
00688 }
00689 
00690 void CmdInterpreter::visit(ProxyCmdPrepareExecutionStreamGraph &cmd)
00691 {
00692     StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle(
00693         cmd.getStreamGraphHandle());
00694     TxnHandle *pTxnHandle = pStreamGraphHandle->pTxnHandle;
00695     // NOTE:  sequence is important here
00696     SharedExecStreamScheduler pScheduler;
00697     std::string schedulerName = "xo.scheduler";
00698     if (cmd.getDegreeOfParallelism() == 1) {
00699         pScheduler.reset(
00700             new DfsTreeExecStreamScheduler(
00701                 pTxnHandle->pDb->getSharedTraceTarget(),
00702                 schedulerName));
00703     } else {
00704         pScheduler.reset(
00705             new ParallelExecStreamScheduler(
00706                 pTxnHandle->pDb->getSharedTraceTarget(),
00707                 schedulerName,
00708                 JniUtil::getThreadTracker(),
00709                 cmd.getDegreeOfParallelism()));
00710     }
00711     ExecStreamGraphEmbryo graphEmbryo(
00712         pStreamGraphHandle->pExecStreamGraph,
00713         pScheduler,
00714         pTxnHandle->pDb->getCache(),
00715         pTxnHandle->pDb->getSegmentFactory());
00716     pStreamGraphHandle->pExecStreamFactory->setGraphEmbryo(graphEmbryo);
00717     ExecStreamBuilder streamBuilder(
00718         graphEmbryo,
00719         *(pStreamGraphHandle->pExecStreamFactory));
00720     streamBuilder.buildStreamGraph(cmd, true);
00721     pStreamGraphHandle->pExecStreamFactory.reset();
00722     pStreamGraphHandle->pScheduler = pScheduler;
00723 #if 0
00724     struct mallinfo minfo = mallinfo();
00725     std::cout << "Number of allocated bytes after stream graph construction = "
00726         << minfo.uordblks << " bytes" << std::endl;
00727 #endif
00728 }
00729 
00730 void CmdInterpreter::visit(ProxyCmdCreateStreamHandle &cmd)
00731 {
00732     StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle(
00733         cmd.getStreamGraphHandle());
00734     SharedExecStream pStream;
00735     if (cmd.isInput()) {
00736         pStream =
00737             pStreamGraphHandle->pExecStreamGraph->findLastStream(
00738             cmd.getStreamName(), 0);
00739     } else {
00740         pStream =
00741             pStreamGraphHandle->pExecStreamGraph->findStream(
00742             cmd.getStreamName());
00743     }
00744 
00745     setExecStreamHandle(
00746         cmd.getResultHandle(),
00747         pStream.get());
00748 }
00749 
00750 PageId CmdInterpreter::StreamGraphHandle::getRoot(PageOwnerId pageOwnerId)
00751 {
00752     JniEnvAutoRef pEnv;
00753     jlong x = opaqueToInt(pageOwnerId);
00754     x = pEnv->CallLongMethod(
00755         javaRuntimeContext,JniUtil::methGetIndexRoot,x);
00756     return PageId(x);
00757 }
00758 
00759 void CmdInterpreter::readTupleDescriptor(
00760     TupleDescriptor &tupleDesc,
00761     ProxyTupleDescriptor &javaTupleDesc,
00762     StoredTypeDescriptorFactory const &typeFactory)
00763 {
00764     tupleDesc.clear();
00765     SharedProxyTupleAttrDescriptor pAttr = javaTupleDesc.getAttrDescriptor();
00766     for (; pAttr; ++pAttr) {
00767         StoredTypeDescriptor const &typeDescriptor =
00768             typeFactory.newDataType(pAttr->getTypeOrdinal());
00769         tupleDesc.push_back(
00770             TupleAttributeDescriptor(
00771                 typeDescriptor,pAttr->isNullable(),pAttr->getByteLength()));
00772     }
00773 }
00774 
00775 void CmdInterpreter::readTupleProjection(
00776     TupleProjection &tupleProj,
00777     SharedProxyTupleProjection pJavaTupleProj)
00778 {
00779     tupleProj.clear();
00780     SharedProxyTupleAttrProjection pAttr = pJavaTupleProj->getAttrProjection();
00781     for (; pAttr; ++pAttr) {
00782         tupleProj.push_back(pAttr->getAttributeIndex());
00783     }
00784 }
00785 
00786 void CmdInterpreter::visit(ProxyCmdAlterSystemDeallocate &cmd)
00787 {
00788     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00789     SharedDatabase pDb = pDbHandle->pDb;
00790     if (!pDb->areSnapshotsEnabled()) {
00791         // Nothing to do if snapshots aren't enabled
00792         return;
00793     } else {
00794         uint64_t paramVal = cmd.getOldestLabelCsn();
00795         TxnId labelCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00796         pDb->deallocateOldPages(labelCsn);
00797     }
00798 }
00799 
00800 void CmdInterpreter::visit(ProxyCmdVersionIndexRoot &cmd)
00801 {
00802     TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00803     SharedDatabase pDb = pTxnHandle->pDb;
00804     assert(pDb->areSnapshotsEnabled());
00805 
00806     SnapshotRandomAllocationSegment *pSnapshotSegment =
00807         SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00808             pTxnHandle->pSnapshotSegment);
00809     pSnapshotSegment->versionPage(
00810         PageId(cmd.getOldRootPageId()),
00811         PageId(cmd.getNewRootPageId()));
00812 }
00813 
00814 void CmdInterpreter::visit(ProxyCmdInitiateBackup &cmd)
00815 {
00816     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00817     SharedDatabase pDb = pDbHandle->pDb;
00818     uint64_t paramVal = cmd.getLowerBoundCsn();
00819     TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00820     FileSize dataDeviceSize;
00821 
00822     volatile bool abortFlag = false;
00823     TxnId csn =
00824         pDb->initiateBackup(
00825             cmd.getBackupPathname(),
00826             cmd.isCheckSpaceRequirements(),
00827             FileSize(cmd.getSpacePadding()),
00828             lowerBoundCsn,
00829             cmd.getCompressionProgram(),
00830             dataDeviceSize,
00831             (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00832     cmd.setResultDataDeviceSize(dataDeviceSize);
00833     setCsnHandle(cmd.getResultHandle(), csn);
00834 }
00835 
00836 void CmdInterpreter::visit(ProxyCmdCompleteBackup &cmd)
00837 {
00838     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00839     SharedDatabase pDb = pDbHandle->pDb;
00840     uint64_t paramVal = cmd.getLowerBoundCsn();
00841     TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00842     volatile bool abortFlag = false;
00843     pDb->completeBackup(
00844         lowerBoundCsn,
00845         TxnId(cmd.getUpperBoundCsn()),
00846         (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00847 }
00848 
00849 void CmdInterpreter::visit(ProxyCmdAbandonBackup &cmd)
00850 {
00851     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00852     SharedDatabase pDb = pDbHandle->pDb;
00853     pDb->abortBackup();
00854 }
00855 
00856 void CmdInterpreter::visit(ProxyCmdRestoreFromBackup &cmd)
00857 {
00858     DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00859     SharedDatabase pDb = pDbHandle->pDb;
00860     uint64_t paramVal = cmd.getLowerBoundCsn();
00861     TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00862     volatile bool abortFlag = false;
00863     pDb->restoreFromBackup(
00864         cmd.getBackupPathname(),
00865         cmd.getFileSize(),
00866         cmd.getCompressionProgram(),
00867         lowerBoundCsn,
00868         TxnId(cmd.getUpperBoundCsn()),
00869         (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00870 }
00871 
00872 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $");
00873 
00874 // End CmdInterpreter.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1