00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/hashexe/LhxAggExecStream.h"
00026 #include "fennel/sorter/ExternalSortExecStream.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/exec/MockProducerExecStream.h"
00029 #include "fennel/exec/ExecStreamEmbryo.h"
00030 #include "fennel/cache/Cache.h"
00031
00032 #include <boost/test/test_tools.hpp>
00033
00034 using namespace fennel;
00035
00036 class LhxAggExecStreamTest : public ExecStreamUnitTestBase
00037 {
00038 void testCountImpl(uint forcePartitionLevel);
00039 void testSumImpl(uint forcePartitionLevel);
00040 void testGroupCountImpl(uint forcePartitionLevel);
00041 void testSingleValueImpl(uint forcePartitionLevel);
00042
00043 public:
00044 explicit LhxAggExecStreamTest()
00045 {
00046 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testCount);
00047 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSum);
00048 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testGroupCount);
00049 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSingleValue);
00050 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testCountPartition);
00051 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSumPartition);
00052 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testGroupCountPartition);
00053 FENNEL_UNIT_TEST_CASE(LhxAggExecStreamTest, testSingleValuePartition);
00054 }
00055
00056 void testCount();
00057 void testCountPartition();
00058
00059 void testSum();
00060 void testSumPartition();
00061
00062 void testGroupCount();
00063 void testGroupCountPartition();
00064
00065 void testSingleValue();
00066 void testSingleValuePartition();
00067 };
00068
00069 void LhxAggExecStreamTest::testCount()
00070 {
00071 testCountImpl(0);
00072 }
00073
00074 void LhxAggExecStreamTest::testCountPartition()
00075 {
00076 testCountImpl(2);
00077 }
00078
00079 void LhxAggExecStreamTest::testSum()
00080 {
00081 testSumImpl(0);
00082 }
00083
00084 void LhxAggExecStreamTest::testSumPartition()
00085 {
00086 testSumImpl(2);
00087 }
00088
00089 void LhxAggExecStreamTest::testGroupCount()
00090 {
00091 testGroupCountImpl(0);
00092 }
00093
00094 void LhxAggExecStreamTest::testGroupCountPartition()
00095 {
00096 testGroupCountImpl(2);
00097 }
00098
00099 void LhxAggExecStreamTest::testSingleValue()
00100 {
00101 testSingleValueImpl(0);
00102 }
00103
00104 void LhxAggExecStreamTest::testSingleValuePartition()
00105 {
00106 testSingleValueImpl(2);
00107 }
00108
00109 void LhxAggExecStreamTest::testCountImpl(uint forcePartitionLevel)
00110 {
00111 StandardTypeDescriptorFactory stdTypeFactory;
00112 TupleAttributeDescriptor attrDesc(
00113 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00114
00115 uint numRows = 10000;
00116
00117 MockProducerExecStreamParams mockParams;
00118 mockParams.outputTupleDesc.push_back(attrDesc);
00119 mockParams.nRows = numRows;
00120
00121 ExecStreamEmbryo mockStreamEmbryo;
00122 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00123 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00124
00125
00126 LhxAggExecStreamParams aggParams;
00127 aggParams.groupByKeyCount = 0;
00128 aggParams.outputTupleDesc.push_back(attrDesc);
00129 AggInvocation countInvocation;
00130 countInvocation.aggFunction = AGG_FUNC_COUNT;
00131 countInvocation.iInputAttr = -1;
00132 aggParams.aggInvocations.push_back(countInvocation);
00133
00134 aggParams.pCacheAccessor = pCache;
00135 aggParams.scratchAccessor =
00136 pSegmentFactory->newScratchSegment(pCache, 100);
00137 aggParams.pTempSegment = pRandomSegment;
00138 aggParams.cndGroupByKeys = numRows;
00139 aggParams.numRows = numRows;
00140 aggParams.forcePartitionLevel = forcePartitionLevel;
00141 aggParams.enableSubPartStat = true;
00142
00143 ExecStreamEmbryo aggStreamEmbryo;
00144 aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00145 aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00146
00147 SharedExecStream pOutputStream = prepareTransformGraph(
00148 mockStreamEmbryo,aggStreamEmbryo);
00149
00150
00151
00152 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00153
00154 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00155 }
00156
00157 void LhxAggExecStreamTest::testSumImpl(uint forcePartitionLevel)
00158 {
00159 StandardTypeDescriptorFactory stdTypeFactory;
00160 TupleAttributeDescriptor attrDesc(
00161 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00162
00163 uint numRows = 10000;
00164 MockProducerExecStreamParams mockParams;
00165 mockParams.outputTupleDesc.push_back(attrDesc);
00166 mockParams.nRows = numRows;
00167 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00168
00169 ExecStreamEmbryo mockStreamEmbryo;
00170 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00171 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00172
00173
00174 LhxAggExecStreamParams aggParams;
00175 aggParams.groupByKeyCount = 0;
00176 attrDesc.isNullable = true;
00177 aggParams.outputTupleDesc.push_back(attrDesc);
00178 AggInvocation sumInvocation;
00179 sumInvocation.aggFunction = AGG_FUNC_SUM;
00180 sumInvocation.iInputAttr = 0;
00181 aggParams.aggInvocations.push_back(sumInvocation);
00182
00183 aggParams.pCacheAccessor = pCache;
00184 aggParams.scratchAccessor =
00185 pSegmentFactory->newScratchSegment(pCache, 100);
00186 aggParams.pTempSegment = pRandomSegment;
00187 aggParams.cndGroupByKeys = numRows;
00188 aggParams.numRows = numRows;
00189 aggParams.forcePartitionLevel = forcePartitionLevel;
00190 aggParams.enableSubPartStat = true;
00191
00192 ExecStreamEmbryo aggStreamEmbryo;
00193 aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00194 aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00195
00196 SharedExecStream pOutputStream = prepareTransformGraph(
00197 mockStreamEmbryo,aggStreamEmbryo);
00198
00199
00200
00201 RampExecStreamGenerator expectedResultGenerator(
00202 (mockParams.nRows-1)*mockParams.nRows/2);
00203
00204 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00205 }
00206
00207 void LhxAggExecStreamTest::testGroupCountImpl(uint forcePartitionLevel)
00208 {
00209 StandardTypeDescriptorFactory stdTypeFactory;
00210 TupleAttributeDescriptor attrDesc(
00211 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00212
00213 uint numRows = 1000;
00214
00215 MockProducerExecStreamParams mockParams;
00216 mockParams.outputTupleDesc.push_back(attrDesc);
00217 mockParams.nRows = numRows;
00218 mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator());
00219
00220 ExecStreamEmbryo mockStreamEmbryo;
00221 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00222 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00223
00224 TupleDescriptor outputDesc;
00225 outputDesc.push_back(attrDesc);
00226 outputDesc.push_back(attrDesc);
00227
00228
00229 LhxAggExecStreamParams aggParams;
00230 aggParams.groupByKeyCount = 1;
00231 aggParams.outputTupleDesc = outputDesc;
00232 AggInvocation countInvocation;
00233 countInvocation.aggFunction = AGG_FUNC_COUNT;
00234 countInvocation.iInputAttr = -1;
00235 aggParams.aggInvocations.push_back(countInvocation);
00236
00237 aggParams.pCacheAccessor = pCache;
00238 aggParams.scratchAccessor =
00239 pSegmentFactory->newScratchSegment(pCache, 100);
00240 aggParams.pTempSegment = pRandomSegment;
00241 aggParams.cndGroupByKeys = numRows / 2;
00242 aggParams.numRows = numRows;
00243 aggParams.forcePartitionLevel = forcePartitionLevel;
00244 aggParams.enableSubPartStat = true;
00245
00246 ExecStreamEmbryo aggStreamEmbryo;
00247
00248 aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00249 aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00250
00251 ExternalSortExecStreamParams sortParams;
00252 sortParams.outputTupleDesc = outputDesc;
00253 sortParams.distinctness = DUP_ALLOW;
00254 sortParams.pTempSegment = pRandomSegment;
00255 sortParams.pCacheAccessor = pCache;
00256 sortParams.scratchAccessor =
00257 pSegmentFactory->newScratchSegment(pCache, 10);
00258 sortParams.keyProj.push_back(0);
00259 sortParams.storeFinalRun = false;
00260 sortParams.estimatedNumRows = MAXU;
00261 sortParams.earlyClose = false;
00262
00263 ExecStreamEmbryo sortStreamEmbryo;
00264 sortStreamEmbryo.init(
00265 ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00266 sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00267
00268 std::vector<ExecStreamEmbryo> transforms;
00269 transforms.push_back(aggStreamEmbryo);
00270 transforms.push_back(sortStreamEmbryo);
00271
00272 SharedExecStream pOutputStream = prepareTransformGraph(
00273 mockStreamEmbryo, transforms);
00274
00275
00276
00277 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > columnGenerators;
00278
00279 SharedInt64ColumnGenerator col =
00280 SharedInt64ColumnGenerator(new SeqColumnGenerator());
00281 columnGenerators.push_back(col);
00282
00283 col = SharedInt64ColumnGenerator(new ConstColumnGenerator(2));
00284 columnGenerators.push_back(col);
00285
00286 CompositeExecStreamGenerator expectedResultGenerator(columnGenerators);
00287
00288 verifyOutput(*pOutputStream, mockParams.nRows/2, expectedResultGenerator);
00289 }
00290
00291 void LhxAggExecStreamTest::testSingleValueImpl(uint forcePartitionLevel)
00292 {
00293 StandardTypeDescriptorFactory stdTypeFactory;
00294 TupleAttributeDescriptor attrDesc(
00295 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00296 TupleAttributeDescriptor attrDescNullable(
00297 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), true,
00298 sizeof(::int64_t));
00299
00300
00301
00302 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00303 columnGeneratorsIn;
00304
00305 SharedInt64ColumnGenerator col =
00306 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00307 columnGeneratorsIn.push_back(col);
00308
00309 uint numRows = 1000;
00310
00311
00312 MockProducerExecStreamParams mockParams;
00313 mockParams.outputTupleDesc.push_back(attrDesc);
00314 mockParams.nRows = numRows;
00315 mockParams.pGenerator.reset(
00316 new CompositeExecStreamGenerator(columnGeneratorsIn));
00317
00318 ExecStreamEmbryo mockStreamEmbryo;
00319 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00320 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00321
00322 TupleDescriptor outputDesc;
00323 outputDesc.push_back(attrDesc);
00324 outputDesc.push_back(attrDescNullable);
00325
00326
00327 LhxAggExecStreamParams aggParams;
00328 aggParams.groupByKeyCount = 1;
00329 aggParams.outputTupleDesc = outputDesc;
00330 AggInvocation singleValueInvocation;
00331 singleValueInvocation.aggFunction = AGG_FUNC_SINGLE_VALUE;
00332 singleValueInvocation.iInputAttr = 0;
00333 aggParams.aggInvocations.push_back(singleValueInvocation);
00334
00335 aggParams.pCacheAccessor = pCache;
00336 aggParams.scratchAccessor =
00337 pSegmentFactory->newScratchSegment(pCache, 100);
00338 aggParams.pTempSegment = pRandomSegment;
00339 aggParams.cndGroupByKeys = numRows;
00340 aggParams.numRows = numRows;
00341 aggParams.forcePartitionLevel = forcePartitionLevel;
00342 aggParams.enableSubPartStat = true;
00343
00344 ExecStreamEmbryo aggStreamEmbryo;
00345
00346 aggStreamEmbryo.init(new LhxAggExecStream(),aggParams);
00347 aggStreamEmbryo.getStream()->setName("LhxAggExecStream");
00348
00349 ExternalSortExecStreamParams sortParams;
00350 sortParams.outputTupleDesc = outputDesc;
00351 sortParams.distinctness = DUP_ALLOW;
00352 sortParams.pTempSegment = pRandomSegment;
00353 sortParams.pCacheAccessor = pCache;
00354 sortParams.scratchAccessor =
00355 pSegmentFactory->newScratchSegment(pCache, 10);
00356 sortParams.keyProj.push_back(0);
00357 sortParams.storeFinalRun = false;
00358
00359 ExecStreamEmbryo sortStreamEmbryo;
00360 sortStreamEmbryo.init(
00361 ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00362 sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00363
00364 std::vector<ExecStreamEmbryo> transforms;
00365 transforms.push_back(aggStreamEmbryo);
00366 transforms.push_back(sortStreamEmbryo);
00367
00368 SharedExecStream pOutputStream = prepareTransformGraph(
00369 mockStreamEmbryo, transforms);
00370
00371
00372 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00373 columnGeneratorsOut;
00374
00375 col =
00376 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00377 columnGeneratorsOut.push_back(col);
00378
00379 col =
00380 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00381 columnGeneratorsOut.push_back(col);
00382
00383 CompositeExecStreamGenerator expectedResultGenerator(columnGeneratorsOut);
00384
00385 verifyOutput(*pOutputStream, mockParams.nRows, expectedResultGenerator);
00386 }
00387
// Hands LhxAggExecStreamTest to the Fennel unit-test harness. The expansion
// of FENNEL_UNIT_TEST_SUITE is defined in the project's test headers
// (not visible here); presumably it registers this class as the test suite
// for this translation unit.
FENNEL_UNIT_TEST_SUITE(LhxAggExecStreamTest);
00389
00390