00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/hashexe/LhxJoinExecStream.h"
00026 #include "fennel/sorter/ExternalSortExecStream.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/exec/MockProducerExecStream.h"
00029 #include "fennel/exec/ExecStreamEmbryo.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/exec/ExecStreamGraph.h"
00032 #include "fennel/cache/Cache.h"
00033
00034 #include <boost/test/test_tools.hpp>
00035
00036 using namespace fennel;
00037
00038 class LhxJoinExecStreamTest : public ExecStreamUnitTestBase
00039 {
00040 void testSequentialImpl(
00041 uint numRows,
00042 uint forcePartitionLevel,
00043 bool enableJoinFilter,
00044 bool enableSubPartStat);
00045
00046 void testDupImpl(
00047 uint numRows,
00048 uint cndKeyLeft,
00049 uint cndKeyRight,
00050 uint forcePartitionLevel,
00051 bool enableJoinFilter,
00052 bool enableSubPartStat,
00053 bool needSort,
00054 bool fakeInterrupt);
00055
00056 void testImpl(
00057 uint numInputRows,
00058 uint keyCount,
00059 uint cndKeys,
00060 uint numResultRows,
00061 TupleDescriptor &inputDesc,
00062 TupleDescriptor &outputDesc,
00063 TupleProjection &outputProj,
00064 SharedMockProducerExecStreamGenerator pLeftGenerator,
00065 SharedMockProducerExecStreamGenerator pRightGenerator,
00066 CompositeExecStreamGenerator &verifier,
00067 uint forcePartitionLevel,
00068 bool enableJoinFilter,
00069 bool enableSubPartStat,
00070 bool needSort,
00071 bool fakeInterrupt);
00072
00073 public:
00074 explicit LhxJoinExecStreamTest()
00075 {
00076 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testSequential);
00077 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup1);
00078 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup2);
00079 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConst);
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119 FENNEL_UNIT_TEST_CASE(
00120 LhxJoinExecStreamTest,
00121 testSequentialPartitionFilterStat);
00122 FENNEL_UNIT_TEST_CASE(
00123 LhxJoinExecStreamTest,
00124 testDup1PartitionFilterStat);
00125 FENNEL_UNIT_TEST_CASE(
00126 LhxJoinExecStreamTest,
00127 testDup2PartitionFilterStat);
00128 FENNEL_UNIT_TEST_CASE(
00129 LhxJoinExecStreamTest,
00130 testConstPartitionFilterStat);
00131
00132 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConstCleanup);
00133 }
00134
00135
00136
00137
00138 void testSequential();
00139
00140 void testSequentialPartition();
00141 void testSequentialPartitionFilter();
00142 void testSequentialPartitionStat();
00143 void testSequentialPartitionFilterStat();
00144
00145
00146
00147
00148
00149
00150
00151
00152 void testDup1();
00153
00154 void testDup1Partition();
00155 void testDup1PartitionFilter();
00156 void testDup1PartitionStat();
00157 void testDup1PartitionFilterStat();
00158
00159
00160
00161
00162
00163
00164
00165
00166 void testDup2();
00167
00168 void testDup2Partition();
00169 void testDup2PartitionFilter();
00170 void testDup2PartitionStat();
00171 void testDup2PartitionFilterStat();
00172
00173
00174
00175
00176
00177
00178
00179
00180 void testConst();
00181 void testConstPartition();
00182 void testConstPartitionStat();
00183 void testConstPartitionFilterStat();
00184 void testConstCleanup();
00185 };
00186
00187 void LhxJoinExecStreamTest::testSequential()
00188 {
00189 testSequentialImpl(1000, 0, true, true);
00190 }
00191
00192 void LhxJoinExecStreamTest::testSequentialPartition()
00193 {
00194 testSequentialImpl(1000, 2, false, false);
00195 }
00196
00197 void LhxJoinExecStreamTest::testSequentialPartitionFilter()
00198 {
00199 testSequentialImpl(1000, 2, true, false);
00200 }
00201
00202 void LhxJoinExecStreamTest::testSequentialPartitionStat()
00203 {
00204 testSequentialImpl(1000, 2, false, true);
00205 }
00206
00207 void LhxJoinExecStreamTest::testSequentialPartitionFilterStat()
00208 {
00209 testSequentialImpl(1000, 2, true, true);
00210 }
00211
00212 void LhxJoinExecStreamTest::testDup1()
00213 {
00214 testDupImpl(960, 16, 60, 0, false, false, false, false);
00215 }
00216
00217 void LhxJoinExecStreamTest::testDup1Partition()
00218 {
00219 testDupImpl(960, 16, 60, 2, false, false, true, false);
00220 }
00221
00222 void LhxJoinExecStreamTest::testDup1PartitionFilter()
00223 {
00224 testDupImpl(960, 16, 60, 2, true, false, true, false);
00225 }
00226
00227 void LhxJoinExecStreamTest::testDup1PartitionStat()
00228 {
00229 testDupImpl(960, 16, 60, 2, false, true, true, false);
00230 }
00231
00232 void LhxJoinExecStreamTest::testDup1PartitionFilterStat()
00233 {
00234 testDupImpl(960, 16, 60, 2, true, true, true, false);
00235 }
00236
00237 void LhxJoinExecStreamTest::testDup2()
00238 {
00239 testDupImpl(960, 60, 16, 0, false, false, false, false);
00240 }
00241
00242 void LhxJoinExecStreamTest::testDup2Partition()
00243 {
00244 testDupImpl(960, 60, 16, 2, false, false, true, false);
00245 }
00246
00247 void LhxJoinExecStreamTest::testDup2PartitionFilter()
00248 {
00249 testDupImpl(960, 60, 16, 2, true, false, true, false);
00250 }
00251
00252 void LhxJoinExecStreamTest::testDup2PartitionStat()
00253 {
00254 testDupImpl(960, 60, 16, 2, false, true, true, false);
00255 }
00256
00257 void LhxJoinExecStreamTest::testDup2PartitionFilterStat()
00258 {
00259 testDupImpl(960, 60, 16, 2, true, true, true, false);
00260 }
00261
00262 void LhxJoinExecStreamTest::testConst()
00263 {
00264 testDupImpl(960, 1, 60, 0, false, false, false, false);
00265 }
00266
00267 void LhxJoinExecStreamTest::testConstPartition()
00268 {
00269 testDupImpl(960, 1, 60, 2, false, false, false, false);
00270 }
00271
00272 void LhxJoinExecStreamTest::testConstPartitionStat()
00273 {
00274 testDupImpl(960, 1, 60, 2, false, true, false, false);
00275 }
00276
00277 void LhxJoinExecStreamTest::testConstPartitionFilterStat()
00278 {
00279 testDupImpl(960, 1, 60, 2, true, true, false, false);
00280 }
00281
00282 void LhxJoinExecStreamTest::testConstCleanup()
00283 {
00284
00285
00286
00287 testDupImpl(960, 1, 60, 2, false, false, false, true);
00288 }
00289
00290 void LhxJoinExecStreamTest::testSequentialImpl(
00291 uint numRows,
00292 uint forcePartitionLevel,
00293 bool enableJoinFilter,
00294 bool enableSubPartStat)
00295 {
00296 uint numColsLeft;
00297 uint numColsRight;
00298 numColsRight = numColsLeft = 1;
00299 uint keyCount = 1;
00300 uint cndKeys = numRows;
00301
00302 assert (keyCount <= numColsRight && keyCount <= numColsLeft);
00303
00304 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00305 leftColumnGenerators;
00306 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00307 rightColumnGenerators;
00308 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00309 outColumnGenerators;
00310
00311 StandardTypeDescriptorFactory stdTypeFactory;
00312 TupleAttributeDescriptor attrDesc(
00313 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00314
00315 TupleDescriptor inputDesc;
00316 TupleDescriptor outputDesc;
00317 TupleProjection outputProj;
00318
00319 uint i;
00320
00321 for (i = 0; i < numColsLeft; i++) {
00322 leftColumnGenerators.push_back(
00323 SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00324
00325
00326
00327 inputDesc.push_back(attrDesc);
00328
00329
00330
00331
00332 outColumnGenerators.push_back(
00333 SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00334 outputDesc.push_back(attrDesc);
00335 outputProj.push_back(i);
00336 }
00337
00338 for (; i < numColsLeft + numColsRight; i++) {
00339 rightColumnGenerators.push_back(
00340 SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00341
00342
00343
00344
00345 outColumnGenerators.push_back(
00346 SharedInt64ColumnGenerator(new SeqColumnGenerator()));
00347 outputDesc.push_back(attrDesc);
00348 outputProj.push_back(i);
00349 }
00350
00351 SharedMockProducerExecStreamGenerator pLeftGenerator(
00352 new CompositeExecStreamGenerator(leftColumnGenerators));
00353
00354 SharedMockProducerExecStreamGenerator pRightGenerator(
00355 new CompositeExecStreamGenerator(rightColumnGenerators));
00356
00357 CompositeExecStreamGenerator verifier(outColumnGenerators);
00358
00359 bool needSort = (forcePartitionLevel > 0) ? true : false;
00360 bool fakeInterrupt = false;
00361
00362 testImpl(
00363 numRows, keyCount, cndKeys, numRows, inputDesc, outputDesc,
00364 outputProj, pLeftGenerator, pRightGenerator, verifier,
00365 forcePartitionLevel, enableJoinFilter, enableSubPartStat,
00366 needSort, fakeInterrupt);
00367 }
00368
00369 void LhxJoinExecStreamTest::testDupImpl(
00370 uint numRows,
00371 uint cndKeyLeft,
00372 uint cndKeyRight,
00373 uint forcePartitionLevel,
00374 bool enableJoinFilter,
00375 bool enableSubPartStat,
00376 bool needSort,
00377 bool fakeInterrupt)
00378 {
00379 assert (!fakeInterrupt || !needSort);
00380
00381 uint numColsLeft;
00382 uint numColsRight;
00383 numColsRight = numColsLeft = 2;
00384 uint keyCount = 1;
00385 uint cndKeys;
00386
00387 assert (keyCount <= numColsRight && keyCount <= numColsLeft);
00388
00389 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00390 leftColumnGenerators;
00391 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00392 rightColumnGenerators;
00393 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > >
00394 outColumnGenerators;
00395
00396 StandardTypeDescriptorFactory stdTypeFactory;
00397 TupleAttributeDescriptor attrDesc(
00398 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00399
00400 TupleDescriptor inputDesc;
00401 TupleDescriptor outputDesc;
00402 TupleProjection outputProj;
00403
00404 uint i;
00405
00406 for (i = 0; i < numColsLeft; i++) {
00407 leftColumnGenerators.push_back(
00408 SharedInt64ColumnGenerator(new
00409 DupColumnGenerator(numRows / cndKeyLeft)));
00410 outColumnGenerators.push_back(
00411 SharedInt64ColumnGenerator(new
00412 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight)));
00413
00414 inputDesc.push_back(attrDesc);
00415 outputDesc.push_back(attrDesc);
00416 outputProj.push_back(i);
00417 }
00418
00419 for (; i < numColsLeft + numColsRight; i++) {
00420 rightColumnGenerators.push_back(
00421 SharedInt64ColumnGenerator(new
00422 DupColumnGenerator(numRows / cndKeyRight)));
00423 outColumnGenerators.push_back(
00424 SharedInt64ColumnGenerator(new
00425 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight)));
00426
00427 outputDesc.push_back(attrDesc);
00428 outputProj.push_back(i);
00429 }
00430
00431 cndKeys = cndKeyRight;
00432
00433 SharedMockProducerExecStreamGenerator pLeftGenerator(
00434 new CompositeExecStreamGenerator(leftColumnGenerators));
00435
00436 SharedMockProducerExecStreamGenerator pRightGenerator(
00437 new CompositeExecStreamGenerator(rightColumnGenerators));
00438
00439 CompositeExecStreamGenerator verifier(outColumnGenerators);
00440
00441 uint numResRows = (cndKeyLeft > cndKeyRight) ?
00442 (numRows * numRows / cndKeyLeft) :
00443 (numRows * numRows / cndKeyRight);
00444
00445 testImpl(
00446 numRows, keyCount, cndKeys, numResRows, inputDesc, outputDesc,
00447 outputProj, pLeftGenerator, pRightGenerator, verifier,
00448 forcePartitionLevel, enableJoinFilter, enableSubPartStat, needSort,
00449 fakeInterrupt);
00450 }
00451
00452 void LhxJoinExecStreamTest::testImpl(
00453 uint numInputRows, uint keyCount, uint cndKeys, uint numResultRows,
00454 TupleDescriptor &inputDesc, TupleDescriptor &outputDesc,
00455 TupleProjection &outputProj,
00456 SharedMockProducerExecStreamGenerator pLeftGenerator,
00457 SharedMockProducerExecStreamGenerator pRightGenerator,
00458 CompositeExecStreamGenerator &verifier,
00459 uint forcePartitionLevel, bool enableJoinFilter, bool enableSubPartStat,
00460 bool needSort, bool fakeInterrupt)
00461 {
00462 TupleProjection leftKeyProj;
00463 TupleProjection rightKeyProj;
00464
00465
00466
00467
00468 MockProducerExecStreamParams mockParams;
00469 mockParams.outputTupleDesc = inputDesc;
00470 mockParams.nRows = numInputRows;
00471
00472 mockParams.pGenerator = pLeftGenerator;
00473 ExecStreamEmbryo leftInputStreamEmbryo;
00474 leftInputStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00475 leftInputStreamEmbryo.getStream()->setName("LeftInputExecStream");
00476
00477
00478
00479
00480 mockParams.pGenerator = pRightGenerator;
00481 ExecStreamEmbryo rightInputStreamEmbryo;
00482 rightInputStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00483 rightInputStreamEmbryo.getStream()->setName("RightInputExecStream");
00484
00485
00486
00487
00488 LhxJoinExecStreamParams joinParams;
00489
00490
00491
00492 joinParams.leftInner = true;
00493 joinParams.leftOuter = false;
00494 joinParams.rightInner = true;
00495 joinParams.rightOuter = false;
00496
00497 joinParams.setopAll = false;
00498 joinParams.setopDistinct = false;
00499
00500 joinParams.forcePartitionLevel = forcePartitionLevel;
00501 joinParams.enableJoinFilter = enableJoinFilter;
00502 joinParams.enableSubPartStat = enableSubPartStat;
00503 joinParams.enableSwing = true;
00504
00505 joinParams.outputProj = outputProj;
00506 joinParams.cndKeys = cndKeys;
00507 joinParams.numRows = numInputRows;
00508
00509 for (int i = 0; i < keyCount; i ++) {
00510 joinParams.leftKeyProj.push_back(i);
00511 joinParams.rightKeyProj.push_back(i);
00512 }
00513
00514
00515
00516
00517 joinParams.outputTupleDesc = outputDesc;
00518
00519
00520
00521 joinParams.pCacheAccessor = pCache;
00522 int cacheSize = 100;
00523 joinParams.scratchAccessor =
00524 pSegmentFactory->newScratchSegment(pCache, cacheSize);
00525 joinParams.pTempSegment = pRandomSegment;
00526
00527 ExecStreamEmbryo joinStreamEmbryo;
00528 joinStreamEmbryo.init(new LhxJoinExecStream(),joinParams);
00529 joinStreamEmbryo.getStream()->setName("LhxJoinExecStream");
00530
00531 SharedExecStream pOutputStream;
00532
00533 if (needSort) {
00534 ExternalSortExecStreamParams sortParams;
00535 sortParams.outputTupleDesc = outputDesc;
00536 sortParams.distinctness = DUP_ALLOW;
00537 sortParams.pTempSegment = pRandomSegment;
00538 sortParams.pCacheAccessor = pCache;
00539 sortParams.scratchAccessor =
00540 pSegmentFactory->newScratchSegment(pCache, 10);
00541 sortParams.keyProj.push_back(0);
00542 sortParams.storeFinalRun = false;
00543 sortParams.estimatedNumRows = MAXU;
00544 sortParams.earlyClose = false;
00545 ExecStreamEmbryo sortStreamEmbryo;
00546 sortStreamEmbryo.init(
00547 ExternalSortExecStream::newExternalSortExecStream(),sortParams);
00548 sortStreamEmbryo.getStream()->setName("ExternalSortExecStream");
00549
00550 pOutputStream = prepareConfluenceTransformGraph(
00551 leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo,
00552 sortStreamEmbryo);
00553 } else {
00554 pOutputStream = prepareConfluenceGraph(
00555 leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo);
00556 }
00557
00558
00559
00560
00561 verifyOutput(
00562 *pOutputStream,
00563 fakeInterrupt ? 1 : numResultRows,
00564 verifier,
00565 fakeInterrupt);
00566
00567 if (fakeInterrupt) {
00568
00569 pScheduler->stop();
00570 pGraph->close();
00571 }
00572
00573 BOOST_CHECK_EQUAL(0, pRandomSegment->getAllocatedSizeInPages());
00574 }
00575
// Invokes the FENNEL_UNIT_TEST_SUITE macro for LhxJoinExecStreamTest.  The
// macro is defined outside this file; presumably it emits the entry point
// that runs the cases registered in the class constructor -- confirm against
// the test framework headers.
00576 FENNEL_UNIT_TEST_SUITE(LhxJoinExecStreamTest);
00577
00578