LcsMultiClusterAppendTest.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/test/LcsMultiClusterAppendTest.cpp#18 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/common/FemEnums.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00026 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00027 #include "fennel/btree/BTreeBuilder.h"
00028 #include "fennel/ftrs/BTreeInsertExecStream.h"
00029 #include "fennel/ftrs/BTreeSearchExecStream.h"
00030 #include "fennel/ftrs/BTreeExecStream.h"
00031 #include "fennel/tuple/StandardTypeDescriptor.h"
00032 #include "fennel/tuple/TupleDescriptor.h"
00033 #include "fennel/exec/MockProducerExecStream.h"
00034 #include "fennel/exec/ValuesExecStream.h"
00035 #include "fennel/exec/ExecStreamEmbryo.h"
00036 #include "fennel/exec/SplitterExecStream.h"
00037 #include "fennel/exec/BarrierExecStream.h"
00038 #include "fennel/exec/DynamicParam.h"
00039 #include "fennel/cache/Cache.h"
00040 #include <stdarg.h>
00041 
00042 #include <boost/test/test_tools.hpp>
00043 
00044 using namespace fennel;
00045 
00049 class LcsMultiClusterAppendTest : public ExecStreamUnitTestBase
00050 {
00051 protected:
00052     StandardTypeDescriptorFactory stdTypeFactory;
00053     TupleAttributeDescriptor attrDesc_int64;
00054     TupleAttributeDescriptor attrDesc_bitmap;
00055 
00056     vector<boost::shared_ptr<BTreeDescriptor> > bTreeClusters;
00057 
00069     void loadClusters(uint nRows, uint nCols, uint nClusters);
00070 
00076     void scanCols(
00077         uint nRows, uint nCols, uint nClusters, TupleProjection proj);
00078 
00079 public:
00080     explicit LcsMultiClusterAppendTest()
00081     {
00082         FENNEL_UNIT_TEST_CASE(LcsMultiClusterAppendTest, testLoad);
00083     }
00084 
00085     void testCaseSetUp();
00086     void testCaseTearDown();
00087 
00088     void testLoad();
00089 };
00090 
00091 void LcsMultiClusterAppendTest::testLoad()
00092 {
00093     // load clusters and then do a full table scan to verify the rows
00094     // were properly loaded
00095 
00096     uint nRows = 100000;
00097     uint nCols = 5;
00098     uint nClusters = 7;
00099     TupleProjection proj;
00100 
00101     loadClusters(nRows, nCols, nClusters);
00102     resetExecStreamTest();
00103 
00104     // project all columns
00105     for (uint i = 0; i < nClusters * nCols; i++) {
00106         proj.push_back(i);
00107     }
00108     scanCols(nRows, nCols, nClusters, proj);
00109 }
00110 
00111 void LcsMultiClusterAppendTest::loadClusters(
00112     uint nRows,
00113     uint nCols,
00114     uint nClusters)
00115 {
00116     // setup input stream
00117 
00118     MockProducerExecStreamParams mockParams;
00119     for (uint i = 0; i < nCols * nClusters; i++) {
00120         mockParams.outputTupleDesc.push_back(attrDesc_int64);
00121     }
00122     mockParams.nRows = nRows;
00123 
00124     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00125     for (uint i = 0; i < nCols * nClusters; i++) {
00126         SharedInt64ColumnGenerator col =
00127             SharedInt64ColumnGenerator(new DupColumnGenerator(i + 1));
00128         columnGenerators.push_back(col);
00129     }
00130     mockParams.pGenerator.reset(
00131         new CompositeExecStreamGenerator(columnGenerators));
00132 
00133     ExecStreamEmbryo mockStreamEmbryo;
00134     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00135     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00136 
00137     // setup splitter stream
00138 
00139     SplitterExecStreamParams splitterParams;
00140     ExecStreamEmbryo splitterStreamEmbryo;
00141 
00142     splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00143     splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00144 
00145     // setup loader streams
00146 
00147     vector<ExecStreamEmbryo> lcsAppendEmbryos;
00148     for (uint i = 0; i < nClusters; i++) {
00149         LcsClusterAppendExecStreamParams lcsAppendParams;
00150         lcsAppendParams.scratchAccessor =
00151             pSegmentFactory->newScratchSegment(pCache, 10);
00152         lcsAppendParams.pCacheAccessor = pCache;
00153         lcsAppendParams.pSegment = pRandomSegment;
00154 
00155         // initialize the btree parameter portion of lcsAppendParams
00156         // BTree tuple desc only has one column
00157         (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00158         (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00159 
00160         // BTree key only has one column which is the first column.
00161         (lcsAppendParams.keyProj).push_back(0);
00162 
00163         // output only one value(rows inserted)
00164         lcsAppendParams.outputTupleDesc.push_back(attrDesc_int64);
00165 
00166         for (uint j = 0; j < nCols; j++) {
00167             lcsAppendParams.inputProj.push_back(i * nCols + j);
00168         }
00169         lcsAppendParams.pRootMap = 0;
00170         lcsAppendParams.rootPageIdParamId = DynamicParamId(0);
00171 
00172         // setup temporary btree descriptor to get an empty page to start
00173         // the btree
00174 
00175         boost::shared_ptr<BTreeDescriptor> pBTreeDesc =
00176             boost::shared_ptr<BTreeDescriptor> (new BTreeDescriptor());
00177         bTreeClusters.push_back(pBTreeDesc);
00178         pBTreeDesc->segmentAccessor.pSegment = lcsAppendParams.pSegment;
00179         pBTreeDesc->segmentAccessor.pCacheAccessor = pCache;
00180         pBTreeDesc->tupleDescriptor = lcsAppendParams.tupleDesc;
00181         pBTreeDesc->keyProjection = lcsAppendParams.keyProj;
00182         pBTreeDesc->rootPageId = NULL_PAGE_ID;
00183         lcsAppendParams.pageOwnerId = pBTreeDesc->pageOwnerId;
00184         lcsAppendParams.segmentId = pBTreeDesc->segmentId;
00185 
00186         BTreeBuilder builder(*pBTreeDesc, pRandomSegment);
00187         builder.createEmptyRoot();
00188         lcsAppendParams.rootPageId = pBTreeDesc->rootPageId =
00189             builder.getRootPageId();
00190 
00191         // Now use the above initialized parameter
00192 
00193         LcsClusterAppendExecStream *lcsStream =
00194             new LcsClusterAppendExecStream();
00195 
00196         ExecStreamEmbryo lcsAppendStreamEmbryo;
00197         lcsAppendStreamEmbryo.init(lcsStream, lcsAppendParams);
00198         std::ostringstream oss;
00199         oss << "LcsClusterAppendExecStream" << "#" << i;
00200         lcsAppendStreamEmbryo.getStream()-> setName(oss.str());
00201         lcsAppendEmbryos.push_back(lcsAppendStreamEmbryo);
00202     }
00203 
00204     // setup barrier stream
00205 
00206     BarrierExecStreamParams barrierParams;
00207     barrierParams.outputTupleDesc.push_back(attrDesc_int64);
00208     barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
00209 
00210     ExecStreamEmbryo barrierStreamEmbryo;
00211     barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
00212     barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
00213 
00214     // connect all the embryos together
00215     SharedExecStream pOutputStream = prepareDAG(
00216         mockStreamEmbryo, splitterStreamEmbryo, lcsAppendEmbryos,
00217         barrierStreamEmbryo);
00218 
00219     // set up a generator which can produce the expected output
00220     RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00221 
00222     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00223 }
00224 
00225 void LcsMultiClusterAppendTest::scanCols(
00226     uint nRows,
00227     uint nCols,
00228     uint nClusters,
00229     TupleProjection proj)
00230 {
00231     // setup parameters into scan
00232     //  nClusters cluster with nCols columns each
00233 
00234     LcsRowScanExecStreamParams scanParams;
00235     scanParams.hasExtraFilter = false;
00236     scanParams.isFullScan = true;
00237     scanParams.samplingMode = SAMPLING_OFF;
00238     for (uint i = 0; i < nClusters; i++) {
00239         struct LcsClusterScanDef clusterScanDef;
00240 
00241         for (uint j = 0; j < nCols; j++) {
00242             clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00243         }
00244 
00245         clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00246         clusterScanDef.pCacheAccessor =
00247             bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00248         clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00249         clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00250         clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00251         clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00252         clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00253 
00254         scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00255     }
00256 
00257     // setup projection
00258 
00259     scanParams.outputProj = proj;
00260     for (uint i = 0; i < proj.size(); i++) {
00261         scanParams.outputTupleDesc.push_back(attrDesc_int64);
00262     }
00263 
00264     // setup a values stream to provide an empty input to simulate
00265     // the scan of the deletion index
00266 
00267     ValuesExecStreamParams valuesParams;
00268     ExecStreamEmbryo valuesStreamEmbryo;
00269     boost::shared_array<FixedBuffer> pBuffer;
00270 
00271     valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00272     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00273     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00274 
00275     uint bufferSize = 16;
00276     pBuffer.reset(new FixedBuffer[bufferSize]);
00277     valuesParams.pTupleBuffer = pBuffer;
00278     valuesParams.bufSize = 0;
00279     valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00280     valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00281 
00282     ExecStreamEmbryo scanStreamEmbryo;
00283 
00284     scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00285     scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00286 
00287     SharedExecStream pOutputStream =
00288         prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
00289 
00290     // setup generators for result stream
00291 
00292     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00293     for (uint i = 0; i < proj.size(); i++) {
00294         SharedInt64ColumnGenerator col =
00295             SharedInt64ColumnGenerator(new DupColumnGenerator(proj[i] + 1));
00296         columnGenerators.push_back(col);
00297     }
00298 
00299     CompositeExecStreamGenerator resultGenerator(columnGenerators);
00300     verifyOutput(*pOutputStream, nRows, resultGenerator);
00301 }
00302 
00303 void LcsMultiClusterAppendTest::testCaseSetUp()
00304 {
00305     ExecStreamUnitTestBase::testCaseSetUp();
00306 
00307     attrDesc_int64 = TupleAttributeDescriptor(
00308         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00309     attrDesc_bitmap = TupleAttributeDescriptor(
00310         stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
00311         true, pRandomSegment->getUsablePageSize() / 8);
00312 }
00313 
00314 void LcsMultiClusterAppendTest::testCaseTearDown()
00315 {
00316     for (uint i = 0; i < bTreeClusters.size(); i++) {
00317         bTreeClusters[i]->segmentAccessor.reset();
00318     }
00319     ExecStreamUnitTestBase::testCaseTearDown();
00320 }
00321 
00322 FENNEL_UNIT_TEST_SUITE(LcsMultiClusterAppendTest);
00323 
00324 
00325 // End LcsMultiClusterAppendTest.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1