LhxJoinExecStreamTest.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/test/LhxJoinExecStreamTest.cpp#3 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/hashexe/LhxJoinExecStream.h"
00026 #include "fennel/sorter/ExternalSortExecStream.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/exec/MockProducerExecStream.h"
00029 #include "fennel/exec/ExecStreamEmbryo.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/exec/ExecStreamGraph.h"
00032 #include "fennel/cache/Cache.h"
00033 
00034 #include <boost/test/test_tools.hpp>
00035 
00036 using namespace fennel;
00037 
00038 class LhxJoinExecStreamTest : public ExecStreamUnitTestBase
00039 {
00040     void testSequentialImpl(
00041         uint numRows,
00042         uint forcePartitionLevel,
00043         bool enableJoinFilter,
00044         bool enableSubPartStat);
00045 
00046     void testDupImpl(
00047         uint numRows,
00048         uint cndKeyLeft,
00049         uint cndKeyRight,
00050         uint forcePartitionLevel,
00051         bool enableJoinFilter,
00052         bool enableSubPartStat,
00053         bool needSort,
00054         bool fakeInterrupt);
00055 
00056     void testImpl(
00057         uint numInputRows,
00058         uint keyCount,
00059         uint cndKeys,
00060         uint numResultRows,
00061         TupleDescriptor &inputDesc,
00062         TupleDescriptor &outputDesc,
00063         TupleProjection &outputProj,
00064         SharedMockProducerExecStreamGenerator pLeftGenerator,
00065         SharedMockProducerExecStreamGenerator pRightGenerator,
00066         CompositeExecStreamGenerator &verifier,
00067         uint forcePartitionLevel,
00068         bool enableJoinFilter,
00069         bool enableSubPartStat,
00070         bool needSort,
00071         bool fakeInterrupt);
00072 
00073 public:
00074     explicit LhxJoinExecStreamTest()
00075     {
00076         FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testSequential);
00077         FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup1);
00078         FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup2);
00079         FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConst);
00080 
00081 /*
00082         FENNEL_UNIT_TEST_CASE(
00083             LhxJoinExecStreamTest,
00084             testSequentialPartition);
00085         FENNEL_UNIT_TEST_CASE(
00086             LhxJoinExecStreamTest,
00087             testDup1Partition);
00088         FENNEL_UNIT_TEST_CASE(
00089             LhxJoinExecStreamTest,
00090             testDup2Partition);
00091         FENNEL_UNIT_TEST_CASE(
00092             LhxJoinExecStreamTest,
00093             testConstPartition);
00094 
00095         FENNEL_UNIT_TEST_CASE(
00096             LhxJoinExecStreamTest,
00097             testSequentialPartitionFilter);
00098         FENNEL_UNIT_TEST_CASE(
00099             LhxJoinExecStreamTest,
00100             testDup1PartitionFilter);
00101         FENNEL_UNIT_TEST_CASE(
00102             LhxJoinExecStreamTest,
00103             testDup2PartitionFilter);
00104 
00105         FENNEL_UNIT_TEST_CASE(
00106             LhxJoinExecStreamTest,
00107             testSequentialPartitionStat);
00108         FENNEL_UNIT_TEST_CASE(
00109             LhxJoinExecStreamTest,
00110             testDup1PartitionStat);
00111         FENNEL_UNIT_TEST_CASE(
00112             LhxJoinExecStreamTest,
00113             testDup2PartitionStat);
00114         FENNEL_UNIT_TEST_CASE(
00115             LhxJoinExecStreamTest,
00116             testConstPartitionStat);
00117 */
00118 
00119         FENNEL_UNIT_TEST_CASE(
00120             LhxJoinExecStreamTest,
00121             testSequentialPartitionFilterStat);
00122         FENNEL_UNIT_TEST_CASE(
00123             LhxJoinExecStreamTest,
00124             testDup1PartitionFilterStat);
00125         FENNEL_UNIT_TEST_CASE(
00126             LhxJoinExecStreamTest,
00127             testDup2PartitionFilterStat);
00128         FENNEL_UNIT_TEST_CASE(
00129             LhxJoinExecStreamTest,
00130             testConstPartitionFilterStat);
00131 
00132         FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConstCleanup);
00133     }
00134 
00135     /*
00136      * Match two identical sets.
00137      */
00138     void testSequential();
00139 
00140     void testSequentialPartition();
00141     void testSequentialPartitionFilter();
00142     void testSequentialPartitionStat();
00143     void testSequentialPartitionFilterStat();
00144 
00145     /*
00146      * Match these two sets:
00147      *  left:  0, 0  0, .. 1, 1, 1, .. 2, 2, 2, ..
00148      * right:  0, 0, .. 1, 1, .. 2, 2, .. 3, 3, ..
00149      *
00150      * result: 0, 0, 0, 0, 0, 0, .. 1, 1, 1, 1, 1, 1, .. 2, 2, 2, 2, 2, 2, ..
00151      */
00152     void testDup1();
00153 
00154     void testDup1Partition();
00155     void testDup1PartitionFilter();
00156     void testDup1PartitionStat();
00157     void testDup1PartitionFilterStat();
00158 
00159     /*
00160      * Match these two sets:
00161      *  left:  0, 0, .. 1, 1, .. 2, 2, .. 3, 3, ..
00162      * right:  0, 0  0, .. 1, 1, 1, .. 2, 2, 2, ..
00163      *
00164      * result: 0, 0, 0, 0, 0, 0, .. 1, 1, 1, 1, 1, 1, .. 2, 2, 2, 2, 2, 2, ..
00165      */
00166     void testDup2();
00167 
00168     void testDup2Partition();
00169     void testDup2PartitionFilter();
00170     void testDup2PartitionStat();
00171     void testDup2PartitionFilterStat();
00172 
00173     /*
00174      * Match these two sets:
00175      *  left:  0, 0  0, .. 0, 0, 0, .. 0, 0, 0, ..
00176      * right:  0, 0, .. 1, 1, .. 2, 2, .. 3, 3, ..
00177      *
00178      * result: 0, 0, 0, .. 0, 0, 0, .. 0, 0, 0, ..
00179      */
00180     void testConst();
00181     void testConstPartition();
00182     void testConstPartitionStat();
00183     void testConstPartitionFilterStat();
00184     void testConstCleanup();
00185 };
00186 
00187 void LhxJoinExecStreamTest::testSequential()
00188 {
00189     testSequentialImpl(1000, 0, true, true);
00190 }
00191 
00192 void LhxJoinExecStreamTest::testSequentialPartition()
00193 {
00194     testSequentialImpl(1000, 2, false, false);
00195 }
00196 
00197 void LhxJoinExecStreamTest::testSequentialPartitionFilter()
00198 {
00199     testSequentialImpl(1000, 2, true, false);
00200 }
00201 
00202 void LhxJoinExecStreamTest::testSequentialPartitionStat()
00203 {
00204     testSequentialImpl(1000, 2, false, true);
00205 }
00206 
00207 void LhxJoinExecStreamTest::testSequentialPartitionFilterStat()
00208 {
00209     testSequentialImpl(1000, 2, true, true);
00210 }
00211 
00212 void  LhxJoinExecStreamTest::testDup1()
00213 {
00214     testDupImpl(960, 16, 60, 0, false, false, false, false);
00215 }
00216 
00217 void  LhxJoinExecStreamTest::testDup1Partition()
00218 {
00219     testDupImpl(960, 16, 60, 2, false, false, true, false);
00220 }
00221 
00222 void  LhxJoinExecStreamTest::testDup1PartitionFilter()
00223 {
00224     testDupImpl(960, 16, 60, 2, true, false, true, false);
00225 }
00226 
00227 void  LhxJoinExecStreamTest::testDup1PartitionStat()
00228 {
00229     testDupImpl(960, 16, 60, 2, false, true, true, false);
00230 }
00231 
00232 void  LhxJoinExecStreamTest::testDup1PartitionFilterStat()
00233 {
00234     testDupImpl(960, 16, 60, 2, true, true, true, false);
00235 }
00236 
00237 void  LhxJoinExecStreamTest::testDup2()
00238 {
00239     testDupImpl(960, 60, 16, 0, false, false, false, false);
00240 }
00241 
00242 void  LhxJoinExecStreamTest::testDup2Partition()
00243 {
00244     testDupImpl(960, 60, 16, 2, false, false, true, false);
00245 }
00246 
00247 void  LhxJoinExecStreamTest::testDup2PartitionFilter()
00248 {
00249     testDupImpl(960, 60, 16, 2, true, false, true, false);
00250 }
00251 
00252 void  LhxJoinExecStreamTest::testDup2PartitionStat()
00253 {
00254     testDupImpl(960, 60, 16, 2, false, true, true, false);
00255 }
00256 
00257 void  LhxJoinExecStreamTest::testDup2PartitionFilterStat()
00258 {
00259     testDupImpl(960, 60, 16, 2, true, true, true, false);
00260 }
00261 
00262 void  LhxJoinExecStreamTest::testConst()
00263 {
00264     testDupImpl(960,  1, 60, 0, false, false, false, false);
00265 }
00266 
00267 void  LhxJoinExecStreamTest::testConstPartition()
00268 {
00269     testDupImpl(960,  1, 60, 2, false, false, false, false);
00270 }
00271 
00272 void  LhxJoinExecStreamTest::testConstPartitionStat()
00273 {
00274     testDupImpl(960,  1, 60, 2, false, true, false, false);
00275 }
00276 
00277 void  LhxJoinExecStreamTest::testConstPartitionFilterStat()
00278 {
00279     testDupImpl(960,  1, 60, 2, true, true, false, false);
00280 }
00281 
00282 void  LhxJoinExecStreamTest::testConstCleanup()
00283 {
00284     /*
00285      * Fake interrupt to exercise temp seg clean up code.
00286      */
00287     testDupImpl(960,  1, 60, 2, false, false, false, true);
00288 }
00289 
00290 void LhxJoinExecStreamTest::testSequentialImpl(
00291     uint numRows,
00292     uint forcePartitionLevel,
00293     bool enableJoinFilter,
00294     bool enableSubPartStat)
00295 {
00296     uint numColsLeft;
00297     uint numColsRight;
00298     numColsRight = numColsLeft = 1;
00299     uint keyCount = 1;
00300     uint cndKeys = numRows;
00301 
00302     assert (keyCount <= numColsRight && keyCount <= numColsLeft);
00303 
00304     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00305         leftColumnGenerators;
00306     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00307         rightColumnGenerators;
00308     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00309         outColumnGenerators;
00310 
00311     StandardTypeDescriptorFactory stdTypeFactory;
00312     TupleAttributeDescriptor attrDesc(
00313         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00314 
00315     TupleDescriptor inputDesc;
00316     TupleDescriptor outputDesc;
00317     TupleProjection outputProj;
00318 
00319     uint i;
00320 
00321     for (i = 0; i < numColsLeft; i++) {
00322         leftColumnGenerators.push_back(
00323             SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00324         /*
00325          * The two inputs have identical tuple descriptors.
00326          */
00327         inputDesc.push_back(attrDesc);
00328 
00329         /*
00330          * The result row has cols from both inputs.
00331          */
00332         outColumnGenerators.push_back(
00333             SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00334         outputDesc.push_back(attrDesc);
00335         outputProj.push_back(i);
00336     }
00337 
00338     for (; i < numColsLeft + numColsRight; i++) {
00339         rightColumnGenerators.push_back(
00340             SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00341 
00342         /*
00343          * The result row has cols from both inputs.
00344          */
00345         outColumnGenerators.push_back(
00346             SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00347         outputDesc.push_back(attrDesc);
00348         outputProj.push_back(i);
00349     }
00350 
00351     SharedMockProducerExecStreamGenerator pLeftGenerator(
00352         new CompositeExecStreamGenerator(leftColumnGenerators));
00353 
00354     SharedMockProducerExecStreamGenerator pRightGenerator(
00355         new CompositeExecStreamGenerator(rightColumnGenerators));
00356 
00357     CompositeExecStreamGenerator verifier(outColumnGenerators);
00358 
00359     bool needSort = (forcePartitionLevel > 0) ? true : false;
00360     bool fakeInterrupt = false;
00361 
00362     testImpl(
00363         numRows, keyCount, cndKeys, numRows, inputDesc, outputDesc,
00364         outputProj, pLeftGenerator, pRightGenerator, verifier,
00365         forcePartitionLevel, enableJoinFilter, enableSubPartStat,
00366         needSort, fakeInterrupt);
00367 }
00368 
00369 void LhxJoinExecStreamTest::testDupImpl(
00370     uint numRows,
00371     uint cndKeyLeft,
00372     uint cndKeyRight,
00373     uint forcePartitionLevel,
00374     bool enableJoinFilter,
00375     bool enableSubPartStat,
00376     bool needSort,
00377     bool fakeInterrupt)
00378 {
00379     assert (!fakeInterrupt || !needSort);
00380 
00381     uint numColsLeft;
00382     uint numColsRight;
00383     numColsRight = numColsLeft = 2;
00384     uint keyCount = 1;
00385     uint cndKeys;
00386 
00387     assert (keyCount <= numColsRight && keyCount <= numColsLeft);
00388 
00389     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00390         leftColumnGenerators;
00391     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00392         rightColumnGenerators;
00393     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00394         outColumnGenerators;
00395 
00396     StandardTypeDescriptorFactory stdTypeFactory;
00397     TupleAttributeDescriptor attrDesc(
00398         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00399 
00400     TupleDescriptor inputDesc;
00401     TupleDescriptor outputDesc;
00402     TupleProjection outputProj;
00403 
00404     uint i;
00405 
00406     for (i = 0; i < numColsLeft; i++) {
00407         leftColumnGenerators.push_back(
00408             SharedInt64ColumnGenerator(new
00409                 DupColumnGenerator(numRows / cndKeyLeft)));
00410         outColumnGenerators.push_back(
00411             SharedInt64ColumnGenerator(new
00412                 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight)));
00413 
00414         inputDesc.push_back(attrDesc);
00415         outputDesc.push_back(attrDesc);
00416         outputProj.push_back(i);
00417     }
00418 
00419     for (; i < numColsLeft + numColsRight; i++) {
00420         rightColumnGenerators.push_back(
00421             SharedInt64ColumnGenerator(new
00422                 DupColumnGenerator(numRows / cndKeyRight)));
00423         outColumnGenerators.push_back(
00424             SharedInt64ColumnGenerator(new
00425                 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight)));
00426 
00427         outputDesc.push_back(attrDesc);
00428         outputProj.push_back(i);
00429     }
00430 
00431     cndKeys = cndKeyRight;
00432 
00433     SharedMockProducerExecStreamGenerator pLeftGenerator(
00434         new CompositeExecStreamGenerator(leftColumnGenerators));
00435 
00436     SharedMockProducerExecStreamGenerator pRightGenerator(
00437         new CompositeExecStreamGenerator(rightColumnGenerators));
00438 
00439     CompositeExecStreamGenerator verifier(outColumnGenerators);
00440 
00441     uint numResRows = (cndKeyLeft > cndKeyRight) ?
00442         (numRows * numRows / cndKeyLeft) :
00443         (numRows * numRows / cndKeyRight);
00444 
00445     testImpl(
00446         numRows, keyCount, cndKeys, numResRows, inputDesc, outputDesc,
00447         outputProj, pLeftGenerator, pRightGenerator, verifier,
00448         forcePartitionLevel, enableJoinFilter, enableSubPartStat, needSort,
00449         fakeInterrupt);
00450 }
00451 
00452 void LhxJoinExecStreamTest::testImpl(
00453     uint numInputRows, uint keyCount, uint cndKeys, uint numResultRows,
00454     TupleDescriptor &inputDesc, TupleDescriptor &outputDesc,
00455     TupleProjection &outputProj,
00456     SharedMockProducerExecStreamGenerator pLeftGenerator,
00457     SharedMockProducerExecStreamGenerator pRightGenerator,
00458     CompositeExecStreamGenerator &verifier,
00459     uint forcePartitionLevel, bool enableJoinFilter, bool enableSubPartStat,
00460     bool needSort, bool fakeInterrupt)
00461 {
00462     TupleProjection leftKeyProj;
00463     TupleProjection rightKeyProj;
00464 
00465     /*
00466      * Construct left and right input.
00467      */
00468     MockProducerExecStreamParams mockParams;
00469     mockParams.outputTupleDesc = inputDesc;
00470     mockParams.nRows = numInputRows;
00471 
00472     mockParams.pGenerator = pLeftGenerator;
00473     ExecStreamEmbryo leftInputStreamEmbryo;
00474     leftInputStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00475     leftInputStreamEmbryo.getStream()->setName("LeftInputExecStream");
00476 
00477     /*
00478      * The left and the right inputs are identical.
00479      */
00480     mockParams.pGenerator = pRightGenerator;
00481     ExecStreamEmbryo rightInputStreamEmbryo;
00482     rightInputStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00483     rightInputStreamEmbryo.getStream()->setName("RightInputExecStream");
00484 
00485     /*
00486      * Construct the join node.
00487      */
00488     LhxJoinExecStreamParams joinParams;
00489     /*
00490      * Fields in LhxJoinExecStreamParams
00491      */
00492     joinParams.leftInner     = true;
00493     joinParams.leftOuter     = false;
00494     joinParams.rightInner    = true;
00495     joinParams.rightOuter    = false;
00496 
00497     joinParams.setopAll      = false;
00498     joinParams.setopDistinct = false;
00499 
00500     joinParams.forcePartitionLevel = forcePartitionLevel;
00501     joinParams.enableJoinFilter = enableJoinFilter;
00502     joinParams.enableSubPartStat = enableSubPartStat;
00503     joinParams.enableSwing = true;
00504 
00505     joinParams.outputProj = outputProj;
00506     joinParams.cndKeys = cndKeys;
00507     joinParams.numRows = numInputRows;
00508 
00509     for (int i = 0; i < keyCount; i ++) {
00510         joinParams.leftKeyProj.push_back(i);
00511         joinParams.rightKeyProj.push_back(i);
00512     }
00513 
00514     /*
00515      * Fields in SingleOutputExecStreamParams
00516      */
00517     joinParams.outputTupleDesc = outputDesc;
00518     /*
00519      * Fields in ExecStreamParams
00520      */
00521     joinParams.pCacheAccessor = pCache;
00522     int cacheSize = 100;
00523     joinParams.scratchAccessor =
00524         pSegmentFactory->newScratchSegment(pCache, cacheSize);
00525     joinParams.pTempSegment = pRandomSegment;
00526 
00527     ExecStreamEmbryo joinStreamEmbryo;
00528     joinStreamEmbryo.init(new LhxJoinExecStream(),joinParams);
00529     joinStreamEmbryo.getStream()->setName("LhxJoinExecStream");
00530 
00531     SharedExecStream pOutputStream;
00532 
00533     if (needSort) {
00534         ExternalSortExecStreamParams sortParams;
00535         sortParams.outputTupleDesc = outputDesc;
00536         sortParams.distinctness = DUP_ALLOW;
00537         sortParams.pTempSegment = pRandomSegment;
00538         sortParams.pCacheAccessor = pCache;
00539         sortParams.scratchAccessor =
00540             pSegmentFactory->newScratchSegment(pCache, 10);
00541         sortParams.keyProj.push_back(0);
00542         sortParams.storeFinalRun = false;
00543         sortParams.estimatedNumRows = MAXU;
00544         sortParams.earlyClose = false;
00545         ExecStreamEmbryo sortStreamEmbryo;
00546         sortStreamEmbryo.init(
00547             ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00548         sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00549 
00550         pOutputStream = prepareConfluenceTransformGraph(
00551             leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo,
00552             sortStreamEmbryo);
00553     } else {
00554         pOutputStream = prepareConfluenceGraph(
00555             leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo);
00556     }
00557 
00558     // after partitioning the order might not be the same as the input, so add
00559     // a sort before verifying the output
00560 
00561     verifyOutput(
00562         *pOutputStream,
00563         fakeInterrupt ? 1 : numResultRows,
00564         verifier,
00565         fakeInterrupt);
00566 
00567     if (fakeInterrupt) {
00568         // simulate error cleanup
00569         pScheduler->stop();
00570         pGraph->close();
00571     }
00572 
00573     BOOST_CHECK_EQUAL(0, pRandomSegment->getAllocatedSizeInPages());
00574 }
00575 
// Fennel test-suite macro for LhxJoinExecStreamTest; presumably expands to
// the suite's registration/entry point (defined outside this file — verify).
00576 FENNEL_UNIT_TEST_SUITE(LhxJoinExecStreamTest);
00577 
00578 // End LhxJoinExecStreamTest.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1