00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/CmdInterpreter.h"
00026 #include "fennel/farrago/JavaErrorTarget.h"
00027 #include "fennel/farrago/JavaTraceTarget.h"
00028 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00029 #include "fennel/exec/SimpleExecStreamGovernor.h"
00030 #include "fennel/farrago/ExecStreamBuilder.h"
00031 #include "fennel/cache/CacheParams.h"
00032 #include "fennel/common/ConfigMap.h"
00033 #include "fennel/common/FennelExcn.h"
00034 #include "fennel/common/FennelResource.h"
00035 #include "fennel/common/InvalidParamExcn.h"
00036 #include "fennel/common/Backtrace.h"
00037 #include "fennel/btree/BTreeBuilder.h"
00038 #include "fennel/db/Database.h"
00039 #include "fennel/db/CheckpointThread.h"
00040 #include "fennel/txn/LogicalTxn.h"
00041 #include "fennel/txn/LogicalTxnLog.h"
00042 #include "fennel/tuple/StoredTypeDescriptorFactory.h"
00043 #include "fennel/segment/SegmentFactory.h"
00044 #include "fennel/segment/SnapshotRandomAllocationSegment.h"
00045 #include "fennel/exec/ParallelExecStreamScheduler.h"
00046 #include "fennel/exec/DfsTreeExecStreamScheduler.h"
00047 #include "fennel/exec/ExecStreamGraph.h"
00048 #include "fennel/farrago/ExecStreamFactory.h"
00049 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00050 #include "fennel/btree/BTreeVerifier.h"
00051
00052 #include <boost/lexical_cast.hpp>
00053
00054 #include <malloc.h>
00055
00056 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $");
00057
00058 int64_t CmdInterpreter::executeCommand(
00059 ProxyCmd &cmd)
00060 {
00061 resultHandle = 0;
00062
00063 FemVisitor::visitTbl.accept(*this,cmd);
00064 return resultHandle;
00065 }
00066
00067 CmdInterpreter::DbHandle *CmdInterpreter::getDbHandle(
00068 SharedProxyDbHandle pHandle)
00069 {
00070 return reinterpret_cast<DbHandle *>(pHandle->getLongHandle());
00071 }
00072
00073 CmdInterpreter::TxnHandle *CmdInterpreter::getTxnHandle(
00074 SharedProxyTxnHandle pHandle)
00075 {
00076 return reinterpret_cast<TxnHandle *>(pHandle->getLongHandle());
00077 }
00078
00079 CmdInterpreter::StreamGraphHandle *CmdInterpreter::getStreamGraphHandle(
00080 SharedProxyStreamGraphHandle pHandle)
00081 {
00082 return reinterpret_cast<StreamGraphHandle *>(pHandle->getLongHandle());
00083 }
00084
00085 SavepointId CmdInterpreter::getSavepointId(SharedProxySvptHandle pHandle)
00086 {
00087 return SavepointId(pHandle->getLongHandle());
00088 }
00089
00090 TxnId CmdInterpreter::getCsn(SharedProxyCsnHandle pHandle)
00091 {
00092 return TxnId(pHandle->getLongHandle());
00093 }
00094
00095 void CmdInterpreter::setDbHandle(
00096 SharedProxyDbHandle,DbHandle *pHandle)
00097 {
00098 resultHandle = reinterpret_cast<int64_t>(pHandle);
00099 }
00100
00101 void CmdInterpreter::setTxnHandle(
00102 SharedProxyTxnHandle,TxnHandle *pHandle)
00103 {
00104 resultHandle = reinterpret_cast<int64_t>(pHandle);
00105 }
00106
00107 void CmdInterpreter::setStreamGraphHandle(
00108 SharedProxyStreamGraphHandle,StreamGraphHandle *pHandle)
00109 {
00110 resultHandle = reinterpret_cast<int64_t>(pHandle);
00111 }
00112
00113 void CmdInterpreter::setExecStreamHandle(
00114 SharedProxyStreamHandle,ExecStream *pStream)
00115 {
00116 resultHandle = reinterpret_cast<int64_t>(pStream);
00117 }
00118
00119 void CmdInterpreter::setSvptHandle(
00120 SharedProxySvptHandle,SavepointId svptId)
00121 {
00122 resultHandle = opaqueToInt(svptId);
00123 }
00124
00125 void CmdInterpreter::setCsnHandle(
00126 SharedProxyCsnHandle, TxnId csnId)
00127 {
00128 resultHandle = opaqueToInt(csnId);
00129 }
00130
00131 CmdInterpreter::DbHandle* CmdInterpreter::newDbHandle()
00132 {
00133 return new DbHandle();
00134 }
00135
00136 CmdInterpreter::TxnHandle* CmdInterpreter::newTxnHandle()
00137 {
00138 return new TxnHandle();
00139 }
00140
00141 CmdInterpreter::DbHandle::~DbHandle()
00142 {
00143 statsTimer.stop();
00144
00145
00146 if (pDb) {
00147 pDb->close();
00148 }
00149 JniUtil::decrementHandleCount(DBHANDLE_TRACE_TYPE_STR, this);
00150
00151 JniUtil::shutdown();
00152 }
00153
00154 CmdInterpreter::TxnHandle::~TxnHandle()
00155 {
00156 JniUtil::decrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, this);
00157 }
00158
00159 CmdInterpreter::StreamGraphHandle::~StreamGraphHandle()
00160 {
00161 if (javaRuntimeContext) {
00162 JniEnvAutoRef pEnv;
00163 pEnv->DeleteGlobalRef(javaRuntimeContext);
00164 }
00165 JniUtil::decrementHandleCount(STREAMGRAPHHANDLE_TRACE_TYPE_STR, this);
00166 }
00167
00168 JavaTraceTarget *CmdInterpreter::newTraceTarget()
00169 {
00170 return new JavaTraceTarget();
00171 }
00172
00173 SharedErrorTarget CmdInterpreter::newErrorTarget(
00174 jobject fennelJavaErrorTarget)
00175 {
00176 SharedErrorTarget errorTarget;
00177 errorTarget.reset(new JavaErrorTarget(fennelJavaErrorTarget));
00178 return errorTarget;
00179 }
00180
00181 void CmdInterpreter::visit(ProxyCmdOpenDatabase &cmd)
00182 {
00183 ConfigMap configMap;
00184
00185 SharedProxyDatabaseParam pParam = cmd.getParams();
00186 for (; pParam; ++pParam) {
00187 configMap.setStringParam(pParam->getName(),pParam->getValue());
00188 }
00189
00190 CacheParams cacheParams;
00191 cacheParams.readConfig(configMap);
00192 SharedCache pCache = Cache::newCache(cacheParams);
00193
00194 JniUtilParams jniUtilParams;
00195 jniUtilParams.readConfig(configMap);
00196 JniUtil::configure(jniUtilParams);
00197
00198 DeviceMode openMode = cmd.isCreateDatabase()
00199 ? DeviceMode::createNew
00200 : DeviceMode::load;
00201
00202 std::auto_ptr<DbHandle> pDbHandle(newDbHandle());
00203 JniUtil::incrementHandleCount(DBHANDLE_TRACE_TYPE_STR, pDbHandle.get());
00204
00205 JavaTraceTarget *pJavaTraceTarget = newTraceTarget();
00206 pDbHandle->pTraceTarget.reset(pJavaTraceTarget);
00207
00208 AutoBacktrace::setTraceTarget(pDbHandle->pTraceTarget);
00209
00210 SharedDatabase pDb;
00211 try {
00212 pDb = Database::newDatabase(
00213 pCache,
00214 configMap,
00215 openMode,
00216 pDbHandle->pTraceTarget,
00217 SharedPseudoUuidGenerator(new JniPseudoUuidGenerator()));
00218 } catch (...) {
00219 AutoBacktrace::setTraceTarget();
00220 throw;
00221 }
00222
00223 pDbHandle->pDb = pDb;
00224
00225 ExecStreamResourceKnobs knobSettings;
00226 knobSettings.cacheReservePercentage =
00227 configMap.getIntParam("cacheReservePercentage");
00228 knobSettings.expectedConcurrentStatements =
00229 configMap.getIntParam("expectedConcurrentStatements");
00230
00231 ExecStreamResourceQuantity resourcesAvailable;
00232 resourcesAvailable.nCachePages = pCache->getMaxLockedPages();
00233
00234 pDbHandle->pResourceGovernor =
00235 SharedExecStreamGovernor(
00236 new SimpleExecStreamGovernor(
00237 knobSettings, resourcesAvailable,
00238 pDbHandle->pTraceTarget,
00239 "xo.resourceGovernor"));
00240
00241 if (pDb->isRecoveryRequired()) {
00242 SegmentAccessor scratchAccessor =
00243 pDb->getSegmentFactory()->newScratchSegment(pDb->getCache());
00244 FtrsTableWriterFactory recoveryFactory(
00245 pDb,
00246 pDb->getCache(),
00247 pDb->getTypeFactory(),
00248 scratchAccessor);
00249 pDb->recover(recoveryFactory);
00250 cmd.setResultRecoveryRequired(true);
00251 } else {
00252 cmd.setResultRecoveryRequired(false);
00253 }
00254 pDbHandle->statsTimer.setTarget(*pJavaTraceTarget);
00255 pDbHandle->statsTimer.addSource(pDb);
00256 pDbHandle->statsTimer.addSource(pDbHandle->pResourceGovernor);
00257 pDbHandle->statsTimer.start();
00258
00259
00260
00261 if (pCache->getMaxAllocatedPageCount() != cacheParams.nMemPagesMax ||
00262 pCache->getAllocatedPageCount() != cacheParams.nMemPagesInit)
00263 {
00264 FENNEL_DELEGATE_TRACE(
00265 TRACE_WARNING,
00266 pDb,
00267 "Unable to allocate "
00268 << cacheParams.nMemPagesInit
00269 << " (of "
00270 << cacheParams.nMemPagesMax
00271 << " max) cache pages; allocated "
00272 << pCache->getAllocatedPageCount()
00273 << " cache pages.");
00274 }
00275
00276 setDbHandle(cmd.getResultHandle(),pDbHandle.release());
00277 }
00278
00279 void CmdInterpreter::visit(ProxyCmdCloseDatabase &cmd)
00280 {
00281 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00282 pDbHandle->pResourceGovernor.reset();
00283 AutoBacktrace::setTraceTarget();
00284 deleteAndNullify(pDbHandle);
00285 }
00286
00287 void CmdInterpreter::visit(ProxyCmdCheckpoint &cmd)
00288 {
00289 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00290
00291 pDbHandle->pDb->requestCheckpoint(
00292 cmd.isFuzzy() ? CHECKPOINT_FLUSH_FUZZY : CHECKPOINT_FLUSH_ALL,
00293 cmd.isAsync());
00294 }
00295
00296 void CmdInterpreter::visit(ProxyCmdSetParam &cmd)
00297 {
00298 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00299 SharedProxyDatabaseParam pParam = cmd.getParam();
00300
00301 std::string paramName = pParam->getName();
00302
00303 if (paramName.compare("cachePagesInit") == 0) {
00304 int pageCount = boost::lexical_cast<int>(pParam->getValue());
00305 SharedCache pCache = pDbHandle->pDb->getCache();
00306 if (pageCount <= 0 || pageCount > pCache->getMaxAllocatedPageCount()) {
00307 throw InvalidParamExcn("1", "'cachePagesMax'");
00308 }
00309
00310 bool decreasingPageCount = pageCount < pCache->getAllocatedPageCount();
00311 if (decreasingPageCount) {
00312
00313 ExecStreamResourceQuantity available;
00314 available.nCachePages = pageCount;
00315 if (!pDbHandle->pResourceGovernor->setResourceAvailability(
00316 available, EXEC_RESOURCE_CACHE_PAGES))
00317 {
00318 throw InvalidParamExcn(
00319 "the number of pages currently assigned (plus reserve)",
00320 "'cachePagesMax'");
00321 }
00322 }
00323
00324 pCache->setAllocatedPageCount(pageCount);
00325
00326 if (!decreasingPageCount) {
00327
00328 ExecStreamResourceQuantity available;
00329 available.nCachePages = pageCount;
00330 bool result =
00331 pDbHandle->pResourceGovernor->setResourceAvailability(
00332 available, EXEC_RESOURCE_CACHE_PAGES);
00333 assert(result);
00334 }
00335 } else if (paramName.compare("expectedConcurrentStatements") == 0) {
00336 int nStatements = boost::lexical_cast<int>(pParam->getValue());
00337 SharedCache pCache = pDbHandle->pDb->getCache();
00338
00339 if (nStatements <= 0 ||
00340 nStatements > pCache->getMaxLockedPages() / 5)
00341 {
00342 throw InvalidParamExcn("1", "'cachePagesInit/5'");
00343 }
00344 ExecStreamResourceKnobs knob;
00345 knob.expectedConcurrentStatements = nStatements;
00346 pDbHandle->pResourceGovernor->setResourceKnob(
00347 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00348
00349 } else if (paramName.compare("cacheReservePercentage") == 0) {
00350 int percent = boost::lexical_cast<int>(pParam->getValue());
00351 if (percent <= 0 || percent > 99) {
00352 throw InvalidParamExcn("1", "99");
00353 }
00354 ExecStreamResourceKnobs knob;
00355 knob.cacheReservePercentage = percent;
00356 if (!pDbHandle->pResourceGovernor->setResourceKnob(
00357 knob, EXEC_KNOB_CACHE_RESERVE_PERCENTAGE))
00358 {
00359 throw InvalidParamExcn(
00360 "1",
00361 "a percentage that sets aside fewer pages, to allow for pages already assigned");
00362 }
00363 }
00364 }
00365
00366 void CmdInterpreter::getBTreeForIndexCmd(
00367 ProxyIndexCmd &cmd,PageId rootPageId,BTreeDescriptor &treeDescriptor)
00368 {
00369 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00370
00371 readTupleDescriptor(
00372 treeDescriptor.tupleDescriptor,
00373 *(cmd.getTupleDesc()),pTxnHandle->pDb->getTypeFactory());
00374
00375 CmdInterpreter::readTupleProjection(
00376 treeDescriptor.keyProjection,cmd.getKeyProj());
00377
00378 treeDescriptor.pageOwnerId = PageOwnerId(cmd.getIndexId());
00379 treeDescriptor.segmentId = SegmentId(cmd.getSegmentId());
00380 treeDescriptor.segmentAccessor.pSegment =
00381 pTxnHandle->pDb->getSegmentById(
00382 treeDescriptor.segmentId,
00383 pTxnHandle->pSnapshotSegment);
00384 treeDescriptor.segmentAccessor.pCacheAccessor = pTxnHandle->pDb->getCache();
00385 treeDescriptor.rootPageId = rootPageId;
00386 }
00387
00388 void CmdInterpreter::visit(ProxyCmdCreateIndex &cmd)
00389 {
00390
00391 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00392 SXMutexSharedGuard actionMutexGuard(
00393 pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00394
00395 BTreeDescriptor treeDescriptor;
00396 getBTreeForIndexCmd(cmd,NULL_PAGE_ID,treeDescriptor);
00397 BTreeBuilder builder(treeDescriptor);
00398 builder.createEmptyRoot();
00399 resultHandle = opaqueToInt(builder.getRootPageId());
00400 }
00401
00402 void CmdInterpreter::visit(ProxyCmdTruncateIndex &cmd)
00403 {
00404 dropOrTruncateIndex(cmd, false);
00405 }
00406
00407 void CmdInterpreter::visit(ProxyCmdDropIndex &cmd)
00408 {
00409 dropOrTruncateIndex(cmd, true);
00410 }
00411
00412 void CmdInterpreter::visit(ProxyCmdVerifyIndex &cmd)
00413 {
00414
00415 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00416 SXMutexSharedGuard actionMutexGuard(
00417 pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00418
00419 BTreeDescriptor treeDescriptor;
00420 getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor);
00421 TupleProjection leafPageIdProj;
00422 if (cmd.getLeafPageIdProj()) {
00423 CmdInterpreter::readTupleProjection(
00424 leafPageIdProj, cmd.getLeafPageIdProj());
00425 }
00426 bool estimate = cmd.isEstimate();
00427 bool includeTuples = cmd.isIncludeTuples();
00428 bool keys = (!estimate);
00429 bool leaf = ((!estimate) || includeTuples);
00430 BTreeVerifier verifier(treeDescriptor);
00431 verifier.verify(true, keys, leaf);
00432 BTreeStatistics statistics = verifier.getStatistics();
00433 long pageCount = statistics.nNonLeafNodes + statistics.nLeafNodes;
00434 if (includeTuples) {
00435 pageCount += statistics.nTuples;
00436 }
00437 cmd.setResultPageCount(pageCount);
00438
00439 if (keys) {
00440 cmd.setResultUniqueKeyCount(statistics.nUniqueKeys);
00441 } else {
00442 cmd.clearResultUniqueKeyCount();
00443 }
00444 }
00445
00446 void CmdInterpreter::dropOrTruncateIndex(
00447 ProxyCmdDropIndex &cmd, bool drop)
00448 {
00449
00450 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00451 SXMutexSharedGuard actionMutexGuard(
00452 pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00453
00454 BTreeDescriptor treeDescriptor;
00455 getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor);
00456 TupleProjection leafPageIdProj;
00457 if (cmd.getLeafPageIdProj()) {
00458 CmdInterpreter::readTupleProjection(
00459 leafPageIdProj, cmd.getLeafPageIdProj());
00460 }
00461 BTreeBuilder builder(treeDescriptor);
00462 builder.truncate(drop, leafPageIdProj.size() ? &leafPageIdProj : NULL);
00463 }
00464
00465 void CmdInterpreter::visit(ProxyCmdBeginTxn &cmd)
00466 {
00467 beginTxn(cmd, cmd.isReadOnly(), NULL_TXN_ID);
00468 }
00469
00470 void CmdInterpreter::beginTxn(ProxyBeginTxnCmd &cmd, bool readOnly, TxnId csn)
00471 {
00472 assert(readOnly || csn == NULL_TXN_ID);
00473
00474
00475 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00476 SharedDatabase pDb = pDbHandle->pDb;
00477
00478 SXMutexSharedGuard actionMutexGuard(
00479 pDb->getCheckpointThread()->getActionMutex());
00480
00481 std::auto_ptr<TxnHandle> pTxnHandle(newTxnHandle());
00482 JniUtil::incrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, pTxnHandle.get());
00483 pTxnHandle->pDb = pDb;
00484 pTxnHandle->readOnly = readOnly;
00485
00486 pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache());
00487 pTxnHandle->pResourceGovernor = pDbHandle->pResourceGovernor;
00488
00489
00490
00491 SegmentAccessor scratchAccessor;
00492
00493 pTxnHandle->pFtrsTableWriterFactory = SharedFtrsTableWriterFactory(
00494 new FtrsTableWriterFactory(
00495 pDb,
00496 pDb->getCache(),
00497 pDb->getTypeFactory(),
00498 scratchAccessor));
00499
00500
00501
00502
00503
00504 if (pDb->areSnapshotsEnabled()) {
00505 if (csn == NULL_TXN_ID) {
00506 csn = pTxnHandle->pTxn->getTxnId();
00507 }
00508 pTxnHandle->pSnapshotSegment =
00509 pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment(
00510 pDb->getDataSegment(),
00511 pDb->getDataSegment(),
00512 csn,
00513 false);
00514 pTxnHandle->pReadCommittedSnapshotSegment =
00515 pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment(
00516 pDb->getDataSegment(),
00517 pDb->getDataSegment(),
00518 csn,
00519 true);
00520 } else {
00521 assert(csn == NULL_TXN_ID);
00522 }
00523
00524 setTxnHandle(cmd.getResultHandle(),pTxnHandle.release());
00525 }
00526
00527 void CmdInterpreter::visit(ProxyCmdBeginTxnWithCsn &cmd)
00528 {
00529 beginTxn(cmd, true, getCsn(cmd.getCsnHandle()));
00530 }
00531
00532 void CmdInterpreter::visit(ProxyCmdSavepoint &cmd)
00533 {
00534 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00535
00536
00537 SXMutexSharedGuard actionMutexGuard(
00538 pTxnHandle->pDb->getCheckpointThread()->getActionMutex());
00539
00540 setSvptHandle(
00541 cmd.getResultHandle(),
00542 pTxnHandle->pTxn->createSavepoint());
00543 }
00544
00545 void CmdInterpreter::visit(ProxyCmdCommit &cmd)
00546 {
00547 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00548 SharedDatabase pDb = pTxnHandle->pDb;
00549
00550
00551 bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns();
00552 SXMutexSharedGuard actionMutexGuard(
00553 pDb->getCheckpointThread()->getActionMutex());
00554
00555 if (pDb->areSnapshotsEnabled()) {
00556
00557
00558
00559
00560 pTxnHandle->pTxn->commit();
00561 pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache());
00562 SnapshotRandomAllocationSegment *pSnapshotSegment =
00563 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00564 pTxnHandle->pSnapshotSegment);
00565 TxnId commitTxnId = pTxnHandle->pTxn->getTxnId();
00566 pSnapshotSegment->commitChanges(commitTxnId);
00567
00568
00569
00570
00571
00572
00573
00574
00575 if (txnBlocksCheckpoint) {
00576 pTxnHandle->pSnapshotSegment->checkpoint(CHECKPOINT_FLUSH_ALL);
00577 }
00578 }
00579
00580 if (cmd.getSvptHandle()) {
00581 SavepointId svptId = getSavepointId(cmd.getSvptHandle());
00582 pTxnHandle->pTxn->commitSavepoint(svptId);
00583 } else {
00584 pTxnHandle->pTxn->commit();
00585 deleteAndNullify(pTxnHandle);
00586 if (txnBlocksCheckpoint) {
00587
00588 actionMutexGuard.unlock();
00589
00590
00591
00592 pDb->requestCheckpoint(CHECKPOINT_FLUSH_ALL, false);
00593 }
00594 }
00595 }
00596
00597 void CmdInterpreter::visit(ProxyCmdRollback &cmd)
00598 {
00599 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00600 SharedDatabase pDb = pTxnHandle->pDb;
00601
00602
00603 bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns();
00604 SXMutexSharedGuard actionMutexGuard(
00605 pDb->getCheckpointThread()->getActionMutex());
00606
00607 if (pDb->areSnapshotsEnabled()) {
00608 SnapshotRandomAllocationSegment *pSegment =
00609 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00610 pTxnHandle->pSnapshotSegment);
00611 pSegment->rollbackChanges();
00612 }
00613
00614 if (cmd.getSvptHandle()) {
00615 SavepointId svptId = getSavepointId(cmd.getSvptHandle());
00616 pTxnHandle->pTxn->rollback(&svptId);
00617 } else {
00618 pTxnHandle->pTxn->rollback();
00619 deleteAndNullify(pTxnHandle);
00620 if (txnBlocksCheckpoint && !pDb->areSnapshotsEnabled()) {
00621
00622
00623
00624
00625 pDb->recoverOnline();
00626 }
00627 }
00628 }
00629
00630 void CmdInterpreter::visit(ProxyCmdGetTxnCsn &cmd)
00631 {
00632 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00633 SharedDatabase pDb = pTxnHandle->pDb;
00634 assert(pDb->areSnapshotsEnabled());
00635 SnapshotRandomAllocationSegment *pSegment =
00636 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00637 pTxnHandle->pSnapshotSegment);
00638 setCsnHandle(cmd.getResultHandle(), pSegment->getSnapshotCsn());
00639 }
00640
00641 void CmdInterpreter::visit(ProxyCmdGetLastCommittedTxnId &cmd)
00642 {
00643 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00644 SharedDatabase pDb = pDbHandle->pDb;
00645 setCsnHandle(cmd.getResultHandle(), pDb->getLastCommittedTxnId());
00646 }
00647
00648 void CmdInterpreter::visit(ProxyCmdCreateExecutionStreamGraph &cmd)
00649 {
00650 #if 0
00651 struct mallinfo minfo = mallinfo();
00652 std::cout << "Number of allocated bytes before stream graph construction = "
00653 << minfo.uordblks << " bytes" << std::endl;
00654 #endif
00655 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00656 SharedDatabase pDb = pTxnHandle->pDb;
00657 SharedExecStreamGraph pGraph =
00658 ExecStreamGraph::newExecStreamGraph();
00659 pGraph->setTxn(pTxnHandle->pTxn);
00660 pGraph->setResourceGovernor(pTxnHandle->pResourceGovernor);
00661 std::auto_ptr<StreamGraphHandle> pStreamGraphHandle(
00662 new StreamGraphHandle());
00663 JniUtil::incrementHandleCount(
00664 STREAMGRAPHHANDLE_TRACE_TYPE_STR, pStreamGraphHandle.get());
00665 pStreamGraphHandle->javaRuntimeContext = NULL;
00666 pStreamGraphHandle->pTxnHandle = pTxnHandle;
00667 pStreamGraphHandle->pExecStreamGraph = pGraph;
00668 pStreamGraphHandle->pExecStreamFactory.reset(
00669 new ExecStreamFactory(
00670 pDb,
00671 pTxnHandle->pFtrsTableWriterFactory,
00672 pStreamGraphHandle.get()));
00673
00674
00675
00676
00677 if (pDb->areSnapshotsEnabled()) {
00678 pStreamGraphHandle->pSegment =
00679 pDb->getSegmentFactory()->newDynamicDelegatingSegment(
00680 pTxnHandle->pSnapshotSegment);
00681 pStreamGraphHandle->pReadCommittedSegment =
00682 pDb->getSegmentFactory()->newDynamicDelegatingSegment(
00683 pTxnHandle->pReadCommittedSnapshotSegment);
00684 }
00685 setStreamGraphHandle(
00686 cmd.getResultHandle(),
00687 pStreamGraphHandle.release());
00688 }
00689
00690 void CmdInterpreter::visit(ProxyCmdPrepareExecutionStreamGraph &cmd)
00691 {
00692 StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle(
00693 cmd.getStreamGraphHandle());
00694 TxnHandle *pTxnHandle = pStreamGraphHandle->pTxnHandle;
00695
00696 SharedExecStreamScheduler pScheduler;
00697 std::string schedulerName = "xo.scheduler";
00698 if (cmd.getDegreeOfParallelism() == 1) {
00699 pScheduler.reset(
00700 new DfsTreeExecStreamScheduler(
00701 pTxnHandle->pDb->getSharedTraceTarget(),
00702 schedulerName));
00703 } else {
00704 pScheduler.reset(
00705 new ParallelExecStreamScheduler(
00706 pTxnHandle->pDb->getSharedTraceTarget(),
00707 schedulerName,
00708 JniUtil::getThreadTracker(),
00709 cmd.getDegreeOfParallelism()));
00710 }
00711 ExecStreamGraphEmbryo graphEmbryo(
00712 pStreamGraphHandle->pExecStreamGraph,
00713 pScheduler,
00714 pTxnHandle->pDb->getCache(),
00715 pTxnHandle->pDb->getSegmentFactory());
00716 pStreamGraphHandle->pExecStreamFactory->setGraphEmbryo(graphEmbryo);
00717 ExecStreamBuilder streamBuilder(
00718 graphEmbryo,
00719 *(pStreamGraphHandle->pExecStreamFactory));
00720 streamBuilder.buildStreamGraph(cmd, true);
00721 pStreamGraphHandle->pExecStreamFactory.reset();
00722 pStreamGraphHandle->pScheduler = pScheduler;
00723 #if 0
00724 struct mallinfo minfo = mallinfo();
00725 std::cout << "Number of allocated bytes after stream graph construction = "
00726 << minfo.uordblks << " bytes" << std::endl;
00727 #endif
00728 }
00729
00730 void CmdInterpreter::visit(ProxyCmdCreateStreamHandle &cmd)
00731 {
00732 StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle(
00733 cmd.getStreamGraphHandle());
00734 SharedExecStream pStream;
00735 if (cmd.isInput()) {
00736 pStream =
00737 pStreamGraphHandle->pExecStreamGraph->findLastStream(
00738 cmd.getStreamName(), 0);
00739 } else {
00740 pStream =
00741 pStreamGraphHandle->pExecStreamGraph->findStream(
00742 cmd.getStreamName());
00743 }
00744
00745 setExecStreamHandle(
00746 cmd.getResultHandle(),
00747 pStream.get());
00748 }
00749
00750 PageId CmdInterpreter::StreamGraphHandle::getRoot(PageOwnerId pageOwnerId)
00751 {
00752 JniEnvAutoRef pEnv;
00753 jlong x = opaqueToInt(pageOwnerId);
00754 x = pEnv->CallLongMethod(
00755 javaRuntimeContext,JniUtil::methGetIndexRoot,x);
00756 return PageId(x);
00757 }
00758
00759 void CmdInterpreter::readTupleDescriptor(
00760 TupleDescriptor &tupleDesc,
00761 ProxyTupleDescriptor &javaTupleDesc,
00762 StoredTypeDescriptorFactory const &typeFactory)
00763 {
00764 tupleDesc.clear();
00765 SharedProxyTupleAttrDescriptor pAttr = javaTupleDesc.getAttrDescriptor();
00766 for (; pAttr; ++pAttr) {
00767 StoredTypeDescriptor const &typeDescriptor =
00768 typeFactory.newDataType(pAttr->getTypeOrdinal());
00769 tupleDesc.push_back(
00770 TupleAttributeDescriptor(
00771 typeDescriptor,pAttr->isNullable(),pAttr->getByteLength()));
00772 }
00773 }
00774
00775 void CmdInterpreter::readTupleProjection(
00776 TupleProjection &tupleProj,
00777 SharedProxyTupleProjection pJavaTupleProj)
00778 {
00779 tupleProj.clear();
00780 SharedProxyTupleAttrProjection pAttr = pJavaTupleProj->getAttrProjection();
00781 for (; pAttr; ++pAttr) {
00782 tupleProj.push_back(pAttr->getAttributeIndex());
00783 }
00784 }
00785
00786 void CmdInterpreter::visit(ProxyCmdAlterSystemDeallocate &cmd)
00787 {
00788 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00789 SharedDatabase pDb = pDbHandle->pDb;
00790 if (!pDb->areSnapshotsEnabled()) {
00791
00792 return;
00793 } else {
00794 uint64_t paramVal = cmd.getOldestLabelCsn();
00795 TxnId labelCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00796 pDb->deallocateOldPages(labelCsn);
00797 }
00798 }
00799
00800 void CmdInterpreter::visit(ProxyCmdVersionIndexRoot &cmd)
00801 {
00802 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle());
00803 SharedDatabase pDb = pTxnHandle->pDb;
00804 assert(pDb->areSnapshotsEnabled());
00805
00806 SnapshotRandomAllocationSegment *pSnapshotSegment =
00807 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>(
00808 pTxnHandle->pSnapshotSegment);
00809 pSnapshotSegment->versionPage(
00810 PageId(cmd.getOldRootPageId()),
00811 PageId(cmd.getNewRootPageId()));
00812 }
00813
00814 void CmdInterpreter::visit(ProxyCmdInitiateBackup &cmd)
00815 {
00816 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00817 SharedDatabase pDb = pDbHandle->pDb;
00818 uint64_t paramVal = cmd.getLowerBoundCsn();
00819 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00820 FileSize dataDeviceSize;
00821
00822 volatile bool abortFlag = false;
00823 TxnId csn =
00824 pDb->initiateBackup(
00825 cmd.getBackupPathname(),
00826 cmd.isCheckSpaceRequirements(),
00827 FileSize(cmd.getSpacePadding()),
00828 lowerBoundCsn,
00829 cmd.getCompressionProgram(),
00830 dataDeviceSize,
00831 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00832 cmd.setResultDataDeviceSize(dataDeviceSize);
00833 setCsnHandle(cmd.getResultHandle(), csn);
00834 }
00835
00836 void CmdInterpreter::visit(ProxyCmdCompleteBackup &cmd)
00837 {
00838 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00839 SharedDatabase pDb = pDbHandle->pDb;
00840 uint64_t paramVal = cmd.getLowerBoundCsn();
00841 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00842 volatile bool abortFlag = false;
00843 pDb->completeBackup(
00844 lowerBoundCsn,
00845 TxnId(cmd.getUpperBoundCsn()),
00846 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00847 }
00848
00849 void CmdInterpreter::visit(ProxyCmdAbandonBackup &cmd)
00850 {
00851 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00852 SharedDatabase pDb = pDbHandle->pDb;
00853 pDb->abortBackup();
00854 }
00855
00856 void CmdInterpreter::visit(ProxyCmdRestoreFromBackup &cmd)
00857 {
00858 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle());
00859 SharedDatabase pDb = pDbHandle->pDb;
00860 uint64_t paramVal = cmd.getLowerBoundCsn();
00861 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal);
00862 volatile bool abortFlag = false;
00863 pDb->restoreFromBackup(
00864 cmd.getBackupPathname(),
00865 cmd.getFileSize(),
00866 cmd.getCompressionProgram(),
00867 lowerBoundCsn,
00868 TxnId(cmd.getUpperBoundCsn()),
00869 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted);
00870 }
00871
00872 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $");
00873
00874