LcsRowScanBaseExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsRowScanBaseExecStream.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $");
00027 
00028 LcsRowScanBaseExecStream::LcsRowScanBaseExecStream()
00029 {
00030     nClusters = 0;
00031 }
00032 
00033 void LcsRowScanBaseExecStream::prepare(
00034     LcsRowScanBaseExecStreamParams const &params)
00035 {
00036     ConfluenceExecStream::prepare(params);
00037 
00038     // Copy cluster definition parameters and setup btree readers for each
00039     // cluster.  Also, setup the full output tuple based on the ordered
00040     // list of cluster descriptors.
00041 
00042     nClusters = params.lcsClusterScanDefs.size();
00043     pClusters.reset(new SharedLcsClusterReader[nClusters]);
00044 
00045     uint clusterStart = 0;
00046     uint projCount = 0;
00047     TupleDescriptor allClusterTupleDesc;
00048     TupleProjection newProj, outputProj;
00049 
00050     buildOutputProj(outputProj, params);
00051 
00052     newProj.resize(outputProj.size());
00053 
00054     // if we're projecting non-cluster columns, keep track of them separately;
00055     TupleDescriptor outputTupleDesc = pOutAccessor->getTupleDesc();
00056     for (uint i = 0; i < outputProj.size(); i++) {
00057         if (outputProj[i] == LCS_RID_COLUMN_ID) {
00058             newProj[i] = projCount++;
00059             allClusterTupleDesc.push_back(outputTupleDesc[i]);
00060             nonClusterCols.push_back(params.outputProj[i]);
00061         }
00062     }
00063 
00064     allSpecial = (nonClusterCols.size() == newProj.size());
00065 
00066     for (uint i = 0; i < nClusters; i++) {
00067         SharedLcsClusterReader &pClu = pClusters[i];
00068 
00069         BTreeExecStreamParams const &bTreeParams = params.lcsClusterScanDefs[i];
00070 
00071         BTreeDescriptor treeDescriptor;
00072         treeDescriptor.segmentAccessor.pSegment = bTreeParams.pSegment;
00073         treeDescriptor.segmentAccessor.pCacheAccessor =
00074             bTreeParams.pCacheAccessor;
00075         treeDescriptor.tupleDescriptor = bTreeParams.tupleDesc;
00076         treeDescriptor.keyProjection = bTreeParams.keyProj;
00077         treeDescriptor.rootPageId = bTreeParams.rootPageId;
00078         treeDescriptor.segmentId = bTreeParams.segmentId;
00079         treeDescriptor.pageOwnerId = bTreeParams.pageOwnerId;
00080 
00081         pClu =
00082             SharedLcsClusterReader(
00083                 new LcsClusterReader(treeDescriptor, &ridRuns));
00084 
00085         // setup the cluster and column readers to only read the columns
00086         // that are going to be projected
00087         uint clusterEnd = clusterStart +
00088             params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00089 
00090         // create a vector of the columns that are projected from
00091         // this cluster and recompute the projection list
00092         // based on the individual cluster projections
00093         TupleProjection clusterProj;
00094         for (uint j = 0; j < newProj.size(); j++) {
00095             if (outputProj[j] >= clusterStart &&
00096                 outputProj[j] <= clusterEnd)
00097             {
00098                 clusterProj.push_back(outputProj[j] - clusterStart);
00099                 newProj[j] = projCount++;
00100             }
00101         }
00102         clusterStart = clusterEnd + 1;
00103 
00104         // need to select at least one column from cluster, except in the
00105         // cases where we're only selecting special columns or when there
00106         // are filter columns; in the former case, we'll just arbitrarily
00107         // read the first column, but not actually project it
00108         if (allSpecial) {
00109            clusterProj.push_back(0);
00110         }
00111         pClu->initColumnReaders(
00112             params.lcsClusterScanDefs[i].clusterTupleDesc.size(),
00113             clusterProj);
00114         if (!allSpecial) {
00115             for (uint j = 0; j < pClu->nColsToRead; j++) {
00116                 allClusterTupleDesc.push_back(
00117                     params.lcsClusterScanDefs[i].
00118                         clusterTupleDesc[clusterProj[j]]);
00119             }
00120         }
00121     }
00122 
00123     // setup projected tuple descriptor, by reshuffling allClusterTupleDesc
00124     // built above, into the correct projection order
00125 
00126     for (uint i = 0; i < newProj.size(); i++) {
00127         projDescriptor.push_back(allClusterTupleDesc[newProj[i]]);
00128     }
00129 
00130     // create a projection map to map cluster data read to the output
00131     // projection
00132     projMap.resize(newProj.size());
00133     for (uint i = 0; i < projMap.size(); i++) {
00134         for (uint j = 0; j < newProj.size(); j++) {
00135             if (newProj[j] == i) {
00136                 projMap[i] = j;
00137             }
00138         }
00139     }
00140 }
00141 
00142 void LcsRowScanBaseExecStream::open(bool restart)
00143 {
00144     ConfluenceExecStream::open(restart);
00145     for (uint i = 0; i < nClusters; i++) {
00146         pClusters[i]->open();
00147     }
00148 }
00149 
00150 void LcsRowScanBaseExecStream::getResourceRequirements(
00151     ExecStreamResourceQuantity &minQuantity,
00152     ExecStreamResourceQuantity &optQuantity)
00153 {
00154     ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00155 
00156     // 2 pages per cluster (not taking into account pre-fetches yet)
00157     // - 1 for cluster page
00158     // - 1 for btree page
00159     minQuantity.nCachePages += (nClusters * 2);
00160 
00161     optQuantity = minQuantity;
00162 }
00163 
00164 void LcsRowScanBaseExecStream::closeImpl()
00165 {
00166     ConfluenceExecStream::closeImpl();
00167     for (uint i = 0; i < nClusters; i++) {
00168         pClusters[i]->close();
00169     }
00170 }
00171 
00172 void LcsRowScanBaseExecStream::syncColumns(SharedLcsClusterReader &pScan)
00173 {
00174     for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00175         pScan->clusterCols[iCluCol].sync();
00176     }
00177 }
00178 
00179 bool LcsRowScanBaseExecStream::readColVals(
00180     SharedLcsClusterReader &pScan,
00181     TupleDataWithBuffer &tupleData,
00182     uint colStart)
00183 {
00184     if (!allSpecial) {
00185         for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00186             // Get value of each column and load it to the appropriate
00187             // tuple datum entry
00188             PBuffer curValue = pScan->clusterCols[iCluCol].getCurrentValue();
00189             uint idx = projMap[colStart + iCluCol];
00190 
00191             attrAccessors[idx].loadValue(tupleData[idx], curValue);
00192             if (pScan->clusterCols[iCluCol].getFilters().hasResidualFilters) {
00193                 if (!pScan->clusterCols[iCluCol].applyFilters(
00194                     projDescriptor,
00195                     tupleData))
00196                 {
00197                     return false;
00198                 }
00199             }
00200         }
00201     }
00202     return true;
00203 }
00204 
00205 void LcsRowScanBaseExecStream::buildOutputProj(
00206     TupleProjection &outputProj,
00207     LcsRowScanBaseExecStreamParams const &params)
00208 {
00209     /*
00210      * Copy the projection
00211      */
00212     for (uint i = 0;  i < params.outputProj.size(); i++) {
00213         outputProj.push_back(params.outputProj[i]);
00214     }
00215 }
00216 
00217 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $");
00218 
00219 // End LcsRowScanBaseExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1