FtrsTableWriter.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/ftrs/FtrsTableWriter.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2003-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/ftrs/FtrsTableWriter.h"
00026 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00027 #include "fennel/btree/BTreeWriter.h"
00028 #include "fennel/tuple/TupleDescriptor.h"
00029 #include "fennel/txn/LogicalTxn.h"
00030 #include "fennel/common/ByteOutputStream.h"
00031 #include "fennel/common/ByteInputStream.h"
00032 #include "fennel/synch/SXMutex.h"
00033 #include "fennel/exec/ExecStreamBufAccessor.h"
00034 
00035 #include <boost/bind.hpp>
00036 #include <numeric>
00037 
00038 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriter.cpp#11 $");
00039 
00040 const LogicalActionType FtrsTableWriter::ACTION_INSERT = 1;
00041 
00042 const LogicalActionType FtrsTableWriter::ACTION_DELETE = 2;
00043 
00044 const LogicalActionType FtrsTableWriter::ACTION_UPDATE = 3;
00045 
00046 const LogicalActionType FtrsTableWriter::ACTION_REVERSE_UPDATE = 4;
00047 
00048 FtrsTableWriter::FtrsTableWriter(FtrsTableWriterParams const &params)
00049 {
00050     updateProj = params.updateProj;
00051     pClusteredIndexWriter = NULL;
00052     indexWriters.resize(params.indexParams.size());
00053     std::transform(
00054         indexWriters.begin(),
00055         indexWriters.end(),
00056         params.indexParams.begin(),
00057         indexWriters.begin(),
00058         boost::bind(&FtrsTableWriter::createIndexWriter,this,_1,_2));
00059     assert(pClusteredIndexWriter);
00060 
00061     pTupleData = &(pClusteredIndexWriter->tupleData);
00062 
00063     if (!updateProj.empty()) {
00064         TupleDescriptor tupleDesc =
00065             pClusteredIndexWriter->pWriter->getTupleDescriptor();
00066         for (uint i = 0; i < updateProj.size(); ++i) {
00067             tupleDesc.push_back(tupleDesc[updateProj[i]]);
00068         }
00069         tupleAccessor.compute(tupleDesc);
00070         updateTupleData.compute(tupleDesc);
00071         pTupleData = &updateTupleData;
00072     }
00073 
00074     nAttrs = pClusteredIndexWriter->tupleData.size();
00075 
00076     logBuf.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00077 }
00078 
00079 FtrsTableIndexWriter &FtrsTableWriter::createIndexWriter(
00080     FtrsTableIndexWriter &indexWriter,
00081     FtrsTableIndexWriterParams const &indexParams)
00082 {
00083     indexWriter.pWriter = BTreeExecStream::newWriter(indexParams);
00084     indexWriter.distinctness = indexParams.distinctness;
00085     indexWriter.updateInPlace = indexParams.updateInPlace;
00086     indexWriter.inputProj = indexParams.inputProj;
00087     indexWriter.pRootMap = indexParams.pRootMap;
00088     if (indexWriter.inputProj.size()) {
00089         indexWriter.tupleData.compute(
00090             indexWriter.pWriter->getTupleDescriptor());
00091         // TODO:  TupleProjection folding util
00092         TupleProjection const &keyProj =
00093             indexWriter.pWriter->getKeyProjection();
00094         for (uint i = 0; i < keyProj.size(); ++i) {
00095             indexWriter.inputKeyProj.push_back(
00096                 indexWriter.inputProj[keyProj[i]]);
00097         }
00098     } else {
00099         // TODO:  tuple format?
00100 
00101         // this is the clustered index:  its tuple will drive the other indexes
00102         TupleDescriptor const &clusteredTupleDesc =
00103             indexWriter.pWriter->getTupleDescriptor();
00104         tupleAccessor.compute(clusteredTupleDesc);
00105         indexWriter.tupleData.compute(clusteredTupleDesc);
00106         assert(!pClusteredIndexWriter);
00107         pClusteredIndexWriter = &indexWriter;
00108 
00109         indexWriter.inputKeyProj =
00110             indexWriter.pWriter->getKeyProjection();
00111     }
00112     return indexWriter;
00113 }
00114 
00115 inline bool FtrsTableWriter::searchForIndexKey(
00116     FtrsTableIndexWriter &indexWriter)
00117 {
00118     TupleData &keyData = indexWriter.pWriter->getSearchKeyForWrite();
00119     for (uint i = 0; i < indexWriter.inputKeyProj.size(); ++i) {
00120         keyData[i] =
00121             pClusteredIndexWriter->tupleData[indexWriter.inputKeyProj[i]];
00122     }
00123     return indexWriter.pWriter->searchForKey(keyData,DUP_SEEK_ANY);
00124 }
00125 
00126 inline void FtrsTableWriter::insertIntoIndex(
00127     FtrsTableIndexWriter &indexWriter)
00128 {
00129     for (uint i = 0; i < indexWriter.inputProj.size(); ++i) {
00130         indexWriter.tupleData[i] =
00131             pClusteredIndexWriter->tupleData[indexWriter.inputProj[i]];
00132     }
00133     if (indexWriter.updateInPlace) {
00134         if (searchForIndexKey(indexWriter)) {
00135             if (indexWriter.pWriter->updateCurrent(indexWriter.tupleData)) {
00136                 indexWriter.pWriter->endSearch();
00137                 return;
00138             }
00139             // couldn't update in place:  treat as a deletion+insertion instead
00140             indexWriter.pWriter->deleteCurrent();
00141 
00142             // REVIEW jvs 26-Jun-2007:  doesn't insertTupleData below
00143             // assume that the writer is still positioned?
00144             // So why do we call endSearch here?
00145             indexWriter.pWriter->endSearch();
00146         } else {
00147             // REVIEW:  can this happen?  If so, should we insert?
00148             permAssert(false);
00149         }
00150     }
00151     indexWriter.pWriter->insertTupleData(
00152         indexWriter.tupleData,
00153         indexWriter.distinctness);
00154 }
00155 
00156 inline void FtrsTableWriter::deleteFromIndex(
00157     FtrsTableIndexWriter &indexWriter)
00158 {
00159     if (searchForIndexKey(indexWriter)) {
00160         // REVIEW:  under what circumstances can we assert when the key doesn't
00161         // exist?
00162         indexWriter.pWriter->deleteCurrent();
00163     }
00164     indexWriter.pWriter->endSearch();
00165 }
00166 
00167 inline void FtrsTableWriter::modifySomeIndexes(
00168     LogicalActionType actionType,
00169     IndexWriterVector::iterator &first,
00170     IndexWriterVector::iterator last)
00171 {
00172     switch (actionType) {
00173     case ACTION_INSERT:
00174         for (; first != last; ++first) {
00175             insertIntoIndex(*first);
00176         }
00177         break;
00178     case ACTION_DELETE:
00179         for (; first != last; ++first) {
00180             if (!first->updateInPlace) {
00181                 deleteFromIndex(*first);
00182             }
00183         }
00184         break;
00185     default:
00186         permAssert(false);
00187     }
00188 }
00189 
00190 inline void FtrsTableWriter::modifyAllIndexes(LogicalActionType actionType)
00191 {
00192     IndexWriterVector::iterator first = indexWriters.begin();
00193     IndexWriterVector::iterator current = first;
00194     try {
00195         modifySomeIndexes(actionType,current,indexWriters.end());
00196     } catch (...) {
00197         // In case of exception, carefully roll back only those indexes which
00198         // were already modified.
00199         try {
00200             LogicalActionType compensatingActionType =
00201                 (actionType == ACTION_INSERT) ? ACTION_DELETE : ACTION_INSERT;
00202             modifySomeIndexes(compensatingActionType,first,current);
00203         } catch (...) {
00204             // If this rollback fails, don't allow exception to hide original
00205             // exception.  But TODO:  trace.
00206             permAssert(false);
00207         }
00208         throw;
00209     }
00210 }
00211 
00212 inline void FtrsTableWriter::copyNewValues()
00213 {
00214     for (uint i = 0; i < updateProj.size(); ++i) {
00215         pClusteredIndexWriter->tupleData[updateProj[i]] =
00216             (*pTupleData)[nAttrs + i];
00217     }
00218 }
00219 
00220 inline void FtrsTableWriter::copyOldValues()
00221 {
00222     for (uint i = 0; i < updateProj.size(); ++i) {
00223         pClusteredIndexWriter->tupleData[updateProj[i]] =
00224             (*pTupleData)[updateProj[i]];
00225     }
00226 }
00227 
00228 void FtrsTableWriter::executeUpdate(bool reverse)
00229 {
00230     // copy old values to be deleted
00231     std::copy(
00232         pTupleData->begin(),
00233         pTupleData->begin() + nAttrs,
00234         pClusteredIndexWriter->tupleData.begin());
00235 
00236     if (reverse) {
00237         // for reverse, overlay new values instead
00238         copyNewValues();
00239     }
00240 
00241     modifyAllIndexes(ACTION_DELETE);
00242 
00243     if (reverse) {
00244         // overlay old values to be inserted
00245         copyOldValues();
00246     } else {
00247         // overlay new values to be inserted
00248         copyNewValues();
00249     }
00250 
00251     try {
00252         modifyAllIndexes(ACTION_INSERT);
00253     } catch (...) {
00254         // In case of exception while inserting, put back original row.
00255         try {
00256             if (reverse) {
00257                 copyNewValues();
00258             } else {
00259                 copyOldValues();
00260             }
00261             modifyAllIndexes(ACTION_INSERT);
00262         } catch (...) {
00263             // If this rollback fails, don't allow exception to hide original
00264             // exception.  But TODO:  trace.
00265             permAssert(false);
00266         }
00267         throw;
00268     }
00269 }
00270 
00271 inline void FtrsTableWriter::executeTuple(LogicalActionType actionType)
00272 {
00273     switch (actionType) {
00274     case ACTION_INSERT:
00275     case ACTION_DELETE:
00276         modifyAllIndexes(actionType);
00277         break;
00278     case ACTION_UPDATE:
00279         executeUpdate(false);
00280         break;
00281     case ACTION_REVERSE_UPDATE:
00282         executeUpdate(true);
00283         break;
00284     default:
00285         permAssert(false);
00286     }
00287 }
00288 
00289 RecordNum FtrsTableWriter::execute(
00290     ExecStreamQuantum const &quantum,
00291     ExecStreamBufAccessor &bufAccessor,
00292     LogicalActionType actionType,
00293     SXMutex &actionMutex)
00294 {
00295     // TODO:  assert bufAccessors's output tupledesc and format
00296     // match clustered index
00297 
00298     assert(isLoggingEnabled());
00299 
00300     RecordNum nTuples = 0;
00301     // TODO:  bulk logging?
00302 
00303     TupleAccessor &tupleAccessor = bufAccessor.getConsumptionTupleAccessor();
00304 
00305     do {
00306         if (!bufAccessor.demandData()) {
00307             break;
00308         }
00309         bufAccessor.unmarshalTuple(*pTupleData);
00310 
00311         // Block checkpoints for each atomic operation, including
00312         // execution and logging.  REVIEW:  if lock/unlock overhead is too
00313         // high per-action, could do it only every so many.
00314         SXMutexSharedGuard actionMutexGuard(actionMutex);
00315         executeTuple(actionType);
00316         uint cb = tupleAccessor.getCurrentByteCount();
00317         LogicalTxn *pTxn = getLogicalTxn();
00318         ByteOutputStream &logStream =
00319             pTxn->beginLogicalAction(*this,actionType);
00320         logStream.writeBytes(tupleAccessor.getCurrentTupleBuf(),cb);
00321         pTxn->endLogicalAction();
00322         bufAccessor.consumeTuple();
00323         ++nTuples;
00324     } while (nTuples < quantum.nTuplesMax);
00325 
00326     return nTuples;
00327 }
00328 
00329 LogicalTxnClassId FtrsTableWriter::getParticipantClassId() const
00330 {
00331     return FtrsTableWriterFactory::getParticipantClassId();
00332 }
00333 
00334 void FtrsTableWriter::describeParticipant(
00335     ByteOutputStream &logStream)
00336 {
00337     TupleDescriptor const &clusteredTupleDesc =
00338         pClusteredIndexWriter->pWriter->getTupleDescriptor();
00339     clusteredTupleDesc.writePersistent(logStream);
00340     uint nIndexes = indexWriters.size();
00341     logStream.writeValue(nIndexes);
00342     std::for_each(
00343         indexWriters.begin(),
00344         indexWriters.end(),
00345         boost::bind(&FtrsTableWriter::describeIndex,this,_1,&logStream));
00346     updateProj.writePersistent(logStream);
00347 }
00348 
00349 void FtrsTableWriter::describeIndex(
00350     FtrsTableIndexWriter &indexWriter,
00351     ByteOutputStream *pLogStream)
00352 {
00353     pLogStream->writeValue(indexWriter.pWriter->getSegmentId());
00354     pLogStream->writeValue(indexWriter.pWriter->getPageOwnerId());
00355     pLogStream->writeValue(indexWriter.pWriter->getRootPageId());
00356     pLogStream->writeValue(indexWriter.distinctness);
00357     pLogStream->writeValue(indexWriter.updateInPlace);
00358     indexWriter.inputProj.writePersistent(*pLogStream);
00359     indexWriter.pWriter->getKeyProjection().writePersistent(
00360         *pLogStream);
00361 }
00362 
00363 void FtrsTableWriter::undoLogicalAction(
00364     LogicalActionType actionType,
00365     ByteInputStream &logStream)
00366 {
00367     switch (actionType) {
00368     case ACTION_INSERT:
00369         redoLogicalAction(ACTION_DELETE,logStream);
00370         break;
00371     case ACTION_DELETE:
00372         redoLogicalAction(ACTION_INSERT,logStream);
00373         break;
00374     case ACTION_UPDATE:
00375         redoLogicalAction(ACTION_REVERSE_UPDATE,logStream);
00376         break;
00377     default:
00378         permAssert(false);
00379     }
00380 }
00381 
00382 void FtrsTableWriter::redoLogicalAction(
00383     LogicalActionType actionType,
00384     ByteInputStream &logStream)
00385 {
00386     uint cbMin = tupleAccessor.getMinByteCount();
00387     uint cbActual = logStream.readBytes(logBuf.get(), cbMin);
00388     assert(cbMin == cbActual);
00389     tupleAccessor.setCurrentTupleBuf(logBuf.get());
00390     uint cb = tupleAccessor.getCurrentByteCount();
00391     uint cbRemainder = cb - cbMin;
00392     if (cbRemainder) {
00393       cbActual = logStream.readBytes(logBuf.get() + cbMin, cbRemainder);
00394       assert(cbActual == cbRemainder);
00395     }
00396     // TODO:  for delete, only need to unmarshal union of keys
00397     tupleAccessor.unmarshal(*pTupleData);
00398     executeTuple(actionType);
00399 }
00400 
00401 PageOwnerId FtrsTableWriter::getTableId()
00402 {
00403     return pClusteredIndexWriter->pWriter->getPageOwnerId();
00404 }
00405 
00406 uint FtrsTableWriter::getIndexCount() const
00407 {
00408     return indexWriters.size();
00409 }
00410 
00411 void FtrsTableWriter::openIndexWriters()
00412 {
00413     for (uint i = 0; i < indexWriters.size(); ++i) {
00414         FtrsTableIndexWriter &indexWriter = indexWriters[i];
00415         if (!indexWriter.pRootMap) {
00416             continue;
00417         }
00418         PageId rootPageId = indexWriter.pRootMap->getRoot(
00419             indexWriter.pWriter->getPageOwnerId());
00420         indexWriter.pWriter->setRootPageId(rootPageId);
00421     }
00422 }
00423 
00424 void FtrsTableWriter::closeIndexWriters()
00425 {
00426     for (uint i = 0; i < indexWriters.size(); ++i) {
00427         FtrsTableIndexWriter &indexWriter = indexWriters[i];
00428         indexWriter.pWriter->releaseScratchBuffers();
00429         // REVIEW: since this FtrsTableWriter may be reused by rollback, we
00430         // can't fully close it.  But we should find a way to do so at end of
00431         // transaction.
00432 #if 0
00433         if (indexWriter.pRootMap) {
00434             indexWriter.pWriter->setRootPageId(NULL_PAGE_ID);
00435         }
00436 #endif
00437     }
00438 }
00439 
00440 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriter.cpp#11 $");
00441 
00442 // End FtrsTableWriter.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by doxygen 1.5.1