LhxAggExecStreamTest.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/test/LhxAggExecStreamTest.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/hashexe/LhxAggExecStream.h"
00026 #include "fennel/sorter/ExternalSortExecStream.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/exec/MockProducerExecStream.h"
00029 #include "fennel/exec/ExecStreamEmbryo.h"
00030 #include "fennel/cache/Cache.h"
00031 
00032 #include <boost/test/test_tools.hpp>
00033 
00034 using namespace fennel;
00035 
00036 class LhxAggExecStreamTest : public ExecStreamUnitTestBase
00037 {
00038     void testCountImpl(uint forcePartitionLevel);
00039     void testSumImpl(uint forcePartitionLevel);
00040     void testGroupCountImpl(uint forcePartitionLevel);
00041     void testSingleValueImpl(uint forcePartitionLevel);
00042 
00043 public:
00044     explicit LhxAggExecStreamTest()
00045     {
00046         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testCount);
00047         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSum);
00048         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testGroupCount);
00049         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSingleValue);
00050         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testCountPartition);
00051         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSumPartition);
00052         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testGroupCountPartition);
00053         FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSingleValuePartition);
00054     }
00055 
00056     void testCount();
00057     void testCountPartition();
00058 
00059     void testSum();
00060     void testSumPartition();
00061 
00062     void testGroupCount();
00063     void testGroupCountPartition();
00064 
00065     void testSingleValue();
00066     void testSingleValuePartition();
00067 };
00068 
00069 void LhxAggExecStreamTest::testCount()
00070 {
00071     testCountImpl(0);
00072 }
00073 
00074 void LhxAggExecStreamTest::testCountPartition()
00075 {
00076     testCountImpl(2);
00077 }
00078 
00079 void LhxAggExecStreamTest::testSum()
00080 {
00081     testSumImpl(0);
00082 }
00083 
00084 void LhxAggExecStreamTest::testSumPartition()
00085 {
00086     testSumImpl(2);
00087 }
00088 
00089 void LhxAggExecStreamTest::testGroupCount()
00090 {
00091     testGroupCountImpl(0);
00092 }
00093 
00094 void LhxAggExecStreamTest::testGroupCountPartition()
00095 {
00096     testGroupCountImpl(2);
00097 }
00098 
00099 void LhxAggExecStreamTest::testSingleValue()
00100 {
00101     testSingleValueImpl(0);
00102 }
00103 
00104 void LhxAggExecStreamTest::testSingleValuePartition()
00105 {
00106     testSingleValueImpl(2);
00107 }
00108 
00109 void LhxAggExecStreamTest::testCountImpl(uint forcePartitionLevel)
00110 {
00111     StandardTypeDescriptorFactory stdTypeFactory;
00112     TupleAttributeDescriptor attrDesc(
00113         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00114 
00115     uint numRows = 10000;
00116 
00117     MockProducerExecStreamParams mockParams;
00118     mockParams.outputTupleDesc.push_back(attrDesc);
00119     mockParams.nRows = numRows;   // at least two buffers
00120 
00121     ExecStreamEmbryo mockStreamEmbryo;
00122     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00123     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00124 
00125     // simulate SELECT COUNT(*) FROM t10k
00126     LhxAggExecStreamParams aggParams;
00127     aggParams.groupByKeyCount = 0;
00128     aggParams.outputTupleDesc.push_back(attrDesc);
00129     AggInvocation countInvocation;
00130     countInvocation.aggFunction = AGG_FUNC_COUNT;
00131     countInvocation.iInputAttr = -1; // interpreted as COUNT(*)
00132     aggParams.aggInvocations.push_back(countInvocation);
00133 
00134     aggParams.pCacheAccessor = pCache;
00135     aggParams.scratchAccessor =
00136         pSegmentFactory->newScratchSegment(pCache, 100);
00137     aggParams.pTempSegment = pRandomSegment;
00138     aggParams.cndGroupByKeys = numRows;
00139     aggParams.numRows = numRows;
00140     aggParams.forcePartitionLevel = forcePartitionLevel;
00141     aggParams.enableSubPartStat = true;
00142 
00143     ExecStreamEmbryo aggStreamEmbryo;
00144     aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00145     aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00146 
00147     SharedExecStream pOutputStream = prepareTransformGraph(
00148         mockStreamEmbryo,aggStreamEmbryo);
00149 
00150     // set up a generator which can produce the expected output
00151     // (a count of 10000)
00152     RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00153 
00154     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00155 }
00156 
00157 void LhxAggExecStreamTest::testSumImpl(uint forcePartitionLevel)
00158 {
00159     StandardTypeDescriptorFactory stdTypeFactory;
00160     TupleAttributeDescriptor attrDesc(
00161         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00162 
00163     uint numRows = 10000;
00164     MockProducerExecStreamParams mockParams;
00165     mockParams.outputTupleDesc.push_back(attrDesc);
00166     mockParams.nRows = numRows;   // at least two buffers
00167     mockParams.pGenerator.reset(new RampExecStreamGenerator());
00168 
00169     ExecStreamEmbryo mockStreamEmbryo;
00170     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00171     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00172 
00173     // simulate SELECT SUM(x) FROM t10k with x iterating from 0 to 9999
00174     LhxAggExecStreamParams aggParams;
00175     aggParams.groupByKeyCount = 0;
00176     attrDesc.isNullable = true;
00177     aggParams.outputTupleDesc.push_back(attrDesc);
00178     AggInvocation sumInvocation;
00179     sumInvocation.aggFunction = AGG_FUNC_SUM;
00180     sumInvocation.iInputAttr = 0;
00181     aggParams.aggInvocations.push_back(sumInvocation);
00182 
00183     aggParams.pCacheAccessor = pCache;
00184     aggParams.scratchAccessor =
00185         pSegmentFactory->newScratchSegment(pCache, 100);
00186     aggParams.pTempSegment = pRandomSegment;
00187     aggParams.cndGroupByKeys = numRows;
00188     aggParams.numRows = numRows;
00189     aggParams.forcePartitionLevel = forcePartitionLevel;
00190     aggParams.enableSubPartStat = true;
00191 
00192     ExecStreamEmbryo aggStreamEmbryo;
00193     aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00194     aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00195 
00196     SharedExecStream pOutputStream = prepareTransformGraph(
00197         mockStreamEmbryo,aggStreamEmbryo);
00198 
00199     // set up a generator which can produce the expected output
00200     // (a count of 5000*9999)
00201     RampExecStreamGenerator expectedResultGenerator(
00202         (mockParams.nRows-1)*mockParams.nRows/2);
00203 
00204     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00205 }
00206 
00207 void LhxAggExecStreamTest::testGroupCountImpl(uint forcePartitionLevel)
00208 {
00209     StandardTypeDescriptorFactory stdTypeFactory;
00210     TupleAttributeDescriptor attrDesc(
00211         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00212 
00213     uint numRows = 1000;
00214     // Create one column, with two duplicates per value.
00215     MockProducerExecStreamParams mockParams;
00216     mockParams.outputTupleDesc.push_back(attrDesc);
00217     mockParams.nRows = numRows;
00218     mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator());
00219 
00220     ExecStreamEmbryo mockStreamEmbryo;
00221     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00222     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00223 
00224     TupleDescriptor outputDesc;
00225     outputDesc.push_back(attrDesc);
00226     outputDesc.push_back(attrDesc);
00227 
00228     // simulate SELECT col, COUNT(*) FROM t10k GROUP BY col;
00229     LhxAggExecStreamParams aggParams;
00230     aggParams.groupByKeyCount = 1;
00231     aggParams.outputTupleDesc = outputDesc;
00232     AggInvocation countInvocation;
00233     countInvocation.aggFunction = AGG_FUNC_COUNT;
00234     countInvocation.iInputAttr = -1; // interpreted as COUNT(*)
00235     aggParams.aggInvocations.push_back(countInvocation);
00236 
00237     aggParams.pCacheAccessor = pCache;
00238     aggParams.scratchAccessor =
00239         pSegmentFactory->newScratchSegment(pCache, 100);
00240     aggParams.pTempSegment = pRandomSegment;
00241     aggParams.cndGroupByKeys = numRows / 2;
00242     aggParams.numRows = numRows;
00243     aggParams.forcePartitionLevel = forcePartitionLevel;
00244     aggParams.enableSubPartStat = true;
00245 
00246     ExecStreamEmbryo aggStreamEmbryo;
00247 
00248     aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00249     aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00250 
00251     ExternalSortExecStreamParams sortParams;
00252     sortParams.outputTupleDesc = outputDesc;
00253     sortParams.distinctness = DUP_ALLOW;
00254     sortParams.pTempSegment = pRandomSegment;
00255     sortParams.pCacheAccessor = pCache;
00256     sortParams.scratchAccessor =
00257         pSegmentFactory->newScratchSegment(pCache, 10);
00258     sortParams.keyProj.push_back(0);
00259     sortParams.storeFinalRun = false;
00260     sortParams.estimatedNumRows = MAXU;
00261     sortParams.earlyClose = false;
00262 
00263     ExecStreamEmbryo sortStreamEmbryo;
00264     sortStreamEmbryo.init(
00265         ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00266     sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00267 
00268     std::vector<ExecStreamEmbryo> transforms;
00269     transforms.push_back(aggStreamEmbryo);
00270     transforms.push_back(sortStreamEmbryo);
00271 
00272     SharedExecStream pOutputStream = prepareTransformGraph(
00273         mockStreamEmbryo, transforms);
00274 
00275     // Result should be a sequence of values in the first column
00276     // and 2 for the second column
00277     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > columnGenerators;
00278 
00279     SharedInt64ColumnGenerator col =
00280         SharedInt64ColumnGenerator(new SeqColumnGenerator());
00281     columnGenerators.push_back(col);
00282 
00283     col = SharedInt64ColumnGenerator(new ConstColumnGenerator(2));
00284     columnGenerators.push_back(col);
00285 
00286     CompositeExecStreamGenerator expectedResultGenerator(columnGenerators);
00287 
00288     verifyOutput(*pOutputStream, mockParams.nRows/2, expectedResultGenerator);
00289 }
00290 
00291 void LhxAggExecStreamTest::testSingleValueImpl(uint forcePartitionLevel)
00292 {
00293     StandardTypeDescriptorFactory stdTypeFactory;
00294     TupleAttributeDescriptor attrDesc(
00295         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00296     TupleAttributeDescriptor attrDescNullable(
00297         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), true,
00298         sizeof(::int64_t));
00299 
00300     // Result should be a sequence of values in the first column
00301     // and 2 for the second column
00302     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00303         columnGeneratorsIn;
00304 
00305     SharedInt64ColumnGenerator col =
00306         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00307     columnGeneratorsIn.push_back(col);
00308 
00309     uint numRows = 1000;
00310 
00311     // Create two columns, both with two duplicates per column.
00312     MockProducerExecStreamParams mockParams;
00313     mockParams.outputTupleDesc.push_back(attrDesc);
00314     mockParams.nRows = numRows;
00315     mockParams.pGenerator.reset(
00316         new CompositeExecStreamGenerator(columnGeneratorsIn));
00317 
00318     ExecStreamEmbryo mockStreamEmbryo;
00319     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00320     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00321 
00322     TupleDescriptor outputDesc;
00323     outputDesc.push_back(attrDesc);
00324     outputDesc.push_back(attrDescNullable);
00325 
00326     // simulate SELECT x, single_value(x) FROM t10k group by x
00327     LhxAggExecStreamParams aggParams;
00328     aggParams.groupByKeyCount = 1;
00329     aggParams.outputTupleDesc = outputDesc;
00330     AggInvocation singleValueInvocation;
00331     singleValueInvocation.aggFunction = AGG_FUNC_SINGLE_VALUE;
00332     singleValueInvocation.iInputAttr = 0;
00333     aggParams.aggInvocations.push_back(singleValueInvocation);
00334 
00335     aggParams.pCacheAccessor = pCache;
00336     aggParams.scratchAccessor =
00337         pSegmentFactory->newScratchSegment(pCache, 100);
00338     aggParams.pTempSegment = pRandomSegment;
00339     aggParams.cndGroupByKeys = numRows;
00340     aggParams.numRows = numRows;
00341     aggParams.forcePartitionLevel = forcePartitionLevel;
00342     aggParams.enableSubPartStat = true;
00343 
00344     ExecStreamEmbryo aggStreamEmbryo;
00345 
00346     aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00347     aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00348 
00349     ExternalSortExecStreamParams sortParams;
00350     sortParams.outputTupleDesc = outputDesc;
00351     sortParams.distinctness = DUP_ALLOW;
00352     sortParams.pTempSegment = pRandomSegment;
00353     sortParams.pCacheAccessor = pCache;
00354     sortParams.scratchAccessor =
00355         pSegmentFactory->newScratchSegment(pCache, 10);
00356     sortParams.keyProj.push_back(0);
00357     sortParams.storeFinalRun = false;
00358 
00359     ExecStreamEmbryo sortStreamEmbryo;
00360     sortStreamEmbryo.init(
00361         ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00362     sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00363 
00364     std::vector<ExecStreamEmbryo> transforms;
00365     transforms.push_back(aggStreamEmbryo);
00366     transforms.push_back(sortStreamEmbryo);
00367 
00368     SharedExecStream pOutputStream = prepareTransformGraph(
00369         mockStreamEmbryo, transforms);
00370 
00371     // Result should be a sequence of values in both columns
00372     vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00373         columnGeneratorsOut;
00374 
00375     col =
00376         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00377     columnGeneratorsOut.push_back(col);
00378 
00379     col =
00380         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00381     columnGeneratorsOut.push_back(col);
00382 
00383     CompositeExecStreamGenerator expectedResultGenerator(columnGeneratorsOut);
00384 
00385     verifyOutput(*pOutputStream, mockParams.nRows, expectedResultGenerator);
00386 }
00387 
00388 FENNEL_UNIT_TEST_SUITE(LhxAggExecStreamTest); // NOTE(review): presumably emits the suite/entry point for this test class -- confirm in the macro definition
00389 
00390 // End LhxAggExecStreamTest.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1