LcsRowScanExecStreamTest.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/test/LcsRowScanExecStreamTest.cpp#26 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/test/ExecStreamUnitTestBase.h"
00024 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00025 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00026 #include "fennel/lucidera/bitmap/LbmEntry.h"
00027 #include "fennel/lucidera/test/SamplingExecStreamGenerator.h"
00028 #include "fennel/btree/BTreeBuilder.h"
00029 #include "fennel/ftrs/BTreeInsertExecStream.h"
00030 #include "fennel/ftrs/BTreeSearchExecStream.h"
00031 #include "fennel/ftrs/BTreeExecStream.h"
00032 #include "fennel/tuple/StandardTypeDescriptor.h"
00033 #include "fennel/tuple/TupleDescriptor.h"
00034 #include "fennel/tuple/TupleAccessor.h"
00035 #include "fennel/exec/MockProducerExecStream.h"
00036 #include "fennel/exec/ValuesExecStream.h"
00037 #include "fennel/exec/ExecStreamEmbryo.h"
00038 #include "fennel/exec/DynamicParam.h"
00039 #include "fennel/cache/Cache.h"
00040 #include <stdarg.h>
00041 
00042 #include <boost/test/test_tools.hpp>
00043 
00044 using namespace fennel;
00045 
// Unit-test fixture for LcsRowScanExecStream.  Helper methods load clusters
// through LcsClusterAppendExecStream and read them back through
// LcsRowScanExecStream as plain scans, filtered scans or sampled scans.
00051 class LcsRowScanExecStreamTest : public ExecStreamUnitTestBase
00052 {
00053 protected:
          // Duplicate count handed to MixedDupColumnGenerator for compressed
          // loads; the expected row counts in testCompressedFiltering are
          // written in terms of it (500*NDUPS+500).
00054     static const uint NDUPS = 20;
00055     StandardTypeDescriptorFactory stdTypeFactory;
          // Attribute descriptors used to build the tuple descriptors in this
          // file.  NOTE(review): they are not initialised in the constructor
          // below; presumably testCaseSetUp() does it -- confirm.
00056     TupleAttributeDescriptor attrDesc_int64;
00057     TupleAttributeDescriptor attrDesc_bitmap;
00058     TupleAttributeDescriptor attrDesc_char1;
          // Passed to LbmEntry::getScratchBufferSize() and used when sizing the
          // rid input buffer in testScanCols/testFilterCols.
00059     uint bitmapColSize;
00060 
          // One BTreeDescriptor per cluster; slot i is filled by the loaders
          // and read back by the scan helpers.
00061     vector<boost::shared_ptr<BTreeDescriptor> > bTreeClusters;
00062 
          // Loads nClusters clusters of nRows rows and nCols columns each;
          // cluster i is loaded with column start value i * nCols.
          // `compressed` selects the column generator (see loadOneCluster).
00066     void loadClusters(
00067         uint nRows,
00068         uint nCols,
00069         uint nClusters,
00070         bool compressed);
00071 
          // Loads a single cluster of nRows rows and nCols int64 columns and
          // records its btree settings in bTreeDescriptor.  Column i is
          // generated from i + colStart; `compressed` switches from
          // SeqColumnGenerator to MixedDupColumnGenerator.
00080     void loadOneCluster(
00081         uint nRows,
00082         uint nCols,
00083         int colStart,
00084         BTreeDescriptor &bTreeDescriptor,
00085         bool compressed);
00086 
          // Scans the first nClusters entries of bTreeClusters (nCols columns
          // each) with projection proj and verifies expectedNumRows rows.
          // nRows > 0 drives the scan from a generated rid bitmap stream
          // (rids 0, skipRows, 2*skipRows, ... below nRows); nRows == 0
          // requests a full scan with an empty input stream.
00105     void testScanCols(
00106         uint nRows,
00107         uint nCols,
00108         uint nClusters,
00109         TupleProjection proj,
00110         uint skipRows,
00111         uint expectedNumRows);
00112 
          // Same setup as testScanCols but with hasExtraFilter set and three
          // additional search-key input streams acting as residual filters on
          // columns 0, nCols and 2*nCols.  `compressed` selects which
          // generator produces the expected output.
00133     void testFilterCols(
00134         uint nRows,
00135         uint nCols,
00136         uint nClusters,
00137         TupleProjection proj,
00138         uint skipRows,
00139         uint expectedNumRows,
00140         bool compressed);
00141 
          // Marshals one search-key tuple (lower directive, lower value,
          // upper directive, upper value) at inputBuf + offset and advances
          // offset by the marshalled size.
00142     void setSearchKey(
00143         char lowerDirective,
00144         char upperDirective,
00145         uint64_t lowerVal,
00146         uint64_t upperVal,
00147         PBuffer inputBuf,
00148         uint &offset,
00149         TupleAccessor &inputTupleAccessor,
00150         TupleData &inputTupleData);
00151 
          // Sampling counterpart of testScanCols; takes a sampling mode, rate,
          // seed and clump count in addition to the scan arguments.  The
          // definition is further down the file.
00181     void testSampleScanCols(
00182         uint nRows,
00183         uint nRowsActual,
00184         uint nCols,
00185         uint nClusters,
00186         TupleProjection proj,
00187         uint skipRows,
00188         TableSamplingMode mode,
00189         float rate,
00190         int seed,
00191         uint clumps,
00192         uint expectedNumRows);
00193 
          // Marshals bitmap entries covering rids 0, skipRows, 2*skipRows, ...
          // below nRows into pBuf and returns the number of bytes written.
00208     int generateBitmaps(
00209         uint nRows, uint skipRows, TupleDescriptor const &bitmapTupleDesc,
00210         PBuffer pBuf);
00211 
          // Marshals the tuple currently held by lbmEntry at pBuf + bufSize and
          // adds its byte count to bufSize.
00212     void produceEntry(
00213         LbmEntry &lbmEntry, TupleAccessor &bitmapTupleAccessor, PBuffer pBuf,
00214         int &bufSize);
00215 
00216 public:
          // Registers the test cases of this fixture.
00217     explicit LcsRowScanExecStreamTest()
00218     {
00219         FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testScans);
00220         FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testScanOnEmptyCluster);
00221         FENNEL_UNIT_TEST_CASE(
00222             LcsRowScanExecStreamTest, testScanPastEndOfCluster);
00223         FENNEL_UNIT_TEST_CASE(
00224             LcsRowScanExecStreamTest, testCompressedFiltering);
00225         FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testBernoulliSampling);
00226         FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testSystemSampling);
00227     }
00228 
00229     void testCaseSetUp();
00230     void testCaseTearDown();
00231 
00232     void testScans();
00233     void testScanOnEmptyCluster();
00234     void testScanPastEndOfCluster();
00235     void testCompressedFiltering();
00236     void testBernoulliSampling();
00237     void testSystemSampling();
00238 };
00239 
// Loads nClusters clusters of nRows rows and nCols columns each.  A new
// descriptor is appended to bTreeClusters per cluster, cluster i is loaded
// with column start value i * nCols, and the exec stream test is reset after
// every load.
00240 void LcsRowScanExecStreamTest::loadClusters(
00241     uint nRows,
00242     uint nCols,
00243     uint nClusters,
00244     bool compressed)
00245 {
00246     for (uint i = 0; i < nClusters; i++) {
00247         boost::shared_ptr<BTreeDescriptor> pBTreeDesc =
00248             boost::shared_ptr<BTreeDescriptor> (new BTreeDescriptor());
00249         bTreeClusters.push_back(pBTreeDesc);
              // The load targets slot i, which is the slot the scan helpers
              // read.  NOTE(review): slot i is the descriptor pushed above
              // only when bTreeClusters was empty on entry; otherwise an
              // earlier descriptor is reloaded -- confirm this is intended.
00250         loadOneCluster(
00251             nRows, nCols, i * nCols, *(bTreeClusters[i]), compressed);
00252         resetExecStreamTest();
00253     }
00254 }
00255 
// Loads one cluster: a MockProducerExecStream generating nRows rows of nCols
// int64 columns feeds an LcsClusterAppendExecStream, and the btree settings
// used for the load (segment, tuple descriptor, key projection, root page)
// are stored in bTreeDescriptor.  Column i is generated from i + colStart,
// by MixedDupColumnGenerator when `compressed` is true and by
// SeqColumnGenerator otherwise.
00256 void LcsRowScanExecStreamTest::loadOneCluster(
00257     uint nRows,
00258     uint nCols,
00259     int colStart,
00260     BTreeDescriptor &bTreeDescriptor,
00261     bool compressed)
00262 {
00263     MockProducerExecStreamParams mockParams;
00264     for (uint i = 0; i < nCols; i++) {
00265         mockParams.outputTupleDesc.push_back(attrDesc_int64);
00266     }
00267     mockParams.nRows = nRows;
00268 
00269     // generators for input stream load
00270 
00271     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00272     for (uint i = 0; i < nCols; i++) {
00273         SharedInt64ColumnGenerator col =
00274             SharedInt64ColumnGenerator(
00275             compressed
00276             ? (Int64ColumnGenerator *) new MixedDupColumnGenerator(
00277                 NDUPS, i + colStart, 500)
00278             : new SeqColumnGenerator(i + colStart));
00279         columnGenerators.push_back(col);
00280     }
00281     mockParams.pGenerator.reset(
00282         new CompositeExecStreamGenerator(columnGenerators));
00283 
00284     ExecStreamEmbryo mockStreamEmbryo;
00285     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00286     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00287 
00288     LcsClusterAppendExecStreamParams lcsAppendParams;
00289     lcsAppendParams.scratchAccessor =
00290         pSegmentFactory->newScratchSegment(pCache, 10);
00291     lcsAppendParams.pCacheAccessor = pCache;
00292     lcsAppendParams.pSegment = pRandomSegment;
00293 
00294     // initialize the btree parameter portion of lcsAppendParams
00295     // BTree tuple desc has two int64 columns
00296     (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00297     (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00298 
00299     // BTree key only has one column which is the first column.
00300     (lcsAppendParams.keyProj).push_back(0);
00301 
00302     // output only one value(rows inserted)
00303     lcsAppendParams.outputTupleDesc.push_back(attrDesc_int64);
00304 
          // every mock producer column is loaded into the cluster
00305     for (uint i = 0; i < nCols; i++) {
00306         lcsAppendParams.inputProj.push_back(i);
00307     }
00308     lcsAppendParams.pRootMap = 0;
00309     lcsAppendParams.rootPageIdParamId = DynamicParamId(0);
00310 
00311     // setup temporary btree descriptor to get an empty page to start the btree
00312 
00313     bTreeDescriptor.segmentAccessor.pSegment = lcsAppendParams.pSegment;
00314     bTreeDescriptor.segmentAccessor.pCacheAccessor = pCache;
00315     bTreeDescriptor.tupleDescriptor = lcsAppendParams.tupleDesc;
00316     bTreeDescriptor.keyProjection = lcsAppendParams.keyProj;
00317     bTreeDescriptor.rootPageId = NULL_PAGE_ID;
00318     lcsAppendParams.segmentId = bTreeDescriptor.segmentId;
00319     lcsAppendParams.pageOwnerId = bTreeDescriptor.pageOwnerId;
00320 
          // the new empty root is shared by the descriptor and the append
          // stream parameters
00321     BTreeBuilder builder(bTreeDescriptor, pRandomSegment);
00322     builder.createEmptyRoot();
00323     lcsAppendParams.rootPageId = bTreeDescriptor.rootPageId =
00324         builder.getRootPageId();
00325 
00326     // Now use the above initialized parameter
00327 
00328     LcsClusterAppendExecStream *lcsStream = new LcsClusterAppendExecStream();
00329 
00330     ExecStreamEmbryo lcsAppendStreamEmbryo;
00331     lcsAppendStreamEmbryo.init(lcsStream, lcsAppendParams);
00332     lcsAppendStreamEmbryo.getStream()->setName("LcsClusterAppendExecStream");
00333 
00334     SharedExecStream pOutputStream = prepareTransformGraph(
00335         mockStreamEmbryo, lcsAppendStreamEmbryo);
00336 
00337     // set up a generator which can produce the expected output
          // (a single output row is verified; presumably it carries the
          // inserted row count nRows -- see the "rows inserted" note above)
00338     RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00339 
00340     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00341 }
00342 
// Builds a ValuesExecStream -> LcsRowScanExecStream graph over the first
// nClusters entries of bTreeClusters and verifies the output.
//   nRows           number of rids covered by the generated bitmap input;
//                   0 selects a full scan with an empty input stream
//   nCols           int64 columns per cluster
//   proj            columns to project from the scan
//   skipRows        step between the rids placed in the bitmap input
//   expectedNumRows row count handed to verifyOutput
// Expected values come from SeqColumnGenerator(proj[i], skipRows) per
// projected column.
00343 void LcsRowScanExecStreamTest::testScanCols(
00344     uint nRows,
00345     uint nCols,
00346     uint nClusters,
00347     TupleProjection proj,
00348     uint skipRows,
00349     uint expectedNumRows)
00350 {
00351     // setup input rid stream
00352 
00353     ValuesExecStreamParams valuesParams;
00354     boost::shared_array<FixedBuffer> pBuffer;
00355     ExecStreamEmbryo valuesStreamEmbryo;
00356     LcsRowScanExecStreamParams scanParams;
00357 
00358     scanParams.hasExtraFilter = false;
00359     scanParams.samplingMode = SAMPLING_OFF;
00360 
00361     // setup a values stream either to provide an empty input to simulate
00362     // the scan of the deletion index (in the case of a full scan) or a stream
00363     // of rid values when we're doing reads based on specific rids
00364     valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00365     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00366     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00367 
00368     // set buffer size to max number of bytes required to represent each
00369     // bit (nRows/8) plus max number of segments (nRows/bitmapColSize)
00370     // times 8 bytes for each starting rid in the segment
          // (a floor of 16 bytes is applied for small nRows)
00371     uint bufferSize = std::max(
00372         16, (int) (nRows / 8 + nRows / bitmapColSize * 8));
00373     pBuffer.reset(new FixedBuffer[bufferSize]);
00374     valuesParams.pTupleBuffer = pBuffer;
00375 
00376     if (nRows > 0) {
00377         valuesParams.bufSize = generateBitmaps(
00378             nRows, skipRows, valuesParams.outputTupleDesc, pBuffer.get());
              // checked after the buffer has been written, so this only
              // reports that the size estimate above was too small
00379         assert(valuesParams.bufSize <= bufferSize);
00380         scanParams.isFullScan = false;
00381     } else {
00382         scanParams.isFullScan = true;
00383         valuesParams.bufSize = 0;
00384     }
00385     valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00386     valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00387 
00388     // setup parameters into scan
00389     //  nClusters cluster with nCols columns each
00390 
00391     for (uint i = 0; i < nClusters; i++) {
00392         struct LcsClusterScanDef clusterScanDef;
00393 
00394         for (uint j = 0; j < nCols; j++) {
00395             clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00396         }
00397 
              // btree settings are copied from the descriptor filled in when
              // the cluster was loaded
00398         clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00399         clusterScanDef.pCacheAccessor =
00400             bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00401         clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00402         clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00403         clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00404         clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00405         clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00406 
00407         scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00408     }
00409 
00410     // setup projection
00411     scanParams.outputProj = proj;
00412     for (uint i = 0; i < proj.size(); i++) {
00413         scanParams.outputTupleDesc.push_back(attrDesc_int64);
00414     }
00415 
00416     ExecStreamEmbryo scanStreamEmbryo;
00417     scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00418     scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00419     SharedExecStream pOutputStream;
00420 
00421     pOutputStream =
00422         prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
00423 
00424     // setup generators for result stream
00425 
00426     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00427     for (uint i = 0; i < proj.size(); i++) {
00428         SharedInt64ColumnGenerator col =
00429             SharedInt64ColumnGenerator(
00430                 new SeqColumnGenerator(
00431                     proj[i],
00432                     skipRows));
00433         columnGenerators.push_back(col);
00434     }
00435 
00436     CompositeExecStreamGenerator resultGenerator(columnGenerators);
00437     verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
00438 }
00439 
// Marshals bitmap entry tuples covering rids 0, skipRows, 2*skipRows, ...
// below nRows into pBuf, using bitmapTupleDesc as the tuple layout.
// Returns the number of bytes written to pBuf.  The caller must supply a
// large enough buffer; no bound is checked here.  skipRows must be non-zero,
// otherwise the rid loop below never advances.
00440 int LcsRowScanExecStreamTest::generateBitmaps(
00441     uint nRows, uint skipRows, TupleDescriptor const &bitmapTupleDesc,
00442     PBuffer pBuf)
00443 {
00444     int bufSize = 0;
00445     LbmEntry lbmEntry;
00446     boost::scoped_array<FixedBuffer> entryBuf;
00447     TupleAccessor bitmapTupleAccessor;
00448     LcsRid rid = LcsRid(0);
00449 
          // Field 0 points at the local `rid`, so this tuple always carries
          // the current value of the loop variable; the two bitmap fields
          // start out empty.
00450     TupleData bitmapTupleData(bitmapTupleDesc);
00451     bitmapTupleData[0].pData = (PConstBuffer) &rid;
00452     bitmapTupleData[1].pData = NULL;
00453     bitmapTupleData[1].cbData = 0;
00454     bitmapTupleData[2].pData = NULL;
00455     bitmapTupleData[2].cbData = 0;
00456 
00457     bitmapTupleAccessor.compute(bitmapTupleDesc);
00458 
00459     // setup an LbmEntry with the initial rid value
00460     uint scratchBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00461     entryBuf.reset(new FixedBuffer[scratchBufSize]);
00462     lbmEntry.init(entryBuf.get(), NULL, scratchBufSize, bitmapTupleDesc);
00463     lbmEntry.setEntryTuple(bitmapTupleData);
00464 
00465     // add on the remaining rids
00466     for (rid = LcsRid(skipRows); rid < LcsRid(nRows); rid += skipRows) {
00467         if (!lbmEntry.setRID(LcsRid(rid))) {
00468             // if exhausted buffer space, write the tuple to the output
00469             // buffer and reset LbmEntry
                  // (the new entry starts at the rid that did not fit, because
                  // bitmapTupleData[0] points at `rid`)
00470             produceEntry(lbmEntry, bitmapTupleAccessor, pBuf, bufSize);
00471             lbmEntry.setEntryTuple(bitmapTupleData);
00472         }
00473     }
00474     // write out the last LbmEntry
00475     produceEntry(lbmEntry, bitmapTupleAccessor, pBuf, bufSize);
00476 
00477     return bufSize;
00478 }
00479 
// Takes the tuple produced by lbmEntry, marshals it at pBuf + bufSize using
// bitmapTupleAccessor, and adds the marshalled byte count to bufSize.
00480 void LcsRowScanExecStreamTest::produceEntry(
00481     LbmEntry &lbmEntry, TupleAccessor &bitmapTupleAccessor, PBuffer pBuf,
00482     int &bufSize)
00483 {
00484     TupleData bitmapTuple = lbmEntry.produceEntryTuple();
00485     bitmapTupleAccessor.marshal(bitmapTuple, pBuf + bufSize);
00486     bufSize += bitmapTupleAccessor.getCurrentByteCount();
00487 }
00488 
// Loads three uncompressed clusters of 12 columns and 50000 rows, then runs
// plain scans (all columns, a projection, every 7th / 37th row, full table
// scan) followed by three filtered scans.  The exec stream test is reset
// between cases.
00489 void LcsRowScanExecStreamTest::testScans()
00490 {
00491     // 1. load clusters, so they can be used by steps 2-5 below
00492     // 2. scan all data in clusters
00493     // 3. test projection
00494     // 4. test skipping of rows
00495     // 5. test full table scan
          // 6. filtered scan of all columns
          // 7. filtered scan omitting the first two columns of clusters 0 and 1
          // 8. filtered scan that projects only the first nClusters - 1
          //    clusters, reading every 7th rid
00496 
00497     uint nRows = 50000;
00498     uint nCols = 12;
00499     uint nClusters = 3;
00500     TupleProjection proj;
00501 
00502     loadClusters(nRows, nCols, nClusters, false);
00503     // note: no need to reset after loadClusters() because already done
00504     // there
00505 
00506     // scan all rows and columns
00507     for (uint i = 0; i < nClusters; i++) {
00508         for (uint j = 0; j < nCols; j++) {
00509             proj.push_back(i * nCols + j);
00510         }
00511     }
00512     testScanCols(nRows, nCols, nClusters, proj, 1, nRows);
00513     resetExecStreamTest();
00514 
00515     // project columns 22, 10, 12, 26, 1, 35, 15, 5, 17, 30, 4, 20, 7, and 13
00516     proj.clear();
00517     proj.push_back(22);
00518     proj.push_back(10);
00519     proj.push_back(12);
00520     proj.push_back(26);
00521     proj.push_back(1);
00522     proj.push_back(35);
00523     proj.push_back(15);
00524     proj.push_back(5);
00525     proj.push_back(17);
00526     proj.push_back(30);
00527     proj.push_back(4);
00528     proj.push_back(20);
00529     proj.push_back(7);
00530     proj.push_back(13);
00531 
00532     testScanCols(nRows, nCols, nClusters, proj, 1, nRows);
00533     resetExecStreamTest();
00534 
00535     // read every 7 rows, same projection as above
00536     testScanCols(
00537         nRows, nCols, nClusters, proj, 7, (int) ceil((double) nRows / 7));
00538     resetExecStreamTest();
00539 
00540 
00541     // read every 37 rows, same projection as above
00542     testScanCols(
00543         nRows, nCols, nClusters, proj, 37, (int) ceil((double) nRows / 37));
00544     resetExecStreamTest();
00545 
00546     // full table scan -- input stream is empty
00547     testScanCols(0, nCols, nClusters, proj, 1, nRows);
00548 
00549     resetExecStreamTest();
00550     // scan 1000 rows and all columns; the projection is emptied first so
00551     proj.clear(); // the 14 columns projected above are not carried over
00552     for (uint i = 0; i < nClusters; i++) {
00553         for (uint j = 0; j < nCols; j++) {
00554             proj.push_back(i * nCols + j);
00555         }
00556     }
00557     testFilterCols(nRows, nCols, nClusters, proj, 1, 1000, false);
00558 
00559     resetExecStreamTest();
00560 
00561     // scan all columns except the 1st & 2nd of the 1st & 2nd cluster
00562     proj.resize(0);
00563     for (uint i = 0; i < nClusters; i++) {
00564         for (uint j = 0; j < nCols; j++) {
00565             if (!(i < 2 && (j == 0 || j == 1))) {
00566                 proj.push_back(i * nCols + j);
00567             }
00568         }
00569     }
00570     testFilterCols(nRows, nCols, nClusters, proj, 1, 1000, false);
00571 
00572     resetExecStreamTest();
00573 
00574     // skip one cluster; also setup the input so every 7 rows are skipped
00575     proj.resize(0);
00576     for (uint i = 0; i < nClusters - 1; i++) {
00577         for (uint j = 0; j < nCols; j++) {
00578             proj.push_back(i * nCols + j);
00579         }
00580     }
00581     testFilterCols(
00582         nRows, nCols, nClusters, proj, 7, 1000 / 7 + 1, false);
00583 }
00584 
// Loads three clusters with the compressed (MixedDupColumnGenerator) data
// set and runs testFilterCols over three different projections; every case
// expects 500*NDUPS+500 rows.
00585 void LcsRowScanExecStreamTest::testCompressedFiltering()
00586 {
00587     // 1. load clusters with compressed data, used by steps 2-4 below
00588     // 2. filtered scan projecting every column
00589     // 3. filtered scan omitting the first two columns of clusters 0 and 1
00590     // 4. filtered scan projecting only the first nClusters - 1 clusters
00591     // (all three filtered scans read every rid: skipRows is 1)
00592 
00593     uint nRows = 50000;
00594     uint nCols = 12;
00595     uint nClusters = 3;
00596     TupleProjection proj;
00597 
00598     // Test compressed bitmap optimization
00599     //
00600     loadClusters(nRows, nCols, nClusters, true);
00601 
00602     // scan 500*NDUPS+500 rows and columns
00603     proj.resize(0);
00604     for (uint i = 0; i < nClusters; i++) {
00605         for (uint j = 0; j < nCols; j++) {
00606             proj.push_back(i * nCols + j);
00607         }
00608     }
00609     testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00610 
00611     resetExecStreamTest();
00612 
00613     // scan all columns except the 1st & 2nd of the 1st & 2nd cluster
00614     proj.resize(0);
00615     for (uint i = 0; i < nClusters; i++) {
00616         for (uint j = 0; j < nCols; j++) {
00617             if (!(i < 2 && (j == 0 || j == 1))) {
00618                 proj.push_back(i * nCols + j);
00619             }
00620         }
00621     }
00622     testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00623 
00624     resetExecStreamTest();
00625 
00626     // skip one cluster
00627     proj.resize(0);
00628     for (uint i = 0; i < nClusters - 1; i++) {
00629         for (uint j = 0; j < nCols; j++) {
00630             proj.push_back(i * nCols + j);
00631         }
00632     }
00633     testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00634 }
00635 
00636 
// Points the first cluster descriptor at a freshly created empty btree and
// checks that a scan driven by a single rid returns no rows.
00640 void LcsRowScanExecStreamTest::testScanOnEmptyCluster()
00641 {
00642     // create empty btree
00643 
          // NOTE(review): slot 0 is dereferenced without a size check, and
          // nothing in this function adds a descriptor -- confirm that
          // bTreeClusters is guaranteed to be non-empty when this test runs.
00644     BTreeDescriptor &bTreeDescriptor = *(bTreeClusters[0]);
00645 
00646     bTreeDescriptor.segmentAccessor.pSegment = pRandomSegment;
00647     bTreeDescriptor.segmentAccessor.pCacheAccessor = pCache;
          // NOTE(review): these push_back calls append; if the descriptor
          // already holds a tuple descriptor / key projection from an earlier
          // load they are extended rather than replaced -- confirm.
00648     bTreeDescriptor.tupleDescriptor.push_back(attrDesc_int64);
00649     bTreeDescriptor.tupleDescriptor.push_back(attrDesc_int64);
00650     bTreeDescriptor.keyProjection.push_back(0);
00651     bTreeDescriptor.rootPageId = NULL_PAGE_ID;
00652 
00653     BTreeBuilder builder(bTreeDescriptor, pRandomSegment);
00654     builder.createEmptyRoot();
00655     bTreeDescriptor.rootPageId = builder.getRootPageId();
00656 
00657     // have testScanCols attempt to scan a single row, although it should
00658     // return no rows
00659 
00660     TupleProjection proj;
00661 
00662     proj.push_back(0);
          // arguments: nRows=1, nCols=1, nClusters=1, skipRows=1,
          // expectedNumRows=0
00663     testScanCols(1, 1, 1, proj, 1, 0);
00664 }
00665 
// Loads a one-row, one-column cluster into the first descriptor and checks
// that a scan driven by two rids returns only the single stored row.
00670 void LcsRowScanExecStreamTest::testScanPastEndOfCluster()
00671 {
          // NOTE(review): like testScanOnEmptyCluster, this relies on
          // bTreeClusters already holding at least one descriptor.
00672     loadOneCluster(1, 1, 0, *(bTreeClusters[0]), false);
00673     resetExecStreamTest();
00674 
00675     // have testScanCols attempt to read 2 rows, although it should only
00676     // be able to read 1
00677 
00678     TupleProjection proj;
00679 
00680     proj.push_back(0);
          // arguments: nRows=2, nCols=1, nClusters=1, skipRows=1,
          // expectedNumRows=1
00681     testScanCols(2, 1, 1, proj, 1, 1);
00682 }
00683 
// Loads three uncompressed clusters and runs two Bernoulli-sampled scans at
// rate 0.1 with a fixed seed: one full scan and one reading every other row.
// The expected row counts are fixed constants tied to that seed.
00688 void LcsRowScanExecStreamTest::testBernoulliSampling()
00689 {
00690     uint nRows = 50000;
00691     uint nCols = 12;
00692     uint nClusters = 3;
00693     TupleProjection proj;
00694 
00695     int seed = 19721212;
00696     float rate = 0.1;
00697     TableSamplingMode mode = SAMPLING_BERNOULLI;
00698 
00699     loadClusters(nRows, nCols, nClusters, false);
00700     // note: no need to reset after loadClusters() because already done
00701     // there
00702 
00703     // scan all rows and columns
00704     for (uint i = 0; i < nClusters; i++) {
00705         for (uint j = 0; j < nCols; j++) {
00706             proj.push_back(i * nCols + j);
00707         }
00708     }
00709 
00710     // Full Row Scan (4938 is based on the seed, but determined empirically)
00711     testSampleScanCols(
00712         0, nRows, nCols, nClusters, proj, 1, mode, rate, seed, 0, 4938);
00713     resetExecStreamTest();
00714 
00715     // Skip every other row
00716     testSampleScanCols(
00717         nRows, nRows, nCols, nClusters, proj, 2, mode, rate, seed, 0, 2489);
00718     resetExecStreamTest();
00719 }
00720 
00721 
// Loads three uncompressed clusters and runs three system-sampled scans over
// all columns with 10 clumps and seed -1, at rates 0.1, 1.0 and 0.33333,
// expecting 5000, 50000 and 16670 rows respectively.
00726 void LcsRowScanExecStreamTest::testSystemSampling()
00727 {
00728     uint nRows = 50000;
00729     uint nCols = 12;
00730     uint nClusters = 3;
00731     TupleProjection proj;
00732 
00733     TableSamplingMode mode = SAMPLING_SYSTEM;
00734 
00735     loadClusters(nRows, nCols, nClusters, false);
00736     // note: no need to reset after loadClusters() because already done
00737     // there
00738 
00739     // scan all rows and columns
00740     for (uint i = 0; i < nClusters; i++) {
00741         for (uint j = 0; j < nCols; j++) {
00742             proj.push_back(i * nCols + j);
00743         }
00744     }
00745 
          // rate 0.1
00746     testSampleScanCols(
00747         nRows, nRows, nCols, nClusters, proj, 1, mode, 0.1, -1, 10, 5000);
00748     resetExecStreamTest();
00749 
          // rate 1.0: every row is expected back
00750     testSampleScanCols(
00751         nRows, nRows, nCols, nClusters, proj, 1, mode, 1.0, -1, 10, 50000);
00752     resetExecStreamTest();
00753 
          // rate 0.33333
00754     testSampleScanCols(
00755         nRows, nRows, nCols, nClusters, proj, 1, mode, 0.33333, -1, 10, 16670);
00756     resetExecStreamTest();
00757 }
00758 
// Marshals one search-key tuple into inputBuf at `offset` and advances
// `offset` by the number of bytes written.  Tuple layout: field 0 = lower
// bound directive, field 1 = lower bound value, field 2 = upper bound
// directive, field 3 = upper bound value.
// The fields are pointed at this function's by-value parameters, so
// inputTupleData must not be read after the call without being refilled.
00759 void LcsRowScanExecStreamTest::setSearchKey(
00760     char lowerDirective, char upperDirective, uint64_t lowerVal,
00761     uint64_t upperVal, PBuffer inputBuf, uint &offset,
00762     TupleAccessor &inputTupleAccessor, TupleData &inputTupleData)
00763 {
00764     inputTupleData[0].pData = (PConstBuffer) &lowerDirective;
00765     inputTupleData[2].pData = (PConstBuffer) &upperDirective;
00766     inputTupleData[1].pData = (PConstBuffer) &lowerVal;
00767     inputTupleData[3].pData = (PConstBuffer) &upperVal;
00768     inputTupleAccessor.marshal(inputTupleData, inputBuf + offset);
00769     offset += inputTupleAccessor.getCurrentByteCount();
00770 }
00771 
00772 void LcsRowScanExecStreamTest::testFilterCols(
00773     uint nRows,
00774     uint nCols,
00775     uint nClusters,
00776     TupleProjection proj,
00777     uint skipRows,
00778     uint expectedNumRows,
00779     bool compressed)
00780 {
00781     // setup input rid stream
00782 
00783     ValuesExecStreamParams valuesParams;
00784     boost::shared_array<FixedBuffer> pBuffer;
00785     ExecStreamEmbryo valuesStreamEmbryo;
00786     LcsRowScanExecStreamParams scanParams;
00787 
00788     scanParams.hasExtraFilter = true;
00789     scanParams.samplingMode = SAMPLING_OFF;
00790 
00791     // setup a values stream either to provide an empty input to simulate
00792     // the scan of the deletion index (in the case of a full scan) or a stream
00793     // of rid values when we're doing reads based on specific rids
00794     valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00795     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00796     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00797 
00798 
00799     // set buffer size to max number of bytes required to represent each
00800     // bit (nRows/8) plus max number of segments (nRows/bitmapColSize)
00801     // times 8 bytes for each starting rid in the segment
00802     uint bufferSize = std::max(
00803         16, (int) (nRows / 8 + nRows / bitmapColSize * 8));
00804     pBuffer.reset(new FixedBuffer[bufferSize]);
00805     valuesParams.pTupleBuffer = pBuffer;
00806 
00807     if (nRows > 0) {
00808         valuesParams.bufSize = generateBitmaps(
00809             nRows, skipRows, valuesParams.outputTupleDesc, pBuffer.get());
00810         assert(valuesParams.bufSize <= bufferSize);
00811         scanParams.isFullScan = false;
00812     } else {
00813         scanParams.isFullScan = true;
00814         valuesParams.bufSize = 0;
00815     }
00816     valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00817     valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00818 
00819     // setup the following search keys:
00820     // 1. key0 >= 2000 or key0 < 1000
00821     // 2. 500 <= key1 - nCols < 2999 or  (key1 - nCols) == 2999
00822     // 3  key2 - 2*nCols > 1500
00823     //
00824     // where key0 corresponds to column #0,
00825     // key1 corresponds to the column #nCols, and
00826     // key2 corresponds to column #(2*nCols)
00827 
00828     TupleAttributeDescriptor attrDesc_nullableInt64 =
00829         TupleAttributeDescriptor(
00830             stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00831             true, sizeof(uint64_t));
00832 
00833     valuesParams.outputTupleDesc.resize(0);
00834     TupleDescriptor inputTupleDesc;
00835     for (uint i = 0; i < 2; i++) {
00836         inputTupleDesc.push_back(attrDesc_char1);
00837         inputTupleDesc.push_back(attrDesc_nullableInt64);
00838         valuesParams.outputTupleDesc.push_back(attrDesc_char1);
00839         valuesParams.outputTupleDesc.push_back(attrDesc_nullableInt64);
00840     }
00841     TupleData inputTupleData(inputTupleDesc);
00842     TupleAccessor inputTupleAccessor;
00843     inputTupleAccessor.compute(inputTupleDesc);
00844 
00845     uint nInputTuples = 3;
00846     boost::shared_array<FixedBuffer> inputBuffer;
00847     inputBuffer.reset(
00848         new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00849 
00850     PBuffer inputBuf = inputBuffer.get();
00851     uint offset = 0;
00852 
00853     setSearchKey(
00854         '-', ')', 0, 1000, inputBuf, offset, inputTupleAccessor,
00855         inputTupleData);
00856     setSearchKey(
00857         '[', '+', 2000, 0, inputBuf, offset, inputTupleAccessor,
00858         inputTupleData);
00859 
00860     TupleData inputTupleData1(inputTupleDesc);
00861     boost::shared_array<FixedBuffer> inputBuffer1;
00862     inputBuffer1.reset(
00863         new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00864     PBuffer inputBuf1 = inputBuffer1.get();
00865     uint offset1 = 0;
00866 
00867     setSearchKey(
00868         '[', ')', 500 + nCols, 2999 + nCols, inputBuf1, offset1,
00869         inputTupleAccessor,
00870         inputTupleData1);
00871     setSearchKey(
00872         '[', ']', 2999 + nCols, 2999 + nCols, inputBuf1, offset1,
00873         inputTupleAccessor, inputTupleData1);
00874 
00875     TupleData inputTupleData2(inputTupleDesc);
00876     boost::shared_array<FixedBuffer> inputBuffer2;
00877     inputBuffer2.reset(
00878         new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00879     PBuffer inputBuf2 = inputBuffer2.get();
00880     uint offset2 = 0;
00881 
00882     setSearchKey(
00883         '(', '+', 1500+2*nCols, 0, inputBuf2, offset2, inputTupleAccessor,
00884         inputTupleData1);
00885 
00886     valuesParams.pTupleBuffer = inputBuffer;
00887     valuesParams.bufSize = offset;
00888 
00889     ExecStreamEmbryo valuesStreamEmbryo1,  valuesStreamEmbryo2,
00890         valuesStreamEmbryo3;
00891     valuesStreamEmbryo1.init(new ValuesExecStream(), valuesParams);
00892     valuesStreamEmbryo1.getStream()->setName("ValuesExecStream1");
00893 
00894     valuesParams.pTupleBuffer = inputBuffer1;
00895     valuesParams.bufSize = offset1;
00896     valuesStreamEmbryo2.init(new ValuesExecStream(), valuesParams);
00897     valuesStreamEmbryo2.getStream()->setName("ValuesExecStream2");
00898 
00899     valuesParams.pTupleBuffer = inputBuffer2;
00900     valuesParams.bufSize = offset2;
00901     valuesStreamEmbryo3.init(new ValuesExecStream(), valuesParams);
00902     valuesStreamEmbryo3.getStream()->setName("ValuesExecStream3");
00903 
00904     // setup parameters into scan
00905     //  nClusters cluster with nCols columns each
00906 
00907     for (uint i = 0; i < nClusters; i++) {
00908         struct LcsClusterScanDef clusterScanDef;
00909 
00910         for (uint j = 0; j < nCols; j++) {
00911             clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00912         }
00913 
00914         clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00915         clusterScanDef.pCacheAccessor =
00916             bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00917         clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00918         clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00919         clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00920         clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00921         clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00922 
00923         scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00924     }
00925 
00926     // setup projection
00927     scanParams.outputProj = proj;
00928     for (uint i = 0; i < proj.size(); i++) {
00929         scanParams.outputTupleDesc.push_back(attrDesc_int64);
00930     }
00931     scanParams.residualFilterCols.push_back(0);
00932     scanParams.residualFilterCols.push_back(nCols);
00933     scanParams.residualFilterCols.push_back(2*nCols);
00934 
00935     ExecStreamEmbryo scanStreamEmbryo;
00936     scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00937     scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00938     SharedExecStream pOutputStream;
00939 
00940     std::vector<ExecStreamEmbryo> sources;
00941     sources.push_back(valuesStreamEmbryo);
00942     sources.push_back(valuesStreamEmbryo1);
00943     sources.push_back(valuesStreamEmbryo2);
00944     sources.push_back(valuesStreamEmbryo3);
00945 
00946     pOutputStream =
00947         prepareConfluenceGraph(sources, scanStreamEmbryo);
00948 
00949     // setup generators for result stream
00950 
00951     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00952     offset = (int) ceil(2000.0 / skipRows) * skipRows;
00953     for (uint i = 0; i < proj.size(); i++) {
00954         SharedInt64ColumnGenerator col =
00955             SharedInt64ColumnGenerator(
00956                 compressed ?
00957                     (Int64ColumnGenerator*) new MixedDupColumnGenerator(
00958                         NDUPS, proj[i] + 2000,500) :
00959                     new SeqColumnGenerator(proj[i] + offset, skipRows));
00960         columnGenerators.push_back(col);
00961     }
00962 
00963 
00964     CompositeExecStreamGenerator resultGenerator(columnGenerators);
00965     verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
00966 }
00967 
00968 
00969 void LcsRowScanExecStreamTest::testSampleScanCols(
00970     uint nRows,
00971     uint nRowsActual,
00972     uint nCols,
00973     uint nClusters,
00974     TupleProjection proj,
00975     uint skipRows,
00976     TableSamplingMode mode,
00977     float rate,
00978     int seed,
00979     uint clumps,
00980     uint expectedNumRows)
00981 {
00982     // setup input rid stream
00983 
00984     ValuesExecStreamParams valuesParams;
00985     boost::shared_array<FixedBuffer> pBuffer;
00986     ExecStreamEmbryo valuesStreamEmbryo;
00987     LcsRowScanExecStreamParams scanParams;
00988 
00989     scanParams.hasExtraFilter = false;
00990 
00991     // setup a values stream either to provide an empty input to simulate
00992     // the scan of the deletion index (in the case of a full scan) or a stream
00993     // of rid values when we're doing reads based on specific rids
00994     valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00995     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00996     valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00997 
00998     uint nRowsInternal = (mode == SAMPLING_SYSTEM) ? 0 : nRows;
00999 
01000     // set buffer size to max number of bytes required to represent each bit
01001     // (nRowsInternal/8) plus max number of segments
01002     // (nRowsInternal/bitmapColSize) times 8 bytes for each starting rid in the
01003     // segment
01004     uint bufferSize = std::max(
01005         16, (int) (nRowsInternal / 8 + nRowsInternal / bitmapColSize * 8));
01006     pBuffer.reset(new FixedBuffer[bufferSize]);
01007     valuesParams.pTupleBuffer = pBuffer;
01008 
01009     if (nRowsInternal > 0) {
01010         valuesParams.bufSize = generateBitmaps(
01011             nRowsInternal, skipRows, valuesParams.outputTupleDesc,
01012             pBuffer.get());
01013         assert(valuesParams.bufSize <= bufferSize);
01014         scanParams.isFullScan = false;
01015     } else {
01016         scanParams.isFullScan = true;
01017         valuesParams.bufSize = 0;
01018     }
01019     valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
01020     valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
01021 
01022     // setup parameters into scan
01023     //  nClusters cluster with nCols columns each
01024 
01025     for (uint i = 0; i < nClusters; i++) {
01026         struct LcsClusterScanDef clusterScanDef;
01027 
01028         for (uint j = 0; j < nCols; j++) {
01029             clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
01030         }
01031 
01032         clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
01033         clusterScanDef.pCacheAccessor =
01034             bTreeClusters[i]->segmentAccessor.pCacheAccessor;
01035         clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
01036         clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
01037         clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
01038         clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
01039         clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
01040 
01041         scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
01042     }
01043 
01044     // setup projection
01045     scanParams.outputProj = proj;
01046     for (uint i = 0; i < proj.size(); i++) {
01047         scanParams.outputTupleDesc.push_back(attrDesc_int64);
01048     }
01049 
01050 
01051     // setup sampling
01052     scanParams.samplingMode = mode;
01053     scanParams.samplingRate = rate;
01054     scanParams.samplingIsRepeatable = true;
01055     scanParams.samplingRepeatableSeed = seed;
01056     scanParams.samplingClumps = clumps;
01057     scanParams.samplingRowCount = nRowsActual;
01058 
01059     ExecStreamEmbryo scanStreamEmbryo;
01060     scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
01061     scanStreamEmbryo.getStream()->setName("RowScanExecStream");
01062     SharedExecStream pOutputStream;
01063 
01064     pOutputStream =
01065         prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
01066 
01067     // setup generators for result stream
01068 
01069     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
01070     for (uint i = 0; i < proj.size(); i++) {
01071         SharedInt64ColumnGenerator col =
01072             SharedInt64ColumnGenerator(
01073                 new SeqColumnGenerator(
01074                     proj[i],
01075                     skipRows));
01076         columnGenerators.push_back(col);
01077     }
01078 
01079     boost::shared_ptr<CompositeExecStreamGenerator> baseResultGenerator(
01080         new CompositeExecStreamGenerator(columnGenerators));
01081 
01082     if (mode == SAMPLING_BERNOULLI) {
01083         BernoulliSamplingExecStreamGenerator resultGenerator(
01084             baseResultGenerator,
01085             rate,
01086             seed,
01087             proj.size());
01088 
01089         verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
01090     } else {
01091         SystemSamplingExecStreamGenerator resultGenerator(
01092             baseResultGenerator,
01093             rate,
01094             nRows,
01095             proj.size(),
01096             clumps);
01097 
01098         verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
01099     }
01100 }
01101 
01102 void LcsRowScanExecStreamTest::testCaseSetUp()
01103 {
01104     ExecStreamUnitTestBase::testCaseSetUp();
01105 
01106     attrDesc_char1 = TupleAttributeDescriptor(
01107         stdTypeFactory.newDataType(STANDARD_TYPE_CHAR), false, 1);
01108     attrDesc_int64 = TupleAttributeDescriptor(
01109         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
01110     bitmapColSize = pRandomSegment->getUsablePageSize() / 8;
01111     attrDesc_bitmap = TupleAttributeDescriptor(
01112         stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
01113         true, bitmapColSize);
01114 }
01115 
01116 void LcsRowScanExecStreamTest::testCaseTearDown()
01117 {
01118     for (uint i = 0; i < bTreeClusters.size(); i++) {
01119         bTreeClusters[i]->segmentAccessor.reset();
01120     }
01121     ExecStreamUnitTestBase::testCaseTearDown();
01122 }
01123 
// Invokes the Fennel unit-test suite macro for LcsRowScanExecStreamTest.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the test program's entry point for this suite -- confirm
// against the Fennel test support headers.
01124 FENNEL_UNIT_TEST_SUITE(LcsRowScanExecStreamTest);
01125 
01126 
01127 // End LcsRowScanExecStreamTest.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1