00001 /* 00002 // $Id: //open/dev/fennel/ftrs/FtrsTableWriterExecStream.cpp#12 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 1999-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/ftrs/FtrsTableWriterExecStream.h" 00026 #include "fennel/ftrs/FtrsTableWriterFactory.h" 00027 #include "fennel/txn/LogicalTxn.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 #include "fennel/exec/ExecStreamGraph.h" 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriterExecStream.cpp#12 $"); 00033 00034 FtrsTableWriterExecStream::FtrsTableWriterExecStream() 00035 { 00036 pActionMutex = NULL; 00037 svptId = NULL_SVPT_ID; 00038 } 00039 00040 void FtrsTableWriterExecStream::prepare( 00041 FtrsTableWriterExecStreamParams const ¶ms) 00042 { 00043 ConduitExecStream::prepare(params); 00044 pTableWriter = params.pTableWriterFactory->newTableWriter(params); 00045 actionType = params.actionType; 00046 pActionMutex = params.pActionMutex; 00047 assert(pActionMutex); 00048 00049 TupleAccessor &outputTupleAccessor = 00050 pOutAccessor->getScratchTupleAccessor(); 00051 TupleDescriptor const &outputTupleDesc = pOutAccessor->getTupleDesc(); 00052 outputTuple.compute(outputTupleDesc); 00053 outputTuple[0].pData = reinterpret_cast<PBuffer>(&nTuples); 00054 outputTupleBuffer.reset( 00055 new FixedBuffer[outputTupleAccessor.getMaxByteCount()]); 00056 } 00057 00058 void FtrsTableWriterExecStream::getResourceRequirements( 00059 ExecStreamResourceQuantity &minQuantity, 00060 ExecStreamResourceQuantity &optQuantity) 00061 { 00062 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00063 00064 // REVIEW: update/delete resources 00065 00066 // This is to account for total number of pages needed to perform an 00067 // update on a single index. Pages are only locked for the duration of 00068 // one index update, so they don't need to be charged per index (unless 00069 // we start parallelizing index updates). REVIEW: determine the correct 00070 // number; 4 is just a guess. 00071 minQuantity.nCachePages += 4; 00072 00073 // each BTreeWriter currently needs a private scratch page 00074 minQuantity.nCachePages += pTableWriter->getIndexCount(); 00075 00076 optQuantity = minQuantity; 00077 } 00078 00079 void FtrsTableWriterExecStream::open(bool restart) 00080 { 00081 ConduitExecStream::open(restart); 00082 assert(pTxn); 00083 00084 // REVIEW: close/restart? 00085 00086 // block checkpoints while joining txn 00087 SXMutexSharedGuard actionMutexGuard(*pActionMutex); 00088 pTxn->addParticipant(pTableWriter); 00089 actionMutexGuard.unlock(); 00090 00091 nTuples = 0; 00092 pTableWriter->openIndexWriters(); 00093 isDone = false; 00094 } 00095 00096 ExecStreamResult FtrsTableWriterExecStream::execute( 00097 ExecStreamQuantum const &quantum) 00098 { 00099 if (isDone) { 00100 // already returned final result 00101 pOutAccessor->markEOS(); 00102 return EXECRC_EOS; 00103 } 00104 00105 if (pInAccessor->getState() == EXECBUF_EOS) { 00106 // we've processed all input, so commit what we've written 00107 // and return row count as our output 00108 commitSavepoint(); 00109 00110 // TODO jvs 11-Feb-2006: Other streams (e.g. 00111 // LcsClusterAppendExecStream) need to do something similar, 00112 // so provide some utilities to make it easier. 00113 TupleAccessor &outputTupleAccessor = 00114 pOutAccessor->getScratchTupleAccessor(); 00115 outputTupleAccessor.marshal(outputTuple, outputTupleBuffer.get()); 00116 pOutAccessor->provideBufferForConsumption( 00117 outputTupleBuffer.get(), 00118 outputTupleBuffer.get() 00119 + outputTupleAccessor.getCurrentByteCount()); 00120 isDone = true; 00121 return EXECRC_BUF_OVERFLOW; 00122 } 00123 00124 ExecStreamResult rc = precheckConduitBuffers(); 00125 if (rc != EXECRC_YIELD) { 00126 return rc; 00127 } 00128 00129 if (svptId == NULL_SVPT_ID) { 00130 createSavepoint(); 00131 } 00132 00133 try { 00134 nTuples += pTableWriter->execute( 00135 quantum, *pInAccessor, actionType, *pActionMutex); 00136 } catch (...) { 00137 try { 00138 rollbackSavepoint(); 00139 } catch (...) { 00140 // TODO: trace failed rollback 00141 } 00142 throw; 00143 } 00144 00145 if (!pInAccessor->isConsumptionPossible()) { 00146 return EXECRC_BUF_UNDERFLOW; 00147 } else { 00148 return EXECRC_QUANTUM_EXPIRED; 00149 } 00150 } 00151 00152 ExecStreamBufProvision 00153 FtrsTableWriterExecStream::getOutputBufProvision() const 00154 { 00155 return BUFPROV_PRODUCER; 00156 } 00157 00158 void FtrsTableWriterExecStream::closeImpl() 00159 { 00160 if (svptId != NULL_SVPT_ID) { 00161 rollbackSavepoint(); 00162 } 00163 ConduitExecStream::closeImpl(); 00164 if (pTableWriter) { 00165 pTableWriter->closeIndexWriters(); 00166 } 00167 } 00168 00169 void FtrsTableWriterExecStream::createSavepoint() 00170 { 00171 // block checkpoints while creating savepoint 00172 SXMutexSharedGuard actionMutexGuard(*pActionMutex); 00173 svptId = pTxn->createSavepoint(); 00174 } 00175 00176 void FtrsTableWriterExecStream::commitSavepoint() 00177 { 00178 if (svptId == NULL_SVPT_ID) { 00179 return; 00180 } 00181 00182 SavepointId svptIdCopy = svptId; 00183 svptId = NULL_SVPT_ID; 00184 00185 // block checkpoints while committing savepoint 00186 SXMutexSharedGuard actionMutexGuard(*pActionMutex); 00187 pTxn->commitSavepoint(svptIdCopy); 00188 } 00189 00190 void FtrsTableWriterExecStream::rollbackSavepoint() 00191 { 00192 if (svptId == NULL_SVPT_ID) { 00193 return; 00194 } 00195 00196 SavepointId svptIdCopy = svptId; 00197 svptId = NULL_SVPT_ID; 00198 00199 // block checkpoints while rolling back savepoint 00200 SXMutexSharedGuard actionMutexGuard(*pActionMutex); 00201 pTxn->rollback(&svptIdCopy); 00202 pTxn->commitSavepoint(svptIdCopy); 00203 } 00204 00205 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriterExecStream.cpp#12 $"); 00206 00207 // End FtrsTableWriterExecStream.cpp