LcsClusterReplaceExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterReplaceExecStream.h"
00024 #include "fennel/lucidera/colstore/LcsClusterReader.h"
00025 #include "fennel/segment/SegmentFactory.h"
00026 #include "fennel/btree/BTreeBuilder.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $");
00030 
00031 void LcsClusterReplaceExecStream::prepare(
00032     LcsClusterReplaceExecStreamParams const &params)
00033 {
00034     LcsClusterAppendExecStream::prepare(params);
00035     newClusterRootParamId = params.rootPageIdParamId;
00036 
00037     // Save the original root pageId at prepare time because the treeDescriptor
00038     // will be reset at open time with the new cluster's rootPageId
00039     origRootPageId = treeDescriptor.rootPageId;
00040 }
00041 
00042 void LcsClusterReplaceExecStream::initTupleLoadParams(
00043     const TupleProjection &inputProj)
00044 {
00045     numColumns = inputProj.size() - 1;
00046 
00047     projInputTupleDesc.projectFrom(tableColsTupleDesc, inputProj);
00048     projInputTupleData.compute(projInputTupleDesc);
00049 
00050     // Setup the cluster reader to read all columns from the original cluster
00051     // without any pre-fetch
00052     //
00053     // TODO - Extend this class to use pre-fetches when reading from the
00054     // original cluster.  This will require reading ahead from the input
00055     // stream to detect gaps in the rid values and then setting up rid runs
00056     // for each block of missing rids.
00057     pOrigClusterReader =
00058         SharedLcsClusterReader(new LcsClusterReader(treeDescriptor));
00059     TupleProjection proj;
00060     proj.resize(numColumns);
00061     for (uint i = 0; i < numColumns; i++) {
00062         proj[i] = i;
00063     }
00064     pOrigClusterReader->initColumnReaders(numColumns, proj);
00065 
00066     // Setup the objects for accessing just the cluster columns by excluding
00067     // the rid column
00068     std::copy(inputProj.begin() + 1, inputProj.end(), proj.begin());
00069     TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00070     clusterColsTupleAccessor.bind(inputAccessor, proj);
00071     clusterColsTupleDesc.projectFrom(pInAccessor->getTupleDesc(), proj);
00072     clusterColsTupleData.compute(clusterColsTupleDesc);
00073 
00074     attrAccessors.resize(clusterColsTupleDesc.size());
00075     for (uint i = 0; i < clusterColsTupleDesc.size(); i++) {
00076         attrAccessors[i].compute(clusterColsTupleDesc[i]);
00077     }
00078 
00079     origClusterTupleData.computeAndAllocate(clusterColsTupleDesc);
00080 
00081     // setup one tuple descriptor per cluster column
00082     colTupleDesc.reset(new TupleDescriptor[numColumns]);
00083     for (int i = 0; i < numColumns; i++) {
00084         // +1 to skip over the rid column
00085         colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i + 1]]);
00086     }
00087 }
00088 
00089 void LcsClusterReplaceExecStream::getResourceRequirements(
00090     ExecStreamResourceQuantity &minQuantity,
00091     ExecStreamResourceQuantity &optQuantity)
00092 {
00093     LcsClusterAppendExecStream::getResourceRequirements(
00094         minQuantity,
00095         optQuantity);
00096 
00097     // Need to allocate two more pages for the cluster reader that reads
00098     // original cluster values -- one for the rid to pageId btree and another
00099     // for the actual cluster page.
00100     minQuantity.nCachePages += 2;
00101 
00102     optQuantity = minQuantity;
00103 }
00104 
00105 void LcsClusterReplaceExecStream::open(bool restart)
00106 {
00107     newData = false;
00108 
00109     // Need to call this after the setup above because the cluster append
00110     // stream depends on the new cluster being in place
00111     LcsClusterAppendExecStream::open(restart);
00112 
00113     // Determine how many rows are in the original cluster
00114     origNumRows = pOrigClusterReader->getNumRows();
00115 
00116     if (!restart) {
00117         // Save the root pageId in a dynamic parameter so it can be read
00118         // downstream, if a parameter is specified
00119         if (opaqueToInt(newClusterRootParamId) > 0) {
00120             pDynamicParamManager->createParam(
00121                 newClusterRootParamId,
00122                 pInAccessor->getTupleDesc()[0]);
00123         }
00124 
00125         // Retrieve the snapshot segment.  This needs to be done at open time
00126         // because the segment changes across transaction boundaries.
00127         pSnapshotSegment =
00128             SegmentFactory::getSnapshotSegment(
00129                 treeDescriptor.segmentAccessor.pSegment);
00130         assert(pSnapshotSegment != NULL);
00131     }
00132 
00133     if (opaqueToInt(newClusterRootParamId) > 0) {
00134         TupleDatum rootPageIdDatum;
00135         rootPageIdDatum.pData = (PConstBuffer) &(treeDescriptor.rootPageId);
00136         rootPageIdDatum.cbData = sizeof(treeDescriptor.rootPageId);
00137         pDynamicParamManager->writeParam(
00138             newClusterRootParamId,
00139             rootPageIdDatum);
00140     }
00141 
00142     pOrigClusterReader->open();
00143     currLoadRid = LcsRid(0);
00144     currInputRid = LcsRid(MAXU);
00145     needTuple = true;
00146 }
00147 
00148 ExecStreamResult LcsClusterReplaceExecStream::getTupleForLoad()
00149 {
00150     // If the last tuple provided has not been processed yet, then there's no
00151     // work to be done
00152     if (!needTuple) {
00153         return EXECRC_YIELD;
00154     }
00155 
00156     if (pInAccessor->getState() == EXECBUF_EOS) {
00157         // No more input rows, but that doesn't mean we're finished because
00158         // we have to match the number of rows in the original cluster.
00159         // Therefore, if there's a gap at the end of the cluster, read the
00160         // original rows until we read the rid corresponding to the last tuple
00161         // tuple in the original cluster, at which point, we can finally
00162         // say that we're done.  However, if there wasn't at least one new
00163         // row, then there's no need to replace the column.  We can simply
00164         // keep the original.
00165         if (!newData) {
00166             return EXECRC_EOS;
00167         }
00168         if (opaqueToInt(currLoadRid) < origNumRows) {
00169             readOrigClusterRow();
00170             needTuple = false;
00171             // in case this wasn't already called
00172             initLoad();
00173             return EXECRC_YIELD;
00174         } else {
00175             pSnapshotSegment->versionPage(
00176                 origRootPageId,
00177                 treeDescriptor.rootPageId);
00178             return EXECRC_EOS;
00179         }
00180     }
00181 
00182     if (!pInAccessor->demandData()) {
00183         return EXECRC_BUF_UNDERFLOW;
00184     }
00185 
00186     // Create a new rid to pageId btree map for this cluster, once we know
00187     // at least one row is being updated
00188     if (!newData) {
00189         treeDescriptor.rootPageId = NULL_PAGE_ID;
00190         BTreeBuilder builder(
00191             treeDescriptor,
00192             treeDescriptor.segmentAccessor.pSegment);
00193         builder.createEmptyRoot();
00194         treeDescriptor.rootPageId = builder.getRootPageId();
00195         newData = true;
00196     }
00197 
00198     initLoad();
00199 
00200     if (currLoadRid == LcsRid(0) || currLoadRid > currInputRid) {
00201         assert(!pInAccessor->isTupleConsumptionPending());
00202         pInAccessor->unmarshalProjectedTuple(projInputTupleData);
00203         currInputRid =
00204             *reinterpret_cast<LcsRid const *> (projInputTupleData[0].pData);
00205     }
00206 
00207     // If there's a gap between the last input tuple read and the
00208     // current row that needs to be loaded, then read the original
00209     // cluster data; otherwise, unmarshal the last input row read.
00210     if (currInputRid > currLoadRid) {
00211         readOrigClusterRow();
00212     } else {
00213         assert(currInputRid == currLoadRid);
00214         clusterColsTupleAccessor.unmarshal(clusterColsTupleData);
00215     }
00216 
00217     needTuple = false;
00218     return EXECRC_YIELD;
00219 }
00220 
00221 void LcsClusterReplaceExecStream::readOrigClusterRow()
00222 {
00223     origClusterTupleData.resetBuffer();
00224 
00225     // Position to the current rid we want to load.  Then read each of the
00226     // column values, load them into the TupleDataWithBuffer, and then copy
00227     // those TupleDatum's into the TupleData that's used to load the
00228     // cluster.
00229     bool needSync = true;
00230     if (pOrigClusterReader->isPositioned() &&
00231         currLoadRid < pOrigClusterReader->getRangeEndRid())
00232     {
00233         needSync = false;
00234     }
00235     bool rc = pOrigClusterReader->position(currLoadRid);
00236     assert(rc);
00237     for (uint i = 0; i < pOrigClusterReader->nColsToRead; i++) {
00238         if (needSync) {
00239             pOrigClusterReader->clusterCols[i].sync();
00240         }
00241         PBuffer colValue = pOrigClusterReader->clusterCols[i].getCurrentValue();
00242         attrAccessors[i].loadValue(origClusterTupleData[i], colValue);
00243         clusterColsTupleData[i] = origClusterTupleData[i];
00244     }
00245 }
00246 
00247 void LcsClusterReplaceExecStream::postProcessTuple()
00248 {
00249     // Consume the current input tuple if we've completed processing that
00250     // input.
00251     if (currInputRid == currLoadRid) {
00252         LcsClusterAppendExecStream::postProcessTuple();
00253     }
00254     currLoadRid++;
00255     needTuple = true;
00256 }
00257 
00258 void LcsClusterReplaceExecStream::close()
00259 {
00260     LcsClusterAppendExecStream::close();
00261     pOrigClusterReader->close();
00262 }
00263 
00264 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $");
00265 
00266 // End LcsClusterReplaceExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1