00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/test/ExecStreamUnitTestBase.h"
00024 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00025 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00026 #include "fennel/lucidera/bitmap/LbmEntry.h"
00027 #include "fennel/lucidera/test/SamplingExecStreamGenerator.h"
00028 #include "fennel/btree/BTreeBuilder.h"
00029 #include "fennel/ftrs/BTreeInsertExecStream.h"
00030 #include "fennel/ftrs/BTreeSearchExecStream.h"
00031 #include "fennel/ftrs/BTreeExecStream.h"
00032 #include "fennel/tuple/StandardTypeDescriptor.h"
00033 #include "fennel/tuple/TupleDescriptor.h"
00034 #include "fennel/tuple/TupleAccessor.h"
00035 #include "fennel/exec/MockProducerExecStream.h"
00036 #include "fennel/exec/ValuesExecStream.h"
00037 #include "fennel/exec/ExecStreamEmbryo.h"
00038 #include "fennel/exec/DynamicParam.h"
00039 #include "fennel/cache/Cache.h"
00040 #include <stdarg.h>
00041
00042 #include <boost/test/test_tools.hpp>
00043
00044 using namespace fennel;
00045
00051 class LcsRowScanExecStreamTest : public ExecStreamUnitTestBase
00052 {
00053 protected:
00054 static const uint NDUPS = 20;
00055 StandardTypeDescriptorFactory stdTypeFactory;
00056 TupleAttributeDescriptor attrDesc_int64;
00057 TupleAttributeDescriptor attrDesc_bitmap;
00058 TupleAttributeDescriptor attrDesc_char1;
00059 uint bitmapColSize;
00060
00061 vector<boost::shared_ptr<BTreeDescriptor> > bTreeClusters;
00062
00066 void loadClusters(
00067 uint nRows,
00068 uint nCols,
00069 uint nClusters,
00070 bool compressed);
00071
00080 void loadOneCluster(
00081 uint nRows,
00082 uint nCols,
00083 int colStart,
00084 BTreeDescriptor &bTreeDescriptor,
00085 bool compressed);
00086
00105 void testScanCols(
00106 uint nRows,
00107 uint nCols,
00108 uint nClusters,
00109 TupleProjection proj,
00110 uint skipRows,
00111 uint expectedNumRows);
00112
00133 void testFilterCols(
00134 uint nRows,
00135 uint nCols,
00136 uint nClusters,
00137 TupleProjection proj,
00138 uint skipRows,
00139 uint expectedNumRows,
00140 bool compressed);
00141
00142 void setSearchKey(
00143 char lowerDirective,
00144 char upperDirective,
00145 uint64_t lowerVal,
00146 uint64_t upperVal,
00147 PBuffer inputBuf,
00148 uint &offset,
00149 TupleAccessor &inputTupleAccessor,
00150 TupleData &inputTupleData);
00151
00181 void testSampleScanCols(
00182 uint nRows,
00183 uint nRowsActual,
00184 uint nCols,
00185 uint nClusters,
00186 TupleProjection proj,
00187 uint skipRows,
00188 TableSamplingMode mode,
00189 float rate,
00190 int seed,
00191 uint clumps,
00192 uint expectedNumRows);
00193
00208 int generateBitmaps(
00209 uint nRows, uint skipRows, TupleDescriptor const &bitmapTupleDesc,
00210 PBuffer pBuf);
00211
00212 void produceEntry(
00213 LbmEntry &lbmEntry, TupleAccessor &bitmapTupleAccessor, PBuffer pBuf,
00214 int &bufSize);
00215
00216 public:
00217 explicit LcsRowScanExecStreamTest()
00218 {
00219 FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testScans);
00220 FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testScanOnEmptyCluster);
00221 FENNEL_UNIT_TEST_CASE(
00222 LcsRowScanExecStreamTest, testScanPastEndOfCluster);
00223 FENNEL_UNIT_TEST_CASE(
00224 LcsRowScanExecStreamTest, testCompressedFiltering);
00225 FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testBernoulliSampling);
00226 FENNEL_UNIT_TEST_CASE(LcsRowScanExecStreamTest, testSystemSampling);
00227 }
00228
00229 void testCaseSetUp();
00230 void testCaseTearDown();
00231
00232 void testScans();
00233 void testScanOnEmptyCluster();
00234 void testScanPastEndOfCluster();
00235 void testCompressedFiltering();
00236 void testBernoulliSampling();
00237 void testSystemSampling();
00238 };
00239
00240 void LcsRowScanExecStreamTest::loadClusters(
00241 uint nRows,
00242 uint nCols,
00243 uint nClusters,
00244 bool compressed)
00245 {
00246 for (uint i = 0; i < nClusters; i++) {
00247 boost::shared_ptr<BTreeDescriptor> pBTreeDesc =
00248 boost::shared_ptr<BTreeDescriptor> (new BTreeDescriptor());
00249 bTreeClusters.push_back(pBTreeDesc);
00250 loadOneCluster(
00251 nRows, nCols, i * nCols, *(bTreeClusters[i]), compressed);
00252 resetExecStreamTest();
00253 }
00254 }
00255
00256 void LcsRowScanExecStreamTest::loadOneCluster(
00257 uint nRows,
00258 uint nCols,
00259 int colStart,
00260 BTreeDescriptor &bTreeDescriptor,
00261 bool compressed)
00262 {
00263 MockProducerExecStreamParams mockParams;
00264 for (uint i = 0; i < nCols; i++) {
00265 mockParams.outputTupleDesc.push_back(attrDesc_int64);
00266 }
00267 mockParams.nRows = nRows;
00268
00269
00270
00271 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00272 for (uint i = 0; i < nCols; i++) {
00273 SharedInt64ColumnGenerator col =
00274 SharedInt64ColumnGenerator(
00275 compressed
00276 ? (Int64ColumnGenerator *) new MixedDupColumnGenerator(
00277 NDUPS, i + colStart, 500)
00278 : new SeqColumnGenerator(i + colStart));
00279 columnGenerators.push_back(col);
00280 }
00281 mockParams.pGenerator.reset(
00282 new CompositeExecStreamGenerator(columnGenerators));
00283
00284 ExecStreamEmbryo mockStreamEmbryo;
00285 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00286 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00287
00288 LcsClusterAppendExecStreamParams lcsAppendParams;
00289 lcsAppendParams.scratchAccessor =
00290 pSegmentFactory->newScratchSegment(pCache, 10);
00291 lcsAppendParams.pCacheAccessor = pCache;
00292 lcsAppendParams.pSegment = pRandomSegment;
00293
00294
00295
00296 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00297 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00298
00299
00300 (lcsAppendParams.keyProj).push_back(0);
00301
00302
00303 lcsAppendParams.outputTupleDesc.push_back(attrDesc_int64);
00304
00305 for (uint i = 0; i < nCols; i++) {
00306 lcsAppendParams.inputProj.push_back(i);
00307 }
00308 lcsAppendParams.pRootMap = 0;
00309 lcsAppendParams.rootPageIdParamId = DynamicParamId(0);
00310
00311
00312
00313 bTreeDescriptor.segmentAccessor.pSegment = lcsAppendParams.pSegment;
00314 bTreeDescriptor.segmentAccessor.pCacheAccessor = pCache;
00315 bTreeDescriptor.tupleDescriptor = lcsAppendParams.tupleDesc;
00316 bTreeDescriptor.keyProjection = lcsAppendParams.keyProj;
00317 bTreeDescriptor.rootPageId = NULL_PAGE_ID;
00318 lcsAppendParams.segmentId = bTreeDescriptor.segmentId;
00319 lcsAppendParams.pageOwnerId = bTreeDescriptor.pageOwnerId;
00320
00321 BTreeBuilder builder(bTreeDescriptor, pRandomSegment);
00322 builder.createEmptyRoot();
00323 lcsAppendParams.rootPageId = bTreeDescriptor.rootPageId =
00324 builder.getRootPageId();
00325
00326
00327
00328 LcsClusterAppendExecStream *lcsStream = new LcsClusterAppendExecStream();
00329
00330 ExecStreamEmbryo lcsAppendStreamEmbryo;
00331 lcsAppendStreamEmbryo.init(lcsStream, lcsAppendParams);
00332 lcsAppendStreamEmbryo.getStream()->setName("LcsClusterAppendExecStream");
00333
00334 SharedExecStream pOutputStream = prepareTransformGraph(
00335 mockStreamEmbryo, lcsAppendStreamEmbryo);
00336
00337
00338 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00339
00340 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00341 }
00342
00343 void LcsRowScanExecStreamTest::testScanCols(
00344 uint nRows,
00345 uint nCols,
00346 uint nClusters,
00347 TupleProjection proj,
00348 uint skipRows,
00349 uint expectedNumRows)
00350 {
00351
00352
00353 ValuesExecStreamParams valuesParams;
00354 boost::shared_array<FixedBuffer> pBuffer;
00355 ExecStreamEmbryo valuesStreamEmbryo;
00356 LcsRowScanExecStreamParams scanParams;
00357
00358 scanParams.hasExtraFilter = false;
00359 scanParams.samplingMode = SAMPLING_OFF;
00360
00361
00362
00363
00364 valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00365 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00366 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00367
00368
00369
00370
00371 uint bufferSize = std::max(
00372 16, (int) (nRows / 8 + nRows / bitmapColSize * 8));
00373 pBuffer.reset(new FixedBuffer[bufferSize]);
00374 valuesParams.pTupleBuffer = pBuffer;
00375
00376 if (nRows > 0) {
00377 valuesParams.bufSize = generateBitmaps(
00378 nRows, skipRows, valuesParams.outputTupleDesc, pBuffer.get());
00379 assert(valuesParams.bufSize <= bufferSize);
00380 scanParams.isFullScan = false;
00381 } else {
00382 scanParams.isFullScan = true;
00383 valuesParams.bufSize = 0;
00384 }
00385 valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00386 valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00387
00388
00389
00390
00391 for (uint i = 0; i < nClusters; i++) {
00392 struct LcsClusterScanDef clusterScanDef;
00393
00394 for (uint j = 0; j < nCols; j++) {
00395 clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00396 }
00397
00398 clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00399 clusterScanDef.pCacheAccessor =
00400 bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00401 clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00402 clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00403 clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00404 clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00405 clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00406
00407 scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00408 }
00409
00410
00411 scanParams.outputProj = proj;
00412 for (uint i = 0; i < proj.size(); i++) {
00413 scanParams.outputTupleDesc.push_back(attrDesc_int64);
00414 }
00415
00416 ExecStreamEmbryo scanStreamEmbryo;
00417 scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00418 scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00419 SharedExecStream pOutputStream;
00420
00421 pOutputStream =
00422 prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
00423
00424
00425
00426 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00427 for (uint i = 0; i < proj.size(); i++) {
00428 SharedInt64ColumnGenerator col =
00429 SharedInt64ColumnGenerator(
00430 new SeqColumnGenerator(
00431 proj[i],
00432 skipRows));
00433 columnGenerators.push_back(col);
00434 }
00435
00436 CompositeExecStreamGenerator resultGenerator(columnGenerators);
00437 verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
00438 }
00439
00440 int LcsRowScanExecStreamTest::generateBitmaps(
00441 uint nRows, uint skipRows, TupleDescriptor const &bitmapTupleDesc,
00442 PBuffer pBuf)
00443 {
00444 int bufSize = 0;
00445 LbmEntry lbmEntry;
00446 boost::scoped_array<FixedBuffer> entryBuf;
00447 TupleAccessor bitmapTupleAccessor;
00448 LcsRid rid = LcsRid(0);
00449
00450 TupleData bitmapTupleData(bitmapTupleDesc);
00451 bitmapTupleData[0].pData = (PConstBuffer) &rid;
00452 bitmapTupleData[1].pData = NULL;
00453 bitmapTupleData[1].cbData = 0;
00454 bitmapTupleData[2].pData = NULL;
00455 bitmapTupleData[2].cbData = 0;
00456
00457 bitmapTupleAccessor.compute(bitmapTupleDesc);
00458
00459
00460 uint scratchBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00461 entryBuf.reset(new FixedBuffer[scratchBufSize]);
00462 lbmEntry.init(entryBuf.get(), NULL, scratchBufSize, bitmapTupleDesc);
00463 lbmEntry.setEntryTuple(bitmapTupleData);
00464
00465
00466 for (rid = LcsRid(skipRows); rid < LcsRid(nRows); rid += skipRows) {
00467 if (!lbmEntry.setRID(LcsRid(rid))) {
00468
00469
00470 produceEntry(lbmEntry, bitmapTupleAccessor, pBuf, bufSize);
00471 lbmEntry.setEntryTuple(bitmapTupleData);
00472 }
00473 }
00474
00475 produceEntry(lbmEntry, bitmapTupleAccessor, pBuf, bufSize);
00476
00477 return bufSize;
00478 }
00479
00480 void LcsRowScanExecStreamTest::produceEntry(
00481 LbmEntry &lbmEntry, TupleAccessor &bitmapTupleAccessor, PBuffer pBuf,
00482 int &bufSize)
00483 {
00484 TupleData bitmapTuple = lbmEntry.produceEntryTuple();
00485 bitmapTupleAccessor.marshal(bitmapTuple, pBuf + bufSize);
00486 bufSize += bitmapTupleAccessor.getCurrentByteCount();
00487 }
00488
00489 void LcsRowScanExecStreamTest::testScans()
00490 {
00491
00492
00493
00494
00495
00496
00497 uint nRows = 50000;
00498 uint nCols = 12;
00499 uint nClusters = 3;
00500 TupleProjection proj;
00501
00502 loadClusters(nRows, nCols, nClusters, false);
00503
00504
00505
00506
00507 for (uint i = 0; i < nClusters; i++) {
00508 for (uint j = 0; j < nCols; j++) {
00509 proj.push_back(i * nCols + j);
00510 }
00511 }
00512 testScanCols(nRows, nCols, nClusters, proj, 1, nRows);
00513 resetExecStreamTest();
00514
00515
00516 proj.clear();
00517 proj.push_back(22);
00518 proj.push_back(10);
00519 proj.push_back(12);
00520 proj.push_back(26);
00521 proj.push_back(1);
00522 proj.push_back(35);
00523 proj.push_back(15);
00524 proj.push_back(5);
00525 proj.push_back(17);
00526 proj.push_back(30);
00527 proj.push_back(4);
00528 proj.push_back(20);
00529 proj.push_back(7);
00530 proj.push_back(13);
00531
00532 testScanCols(nRows, nCols, nClusters, proj, 1, nRows);
00533 resetExecStreamTest();
00534
00535
00536 testScanCols(
00537 nRows, nCols, nClusters, proj, 7, (int) ceil((double) nRows / 7));
00538 resetExecStreamTest();
00539
00540
00541
00542 testScanCols(
00543 nRows, nCols, nClusters, proj, 37, (int) ceil((double) nRows / 37));
00544 resetExecStreamTest();
00545
00546
00547 testScanCols(0, nCols, nClusters, proj, 1, nRows);
00548
00549 resetExecStreamTest();
00550
00551
00552 for (uint i = 0; i < nClusters; i++) {
00553 for (uint j = 0; j < nCols; j++) {
00554 proj.push_back(i * nCols + j);
00555 }
00556 }
00557 testFilterCols(nRows, nCols, nClusters, proj, 1, 1000, false);
00558
00559 resetExecStreamTest();
00560
00561
00562 proj.resize(0);
00563 for (uint i = 0; i < nClusters; i++) {
00564 for (uint j = 0; j < nCols; j++) {
00565 if (!(i < 2 && (j == 0 || j == 1))) {
00566 proj.push_back(i * nCols + j);
00567 }
00568 }
00569 }
00570 testFilterCols(nRows, nCols, nClusters, proj, 1, 1000, false);
00571
00572 resetExecStreamTest();
00573
00574
00575 proj.resize(0);
00576 for (uint i = 0; i < nClusters - 1; i++) {
00577 for (uint j = 0; j < nCols; j++) {
00578 proj.push_back(i * nCols + j);
00579 }
00580 }
00581 testFilterCols(
00582 nRows, nCols, nClusters, proj, 7, 1000 / 7 + 1, false);
00583 }
00584
00585 void LcsRowScanExecStreamTest::testCompressedFiltering()
00586 {
00587
00588
00589
00590
00591
00592
00593 uint nRows = 50000;
00594 uint nCols = 12;
00595 uint nClusters = 3;
00596 TupleProjection proj;
00597
00598
00599
00600 loadClusters(nRows, nCols, nClusters, true);
00601
00602
00603 proj.resize(0);
00604 for (uint i = 0; i < nClusters; i++) {
00605 for (uint j = 0; j < nCols; j++) {
00606 proj.push_back(i * nCols + j);
00607 }
00608 }
00609 testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00610
00611 resetExecStreamTest();
00612
00613
00614 proj.resize(0);
00615 for (uint i = 0; i < nClusters; i++) {
00616 for (uint j = 0; j < nCols; j++) {
00617 if (!(i < 2 && (j == 0 || j == 1))) {
00618 proj.push_back(i * nCols + j);
00619 }
00620 }
00621 }
00622 testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00623
00624 resetExecStreamTest();
00625
00626
00627 proj.resize(0);
00628 for (uint i = 0; i < nClusters - 1; i++) {
00629 for (uint j = 0; j < nCols; j++) {
00630 proj.push_back(i * nCols + j);
00631 }
00632 }
00633 testFilterCols(nRows, nCols, nClusters, proj, 1, 500*NDUPS+500, true);
00634 }
00635
00636
00640 void LcsRowScanExecStreamTest::testScanOnEmptyCluster()
00641 {
00642
00643
00644 BTreeDescriptor &bTreeDescriptor = *(bTreeClusters[0]);
00645
00646 bTreeDescriptor.segmentAccessor.pSegment = pRandomSegment;
00647 bTreeDescriptor.segmentAccessor.pCacheAccessor = pCache;
00648 bTreeDescriptor.tupleDescriptor.push_back(attrDesc_int64);
00649 bTreeDescriptor.tupleDescriptor.push_back(attrDesc_int64);
00650 bTreeDescriptor.keyProjection.push_back(0);
00651 bTreeDescriptor.rootPageId = NULL_PAGE_ID;
00652
00653 BTreeBuilder builder(bTreeDescriptor, pRandomSegment);
00654 builder.createEmptyRoot();
00655 bTreeDescriptor.rootPageId = builder.getRootPageId();
00656
00657
00658
00659
00660 TupleProjection proj;
00661
00662 proj.push_back(0);
00663 testScanCols(1, 1, 1, proj, 1, 0);
00664 }
00665
00670 void LcsRowScanExecStreamTest::testScanPastEndOfCluster()
00671 {
00672 loadOneCluster(1, 1, 0, *(bTreeClusters[0]), false);
00673 resetExecStreamTest();
00674
00675
00676
00677
00678 TupleProjection proj;
00679
00680 proj.push_back(0);
00681 testScanCols(2, 1, 1, proj, 1, 1);
00682 }
00683
00688 void LcsRowScanExecStreamTest::testBernoulliSampling()
00689 {
00690 uint nRows = 50000;
00691 uint nCols = 12;
00692 uint nClusters = 3;
00693 TupleProjection proj;
00694
00695 int seed = 19721212;
00696 float rate = 0.1;
00697 TableSamplingMode mode = SAMPLING_BERNOULLI;
00698
00699 loadClusters(nRows, nCols, nClusters, false);
00700
00701
00702
00703
00704 for (uint i = 0; i < nClusters; i++) {
00705 for (uint j = 0; j < nCols; j++) {
00706 proj.push_back(i * nCols + j);
00707 }
00708 }
00709
00710
00711 testSampleScanCols(
00712 0, nRows, nCols, nClusters, proj, 1, mode, rate, seed, 0, 4938);
00713 resetExecStreamTest();
00714
00715
00716 testSampleScanCols(
00717 nRows, nRows, nCols, nClusters, proj, 2, mode, rate, seed, 0, 2489);
00718 resetExecStreamTest();
00719 }
00720
00721
00726 void LcsRowScanExecStreamTest::testSystemSampling()
00727 {
00728 uint nRows = 50000;
00729 uint nCols = 12;
00730 uint nClusters = 3;
00731 TupleProjection proj;
00732
00733 TableSamplingMode mode = SAMPLING_SYSTEM;
00734
00735 loadClusters(nRows, nCols, nClusters, false);
00736
00737
00738
00739
00740 for (uint i = 0; i < nClusters; i++) {
00741 for (uint j = 0; j < nCols; j++) {
00742 proj.push_back(i * nCols + j);
00743 }
00744 }
00745
00746 testSampleScanCols(
00747 nRows, nRows, nCols, nClusters, proj, 1, mode, 0.1, -1, 10, 5000);
00748 resetExecStreamTest();
00749
00750 testSampleScanCols(
00751 nRows, nRows, nCols, nClusters, proj, 1, mode, 1.0, -1, 10, 50000);
00752 resetExecStreamTest();
00753
00754 testSampleScanCols(
00755 nRows, nRows, nCols, nClusters, proj, 1, mode, 0.33333, -1, 10, 16670);
00756 resetExecStreamTest();
00757 }
00758
00759 void LcsRowScanExecStreamTest::setSearchKey(
00760 char lowerDirective, char upperDirective, uint64_t lowerVal,
00761 uint64_t upperVal, PBuffer inputBuf, uint &offset,
00762 TupleAccessor &inputTupleAccessor, TupleData &inputTupleData)
00763 {
00764 inputTupleData[0].pData = (PConstBuffer) &lowerDirective;
00765 inputTupleData[2].pData = (PConstBuffer) &upperDirective;
00766 inputTupleData[1].pData = (PConstBuffer) &lowerVal;
00767 inputTupleData[3].pData = (PConstBuffer) &upperVal;
00768 inputTupleAccessor.marshal(inputTupleData, inputBuf + offset);
00769 offset += inputTupleAccessor.getCurrentByteCount();
00770 }
00771
00772 void LcsRowScanExecStreamTest::testFilterCols(
00773 uint nRows,
00774 uint nCols,
00775 uint nClusters,
00776 TupleProjection proj,
00777 uint skipRows,
00778 uint expectedNumRows,
00779 bool compressed)
00780 {
00781
00782
00783 ValuesExecStreamParams valuesParams;
00784 boost::shared_array<FixedBuffer> pBuffer;
00785 ExecStreamEmbryo valuesStreamEmbryo;
00786 LcsRowScanExecStreamParams scanParams;
00787
00788 scanParams.hasExtraFilter = true;
00789 scanParams.samplingMode = SAMPLING_OFF;
00790
00791
00792
00793
00794 valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00795 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00796 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00797
00798
00799
00800
00801
00802 uint bufferSize = std::max(
00803 16, (int) (nRows / 8 + nRows / bitmapColSize * 8));
00804 pBuffer.reset(new FixedBuffer[bufferSize]);
00805 valuesParams.pTupleBuffer = pBuffer;
00806
00807 if (nRows > 0) {
00808 valuesParams.bufSize = generateBitmaps(
00809 nRows, skipRows, valuesParams.outputTupleDesc, pBuffer.get());
00810 assert(valuesParams.bufSize <= bufferSize);
00811 scanParams.isFullScan = false;
00812 } else {
00813 scanParams.isFullScan = true;
00814 valuesParams.bufSize = 0;
00815 }
00816 valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00817 valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828 TupleAttributeDescriptor attrDesc_nullableInt64 =
00829 TupleAttributeDescriptor(
00830 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00831 true, sizeof(uint64_t));
00832
00833 valuesParams.outputTupleDesc.resize(0);
00834 TupleDescriptor inputTupleDesc;
00835 for (uint i = 0; i < 2; i++) {
00836 inputTupleDesc.push_back(attrDesc_char1);
00837 inputTupleDesc.push_back(attrDesc_nullableInt64);
00838 valuesParams.outputTupleDesc.push_back(attrDesc_char1);
00839 valuesParams.outputTupleDesc.push_back(attrDesc_nullableInt64);
00840 }
00841 TupleData inputTupleData(inputTupleDesc);
00842 TupleAccessor inputTupleAccessor;
00843 inputTupleAccessor.compute(inputTupleDesc);
00844
00845 uint nInputTuples = 3;
00846 boost::shared_array<FixedBuffer> inputBuffer;
00847 inputBuffer.reset(
00848 new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00849
00850 PBuffer inputBuf = inputBuffer.get();
00851 uint offset = 0;
00852
00853 setSearchKey(
00854 '-', ')', 0, 1000, inputBuf, offset, inputTupleAccessor,
00855 inputTupleData);
00856 setSearchKey(
00857 '[', '+', 2000, 0, inputBuf, offset, inputTupleAccessor,
00858 inputTupleData);
00859
00860 TupleData inputTupleData1(inputTupleDesc);
00861 boost::shared_array<FixedBuffer> inputBuffer1;
00862 inputBuffer1.reset(
00863 new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00864 PBuffer inputBuf1 = inputBuffer1.get();
00865 uint offset1 = 0;
00866
00867 setSearchKey(
00868 '[', ')', 500 + nCols, 2999 + nCols, inputBuf1, offset1,
00869 inputTupleAccessor,
00870 inputTupleData1);
00871 setSearchKey(
00872 '[', ']', 2999 + nCols, 2999 + nCols, inputBuf1, offset1,
00873 inputTupleAccessor, inputTupleData1);
00874
00875 TupleData inputTupleData2(inputTupleDesc);
00876 boost::shared_array<FixedBuffer> inputBuffer2;
00877 inputBuffer2.reset(
00878 new FixedBuffer[nInputTuples * inputTupleAccessor.getMaxByteCount()]);
00879 PBuffer inputBuf2 = inputBuffer2.get();
00880 uint offset2 = 0;
00881
00882 setSearchKey(
00883 '(', '+', 1500+2*nCols, 0, inputBuf2, offset2, inputTupleAccessor,
00884 inputTupleData1);
00885
00886 valuesParams.pTupleBuffer = inputBuffer;
00887 valuesParams.bufSize = offset;
00888
00889 ExecStreamEmbryo valuesStreamEmbryo1, valuesStreamEmbryo2,
00890 valuesStreamEmbryo3;
00891 valuesStreamEmbryo1.init(new ValuesExecStream(), valuesParams);
00892 valuesStreamEmbryo1.getStream()->setName("ValuesExecStream1");
00893
00894 valuesParams.pTupleBuffer = inputBuffer1;
00895 valuesParams.bufSize = offset1;
00896 valuesStreamEmbryo2.init(new ValuesExecStream(), valuesParams);
00897 valuesStreamEmbryo2.getStream()->setName("ValuesExecStream2");
00898
00899 valuesParams.pTupleBuffer = inputBuffer2;
00900 valuesParams.bufSize = offset2;
00901 valuesStreamEmbryo3.init(new ValuesExecStream(), valuesParams);
00902 valuesStreamEmbryo3.getStream()->setName("ValuesExecStream3");
00903
00904
00905
00906
00907 for (uint i = 0; i < nClusters; i++) {
00908 struct LcsClusterScanDef clusterScanDef;
00909
00910 for (uint j = 0; j < nCols; j++) {
00911 clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00912 }
00913
00914 clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00915 clusterScanDef.pCacheAccessor =
00916 bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00917 clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00918 clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00919 clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00920 clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00921 clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00922
00923 scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00924 }
00925
00926
00927 scanParams.outputProj = proj;
00928 for (uint i = 0; i < proj.size(); i++) {
00929 scanParams.outputTupleDesc.push_back(attrDesc_int64);
00930 }
00931 scanParams.residualFilterCols.push_back(0);
00932 scanParams.residualFilterCols.push_back(nCols);
00933 scanParams.residualFilterCols.push_back(2*nCols);
00934
00935 ExecStreamEmbryo scanStreamEmbryo;
00936 scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00937 scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00938 SharedExecStream pOutputStream;
00939
00940 std::vector<ExecStreamEmbryo> sources;
00941 sources.push_back(valuesStreamEmbryo);
00942 sources.push_back(valuesStreamEmbryo1);
00943 sources.push_back(valuesStreamEmbryo2);
00944 sources.push_back(valuesStreamEmbryo3);
00945
00946 pOutputStream =
00947 prepareConfluenceGraph(sources, scanStreamEmbryo);
00948
00949
00950
00951 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00952 offset = (int) ceil(2000.0 / skipRows) * skipRows;
00953 for (uint i = 0; i < proj.size(); i++) {
00954 SharedInt64ColumnGenerator col =
00955 SharedInt64ColumnGenerator(
00956 compressed ?
00957 (Int64ColumnGenerator*) new MixedDupColumnGenerator(
00958 NDUPS, proj[i] + 2000,500) :
00959 new SeqColumnGenerator(proj[i] + offset, skipRows));
00960 columnGenerators.push_back(col);
00961 }
00962
00963
00964 CompositeExecStreamGenerator resultGenerator(columnGenerators);
00965 verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
00966 }
00967
00968
00969 void LcsRowScanExecStreamTest::testSampleScanCols(
00970 uint nRows,
00971 uint nRowsActual,
00972 uint nCols,
00973 uint nClusters,
00974 TupleProjection proj,
00975 uint skipRows,
00976 TableSamplingMode mode,
00977 float rate,
00978 int seed,
00979 uint clumps,
00980 uint expectedNumRows)
00981 {
00982
00983
00984 ValuesExecStreamParams valuesParams;
00985 boost::shared_array<FixedBuffer> pBuffer;
00986 ExecStreamEmbryo valuesStreamEmbryo;
00987 LcsRowScanExecStreamParams scanParams;
00988
00989 scanParams.hasExtraFilter = false;
00990
00991
00992
00993
00994 valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00995 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00996 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00997
00998 uint nRowsInternal = (mode == SAMPLING_SYSTEM) ? 0 : nRows;
00999
01000
01001
01002
01003
01004 uint bufferSize = std::max(
01005 16, (int) (nRowsInternal / 8 + nRowsInternal / bitmapColSize * 8));
01006 pBuffer.reset(new FixedBuffer[bufferSize]);
01007 valuesParams.pTupleBuffer = pBuffer;
01008
01009 if (nRowsInternal > 0) {
01010 valuesParams.bufSize = generateBitmaps(
01011 nRowsInternal, skipRows, valuesParams.outputTupleDesc,
01012 pBuffer.get());
01013 assert(valuesParams.bufSize <= bufferSize);
01014 scanParams.isFullScan = false;
01015 } else {
01016 scanParams.isFullScan = true;
01017 valuesParams.bufSize = 0;
01018 }
01019 valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
01020 valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
01021
01022
01023
01024
01025 for (uint i = 0; i < nClusters; i++) {
01026 struct LcsClusterScanDef clusterScanDef;
01027
01028 for (uint j = 0; j < nCols; j++) {
01029 clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
01030 }
01031
01032 clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
01033 clusterScanDef.pCacheAccessor =
01034 bTreeClusters[i]->segmentAccessor.pCacheAccessor;
01035 clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
01036 clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
01037 clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
01038 clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
01039 clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
01040
01041 scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
01042 }
01043
01044
01045 scanParams.outputProj = proj;
01046 for (uint i = 0; i < proj.size(); i++) {
01047 scanParams.outputTupleDesc.push_back(attrDesc_int64);
01048 }
01049
01050
01051
01052 scanParams.samplingMode = mode;
01053 scanParams.samplingRate = rate;
01054 scanParams.samplingIsRepeatable = true;
01055 scanParams.samplingRepeatableSeed = seed;
01056 scanParams.samplingClumps = clumps;
01057 scanParams.samplingRowCount = nRowsActual;
01058
01059 ExecStreamEmbryo scanStreamEmbryo;
01060 scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
01061 scanStreamEmbryo.getStream()->setName("RowScanExecStream");
01062 SharedExecStream pOutputStream;
01063
01064 pOutputStream =
01065 prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
01066
01067
01068
01069 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
01070 for (uint i = 0; i < proj.size(); i++) {
01071 SharedInt64ColumnGenerator col =
01072 SharedInt64ColumnGenerator(
01073 new SeqColumnGenerator(
01074 proj[i],
01075 skipRows));
01076 columnGenerators.push_back(col);
01077 }
01078
01079 boost::shared_ptr<CompositeExecStreamGenerator> baseResultGenerator(
01080 new CompositeExecStreamGenerator(columnGenerators));
01081
01082 if (mode == SAMPLING_BERNOULLI) {
01083 BernoulliSamplingExecStreamGenerator resultGenerator(
01084 baseResultGenerator,
01085 rate,
01086 seed,
01087 proj.size());
01088
01089 verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
01090 } else {
01091 SystemSamplingExecStreamGenerator resultGenerator(
01092 baseResultGenerator,
01093 rate,
01094 nRows,
01095 proj.size(),
01096 clumps);
01097
01098 verifyOutput(*pOutputStream, expectedNumRows, resultGenerator);
01099 }
01100 }
01101
01102 void LcsRowScanExecStreamTest::testCaseSetUp()
01103 {
01104 ExecStreamUnitTestBase::testCaseSetUp();
01105
01106 attrDesc_char1 = TupleAttributeDescriptor(
01107 stdTypeFactory.newDataType(STANDARD_TYPE_CHAR), false, 1);
01108 attrDesc_int64 = TupleAttributeDescriptor(
01109 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
01110 bitmapColSize = pRandomSegment->getUsablePageSize() / 8;
01111 attrDesc_bitmap = TupleAttributeDescriptor(
01112 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
01113 true, bitmapColSize);
01114 }
01115
01116 void LcsRowScanExecStreamTest::testCaseTearDown()
01117 {
01118 for (uint i = 0; i < bTreeClusters.size(); i++) {
01119 bTreeClusters[i]->segmentAccessor.reset();
01120 }
01121 ExecStreamUnitTestBase::testCaseTearDown();
01122 }
01123
// Declares LcsRowScanExecStreamTest as a unit test suite via the Fennel test
// macro; presumably this provides the entry point that runs the test cases
// registered in the class constructor -- confirm against the macro definition.
01124 FENNEL_UNIT_TEST_SUITE(LcsRowScanExecStreamTest);
01125
01126
01127