LbmUnionExecStreamTest.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/test/LbmUnionExecStreamTest.cpp#8 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ValuesExecStream.h"
00024 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h"
00025 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h"
00026 #include "fennel/lucidera/test/LbmExecStreamTestBase.h"
00027 
00028 #include <boost/test/test_tools.hpp>
00029 
00030 using namespace fennel;
00031 
00032 // declares an array on the stack and initializes it to a value
00033 #define DECLARE_CONSTANT_ARRAY(name, value, size) \
00034 FixedBuffer name[size]; \
00035 memset(name, value, size);
00036 
00037 // check whether code causes an exception
00038 #define CHECK_THROWN(code) \
00039 { \
00040     bool thrown = false; \
00041     try { \
00042         code; \
00043     } catch (...) { \
00044         thrown = true; \
00045     } \
00046     BOOST_CHECK(thrown); \
00047 }
00048 
00049 class LbmUnionExecStreamTest : public LbmExecStreamTestBase
00050 {
00051     boost::shared_array<PBuffer> ppBuffers;
00052     uint nBuffers;
00053 
00054     // allocate a byte buffer
00055     SharedByteBuffer allocateBuffer(uint nBuffers, uint bufferSize);
00056 
00057     // free the byte buffer
00058     void deleteBuffer();
00059 
00060     // run the byte buffer through a couple of tests
00061     void testBuffer(SharedByteBuffer pByteBuffer);
00062 
00063     // check the results stored in a merge area with a fixed buffer
00064     void verifyMerge(LbmUnionMergeArea area, PConstBuffer reference);
00065 
00066     // test the results of running union on a set of number streams
00067     void testUnion(uint nRows, std::vector<LbmNumberStreamInput> inputData);
00068 
00069 public:
00070     explicit LbmUnionExecStreamTest()
00071     {
00072         // NOTE: this test only works reliably for one page due to the
00073         // inconsistent tuple building process
00074         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testSinglePageBuffer);
00075         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testTwoPageBuffer);
00076         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testMultiPageBuffer);
00077         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testTwoInputs);
00078         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testThreeInputs);
00079         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testLargeRids);
00080         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testSparse);
00081         FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testGaps);
00082     }
00083 
00084     void testSinglePageBuffer();
00085     void testTwoPageBuffer();
00086     void testMultiPageBuffer();
00087 
00088     void testTwoInputs();
00089     void testThreeInputs();
00090     void testLargeRids();
00091     void testSparse();
00092     void testGaps();
00093 };
00094 
00095 void LbmUnionExecStreamTest::testSinglePageBuffer()
00096 {
00097     SharedByteBuffer pByteBuffer = allocateBuffer(1, 16);
00098     testBuffer(pByteBuffer);
00099     deleteBuffer();
00100 }
00101 
00102 void LbmUnionExecStreamTest::testTwoPageBuffer()
00103 {
00104     SharedByteBuffer pByteBuffer = allocateBuffer(2, 8);
00105     testBuffer(pByteBuffer);
00106     deleteBuffer();
00107 }
00108 
00109 void LbmUnionExecStreamTest::testMultiPageBuffer()
00110 {
00111     SharedByteBuffer pByteBuffer = allocateBuffer(4, 4);
00112     testBuffer(pByteBuffer);
00113     deleteBuffer();
00114 }
00115 
00116 void LbmUnionExecStreamTest::testTwoInputs()
00117 {
00118     uint nRows = 1000;
00119     std::vector<LbmNumberStreamInput> inputData;
00120 
00121     // evens from 20 .. 500
00122     LbmNumberStreamInput input1;
00123     input1.pStream =
00124         SharedNumberStream(new SkipNumberStream(20, 500, 2));
00125     input1.bitmapSize = 4;
00126     inputData.push_back(input1);
00127 
00128     // multiples of 3 from 300 .. 990
00129     LbmNumberStreamInput input2;
00130     input2.pStream =
00131         SharedNumberStream(new SkipNumberStream(300, 990, 3));
00132     input2.bitmapSize = 8;
00133     inputData.push_back(input2);
00134 
00135     testUnion(nRows, inputData);
00136 }
00137 
00138 void LbmUnionExecStreamTest::testThreeInputs()
00139 {
00140     uint nRows = 1051;
00141     std::vector<LbmNumberStreamInput> inputData;
00142 
00143     // multiples of 7 from 21 .. 700
00144     LbmNumberStreamInput input1;
00145     input1.pStream =
00146         SharedNumberStream(new SkipNumberStream(21, 700, 7));
00147     input1.bitmapSize = 4;
00148     inputData.push_back(input1);
00149 
00150     // multiples of 3 from 300 .. 990
00151     LbmNumberStreamInput input2;
00152     input2.pStream =
00153         SharedNumberStream(new SkipNumberStream(300, 990, 3));
00154     input2.bitmapSize = 8;
00155     inputData.push_back(input2);
00156 
00157     // multiples of 5 from 500 .. 1050
00158     LbmNumberStreamInput input3;
00159     input3.pStream =
00160         SharedNumberStream(new SkipNumberStream(500, 1050, 5));
00161     input3.bitmapSize = 8;
00162     inputData.push_back(input3);
00163 
00164     testUnion(nRows, inputData);
00165 }
00166 
00167 void LbmUnionExecStreamTest::testLargeRids()
00168 {
00169     uint nRows = 5001000;
00170     std::vector<LbmNumberStreamInput> inputData;
00171 
00172     // multiples of 4 from 5000020 .. 5000500
00173     LbmNumberStreamInput input1;
00174     input1.pStream =
00175         SharedNumberStream(new SkipNumberStream(5000020, 5000500, 4));
00176     input1.bitmapSize = 4;
00177     inputData.push_back(input1);
00178 
00179     // multiples of 5 from 5000300 .. 5000990
00180     LbmNumberStreamInput input2;
00181     input2.pStream =
00182         SharedNumberStream(new SkipNumberStream(5000300, 5000990, 5));
00183     input2.bitmapSize = 8;
00184     inputData.push_back(input2);
00185 
00186     testUnion(nRows, inputData);
00187 }
00188 
00189 void LbmUnionExecStreamTest::testSparse()
00190 {
00191     uint nRows = 2900;
00192     std::vector<LbmNumberStreamInput> inputData;
00193 
00194     // multiples of 13 from 26 .. 1300
00195     LbmNumberStreamInput input1;
00196     input1.pStream =
00197         SharedNumberStream(new SkipNumberStream(26, 1300, 13));
00198     input1.bitmapSize = 4;
00199     inputData.push_back(input1);
00200 
00201     // multiples of 17 from 340 ... 1700
00202     LbmNumberStreamInput input2;
00203     input2.pStream =
00204         SharedNumberStream(new SkipNumberStream(340, 1700, 17));
00205     input2.bitmapSize = 8;
00206     inputData.push_back(input2);
00207 
00208     // multiples of 11 from 1100 .. 2200
00209     LbmNumberStreamInput input3;
00210     input3.pStream =
00211         SharedNumberStream(new SkipNumberStream(1100, 2200, 11));
00212     input3.bitmapSize = 8;
00213     inputData.push_back(input3);
00214 
00215     testUnion(nRows, inputData);
00216 }
00217 
00218 void LbmUnionExecStreamTest::testGaps()
00219 {
00220     uint nRows = 2000;
00221     std::vector<LbmNumberStreamInput> inputData;
00222 
00223     // multiples of 13 from 26 .. 520
00224     LbmNumberStreamInput input1;
00225     input1.pStream =
00226         SharedNumberStream(new SkipNumberStream(26, 520, 13));
00227     input1.bitmapSize = 4;
00228     inputData.push_back(input1);
00229 
00230     // multiples of 17 from 680 .. 1020
00231     LbmNumberStreamInput input2;
00232     input2.pStream =
00233         SharedNumberStream(new SkipNumberStream(680, 1020, 17));
00234     input2.bitmapSize = 8;
00235     inputData.push_back(input2);
00236 
00237     // multiples of 11 from 1199 ..
00238     LbmNumberStreamInput input3;
00239     input3.pStream =
00240         SharedNumberStream(new SkipNumberStream(1320, 1540, 11));
00241     input3.bitmapSize = 8;
00242     inputData.push_back(input3);
00243 
00244     // multiples of 19 from 3800 .. 7600
00245     LbmNumberStreamInput input4;
00246     input4.pStream =
00247         SharedNumberStream(new SkipNumberStream(1330, 1900, 11));
00248     input4.bitmapSize = 8;
00249     inputData.push_back(input4);
00250 
00251     testUnion(nRows, inputData);
00252 }
00253 
00254 SharedByteBuffer LbmUnionExecStreamTest::allocateBuffer(
00255     uint nBuffers, uint bufferSize)
00256 {
00257     ppBuffers.reset(new PBuffer[nBuffers]);
00258     this->nBuffers = nBuffers;
00259     for (uint i = 0; i < nBuffers; i++) {
00260         ppBuffers[i] = new FixedBuffer[bufferSize];
00261     }
00262 
00263     SharedByteBuffer pByteBuffer(new ByteBuffer());
00264     pByteBuffer->init(ppBuffers, nBuffers, bufferSize);
00265     return pByteBuffer;
00266 }
00267 
00268 void LbmUnionExecStreamTest::deleteBuffer()
00269 {
00270     for (uint i = 0; i < nBuffers; i++) {
00271         delete [] ppBuffers[i];
00272     }
00273 }
00274 
00275 void LbmUnionExecStreamTest::testBuffer(SharedByteBuffer pByteBuffer)
00276 {
00277     DECLARE_CONSTANT_ARRAY(zeroes, 0, 16);
00278     DECLARE_CONSTANT_ARRAY(ones, 1, 16);
00279     DECLARE_CONSTANT_ARRAY(twos, 2, 16);
00280     DECLARE_CONSTANT_ARRAY(fours, 4, 16);
00281     DECLARE_CONSTANT_ARRAY(maxByte, 255, 16);
00282 
00283     LbmUnionMergeArea mergeArea;
00284     mergeArea.init(pByteBuffer);
00285 
00286     // merge area should be able to use arbitrary offsets and zero values
00287     // 0000 0011 1111 11xx
00288     mergeArea.advance(10000);
00289     mergeArea.mergeMem(10006, ones, 8);
00290     FixedBuffer result0[16] = { 0,0,0,0, 0,0,1,1, 1,1,1,1, 1,1 };
00291     verifyMerge(mergeArea, result0);
00292 
00293     // xxxx x011 1111 11xx
00294     mergeArea.advance(10005);
00295     FixedBuffer result1[16] = { 0,1,1, 1,1,1,1, 1,1 };
00296     verifyMerge(mergeArea, result1);
00297 
00298     // merge area should wrap around
00299     // 2222 2011 3333 3322
00300     mergeArea.mergeMem(10008, twos, 13);
00301     FixedBuffer result2[16] = { 0,1,1, 3,3,3,3, 3,3,2,2, 2,2,2,2, 2 };
00302     verifyMerge(mergeArea, result2);
00303 
00304     // 2222 2xx1 3333 3322
00305     mergeArea.advance(10007);
00306     FixedBuffer result3[16] = { 1, 3,3,3,3, 3,3,2,2, 2,2,2,2, 2 };
00307     verifyMerge(mergeArea, result3);
00308 
00309     // 6662 2xx1 3377 7766
00310     mergeArea.mergeMem(10010, fours, 9);
00311     FixedBuffer result4[16] = { 1, 3,3,7,7, 7,7,6,6, 6,6,6,2, 2 };
00312     verifyMerge(mergeArea, result4);
00313 
00314     // 0011 1122 2244 4400
00315     mergeArea.advance(10030);
00316     mergeArea.mergeMem(10040, fours, 4);
00317     mergeArea.mergeMem(10036, twos, 4);
00318     mergeArea.mergeMem(10032, ones, 4);
00319     mergeArea.mergeMem(10044, zeroes, 2);
00320     FixedBuffer result5[16] = { 0,0,1,1, 1,1,2,2, 2,2,4,4, 4,4,0,0 };
00321     verifyMerge(mergeArea, result5);
00322 
00323     // index too low
00324     mergeArea.advance(10032);
00325     // CHECK_THROWN(mergeArea.getByte(10031);
00326 
00327     // index too high
00328     // CHECK_THROWN(mergeArea.getByte(10046));
00329 
00330     // can't write past bound
00331     // CHECK_THROWN(mergeArea.mergeMem(10048, zeroes, 2));
00332 
00333     // can't go backwards
00334     // CHECK_THROWN(mergeArea.advance(10031));
00335 
00336     // this should be ok
00337     mergeArea.advance(10032);
00338 }
00339 
00340 void LbmUnionExecStreamTest::verifyMerge(
00341     LbmUnionMergeArea area, PConstBuffer reference)
00342 {
00343     LbmByteNumberPrimitive start = area.getStart();
00344     uint size = opaqueToInt(area.getEnd() - start);
00345 
00346     for (uint i = 0; i < size; i++) {
00347         BOOST_CHECK_EQUAL(area.getByte(start + i), reference[i]);
00348     }
00349 }
00350 
00351 void LbmUnionExecStreamTest::testUnion(
00352     uint nRows, std::vector<LbmNumberStreamInput> inputData)
00353 {
00354     uint nInputs = inputData.size();
00355 
00356     // set up expected result (with fresh copies of input data)
00357     UnionNumberStream *pUnion = new UnionNumberStream();
00358     for (uint i = 0; i < nInputs; i++) {
00359         SharedNumberStream pChild(inputData[i].pStream->clone());
00360         pUnion->addChild(pChild);
00361     }
00362     LbmNumberStreamInput expectedData;
00363     expectedData.pStream = SharedNumberStream(pUnion);
00364     expectedData.bitmapSize = resultBitmapSize(nRows);
00365 
00366     // + 1 is for the precalculated results buffer
00367     boost::scoped_array<BitmapInput> bmInputs;
00368     bmInputs.reset(new BitmapInput[nInputs + 1]);
00369 
00370     // combine inputs into a single values exec stream
00371     uint totalSize = 0;
00372     uint totalBitmaps = 0;
00373     for (uint i = 0; i < nInputs; i++) {
00374         initBitmapInput(bmInputs[i], nRows, inputData[i]);
00375         totalSize += bmInputs[i].currBufSize;
00376         totalBitmaps += bmInputs[i].nBitmaps;
00377     }
00378 
00379     BitmapInput bmCombined;
00380     bmCombined.bufArray.reset(new FixedBuffer[totalSize]);
00381     bmCombined.currBufSize = bmCombined.fullBufSize = totalSize;
00382     bmCombined.nBitmaps = totalBitmaps;
00383     PBuffer pCurrent = bmCombined.bufArray.get();
00384     for (uint i = 0; i < nInputs; i++) {
00385         memcpy(pCurrent, bmInputs[i].bufArray.get(), bmInputs[i].currBufSize);
00386         pCurrent += bmInputs[i].currBufSize;
00387     }
00388 
00389     ValuesExecStreamParams valuesParams;
00390     ExecStreamEmbryo valuesStreamEmbryo;
00391     initValuesExecStream(
00392         0, valuesParams, valuesStreamEmbryo, bmCombined);
00393 
00394     // set up precalculated result buffer
00395     initBitmapInput(bmInputs[nInputs], nRows, expectedData);
00396 
00397     // build values -> chopper -> sorter -> union transforms
00398     std::vector<ExecStreamEmbryo> transformEmbryoList;
00399 
00400     LbmChopperExecStreamParams chopperParams;
00401     chopperParams.ridLimitParamId = DynamicParamId(1);
00402     chopperParams.outputTupleDesc = bitmapTupleDesc;
00403     chopperParams.scratchAccessor.pSegment = pRandomSegment;
00404     chopperParams.scratchAccessor.pCacheAccessor = pCache;
00405     ExecStreamEmbryo chopperEmbryo;
00406     chopperEmbryo.init(new LbmChopperExecStream(), chopperParams);
00407     chopperEmbryo.getStream()->setName("ChopperExecStream");
00408     transformEmbryoList.push_back(chopperEmbryo);
00409 
00410     ExternalSortExecStreamParams sortParams;
00411     ExecStreamEmbryo sortEmbryo;
00412     initSorterExecStream(sortParams, sortEmbryo, bitmapTupleDesc);
00413     transformEmbryoList.push_back(sortEmbryo);
00414 
00415     LbmUnionExecStreamParams unionParams;
00416     unionParams.maxRid = (LcsRid) 0;
00417     unionParams.ridLimitParamId = DynamicParamId(1);
00418     unionParams.startRidParamId = DynamicParamId(0);
00419     unionParams.segmentLimitParamId = DynamicParamId(0);
00420     unionParams.outputTupleDesc = bitmapTupleDesc;
00421     unionParams.scratchAccessor =
00422         pSegmentFactory->newScratchSegment(pCache, 10);
00423     ExecStreamEmbryo unionEmbryo;
00424     unionEmbryo.init(new LbmUnionExecStream(), unionParams);
00425     unionEmbryo.getStream()->setName("UnionExecStream");
00426     transformEmbryoList.push_back(unionEmbryo);
00427 
00428     SharedExecStream pOutputStream = prepareTransformGraph(
00429         valuesStreamEmbryo, transformEmbryoList);
00430 
00431     if (bmInputs[nInputs].bufArray.get()) {
00432         bitmapTupleAccessor.setCurrentTupleBuf(
00433             bmInputs[nInputs].bufArray.get());
00434     }
00435     verifyBufferedOutput(
00436         *pOutputStream, bitmapTupleDesc, bmInputs[nInputs].nBitmaps,
00437         bmInputs[nInputs].bufArray.get());
00438 }
00439 
00440 FENNEL_UNIT_TEST_SUITE(LbmUnionExecStreamTest);
00441 
00442 // End LbmUnionExecStreamTest.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1