00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ValuesExecStream.h"
00024 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h"
00025 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h"
00026 #include "fennel/lucidera/test/LbmExecStreamTestBase.h"
00027
00028 #include <boost/test/test_tools.hpp>
00029
00030 using namespace fennel;
00031
00032
/**
 * Declares a local FixedBuffer array called `name` holding `size` elements
 * and fills every byte of it with `value`.
 *
 * The macro expands to a declaration followed by a statement, so it has to
 * be used at block scope (never as the body of an unbraced `if`/`for`), and
 * the caller supplies the terminating semicolon.
 *
 * Arguments are parenthesized so that expressions such as `a + b` expand
 * safely, and the fill length is taken from the array itself rather than
 * from the element count.
 */
#define DECLARE_CONSTANT_ARRAY(name, value, size) \
    FixedBuffer name[(size)]; \
    memset(name, (value), sizeof(name))
00036
00037
/**
 * Evaluates `code` and reports a Boost.Test failure (via BOOST_CHECK) unless
 * the evaluation throws. Any exception type is accepted and swallowed.
 *
 * The expansion is wrapped in `do { ... } while (0)` so that the macro acts
 * as a single statement: `if (x) CHECK_THROWN(f()); else ...` parses as
 * intended. The flag has a deliberately unusual name so that it cannot
 * capture a variable referenced from `code`.
 */
#define CHECK_THROWN(code) \
    do { \
        bool checkThrownSawException = false; \
        try { \
            code; \
        } catch (...) { \
            checkThrownSawException = true; \
        } \
        BOOST_CHECK(checkThrownSawException); \
    } while (0)
00048
00049 class LbmUnionExecStreamTest : public LbmExecStreamTestBase
00050 {
00051 boost::shared_array<PBuffer> ppBuffers;
00052 uint nBuffers;
00053
00054
00055 SharedByteBuffer allocateBuffer(uint nBuffers, uint bufferSize);
00056
00057
00058 void deleteBuffer();
00059
00060
00061 void testBuffer(SharedByteBuffer pByteBuffer);
00062
00063
00064 void verifyMerge(LbmUnionMergeArea area, PConstBuffer reference);
00065
00066
00067 void testUnion(uint nRows, std::vector<LbmNumberStreamInput> inputData);
00068
00069 public:
00070 explicit LbmUnionExecStreamTest()
00071 {
00072
00073
00074 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testSinglePageBuffer);
00075 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testTwoPageBuffer);
00076 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testMultiPageBuffer);
00077 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testTwoInputs);
00078 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testThreeInputs);
00079 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testLargeRids);
00080 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testSparse);
00081 FENNEL_UNIT_TEST_CASE(LbmUnionExecStreamTest, testGaps);
00082 }
00083
00084 void testSinglePageBuffer();
00085 void testTwoPageBuffer();
00086 void testMultiPageBuffer();
00087
00088 void testTwoInputs();
00089 void testThreeInputs();
00090 void testLargeRids();
00091 void testSparse();
00092 void testGaps();
00093 };
00094
00095 void LbmUnionExecStreamTest::testSinglePageBuffer()
00096 {
00097 SharedByteBuffer pByteBuffer = allocateBuffer(1, 16);
00098 testBuffer(pByteBuffer);
00099 deleteBuffer();
00100 }
00101
00102 void LbmUnionExecStreamTest::testTwoPageBuffer()
00103 {
00104 SharedByteBuffer pByteBuffer = allocateBuffer(2, 8);
00105 testBuffer(pByteBuffer);
00106 deleteBuffer();
00107 }
00108
00109 void LbmUnionExecStreamTest::testMultiPageBuffer()
00110 {
00111 SharedByteBuffer pByteBuffer = allocateBuffer(4, 4);
00112 testBuffer(pByteBuffer);
00113 deleteBuffer();
00114 }
00115
00116 void LbmUnionExecStreamTest::testTwoInputs()
00117 {
00118 uint nRows = 1000;
00119 std::vector<LbmNumberStreamInput> inputData;
00120
00121
00122 LbmNumberStreamInput input1;
00123 input1.pStream =
00124 SharedNumberStream(new SkipNumberStream(20, 500, 2));
00125 input1.bitmapSize = 4;
00126 inputData.push_back(input1);
00127
00128
00129 LbmNumberStreamInput input2;
00130 input2.pStream =
00131 SharedNumberStream(new SkipNumberStream(300, 990, 3));
00132 input2.bitmapSize = 8;
00133 inputData.push_back(input2);
00134
00135 testUnion(nRows, inputData);
00136 }
00137
00138 void LbmUnionExecStreamTest::testThreeInputs()
00139 {
00140 uint nRows = 1051;
00141 std::vector<LbmNumberStreamInput> inputData;
00142
00143
00144 LbmNumberStreamInput input1;
00145 input1.pStream =
00146 SharedNumberStream(new SkipNumberStream(21, 700, 7));
00147 input1.bitmapSize = 4;
00148 inputData.push_back(input1);
00149
00150
00151 LbmNumberStreamInput input2;
00152 input2.pStream =
00153 SharedNumberStream(new SkipNumberStream(300, 990, 3));
00154 input2.bitmapSize = 8;
00155 inputData.push_back(input2);
00156
00157
00158 LbmNumberStreamInput input3;
00159 input3.pStream =
00160 SharedNumberStream(new SkipNumberStream(500, 1050, 5));
00161 input3.bitmapSize = 8;
00162 inputData.push_back(input3);
00163
00164 testUnion(nRows, inputData);
00165 }
00166
00167 void LbmUnionExecStreamTest::testLargeRids()
00168 {
00169 uint nRows = 5001000;
00170 std::vector<LbmNumberStreamInput> inputData;
00171
00172
00173 LbmNumberStreamInput input1;
00174 input1.pStream =
00175 SharedNumberStream(new SkipNumberStream(5000020, 5000500, 4));
00176 input1.bitmapSize = 4;
00177 inputData.push_back(input1);
00178
00179
00180 LbmNumberStreamInput input2;
00181 input2.pStream =
00182 SharedNumberStream(new SkipNumberStream(5000300, 5000990, 5));
00183 input2.bitmapSize = 8;
00184 inputData.push_back(input2);
00185
00186 testUnion(nRows, inputData);
00187 }
00188
00189 void LbmUnionExecStreamTest::testSparse()
00190 {
00191 uint nRows = 2900;
00192 std::vector<LbmNumberStreamInput> inputData;
00193
00194
00195 LbmNumberStreamInput input1;
00196 input1.pStream =
00197 SharedNumberStream(new SkipNumberStream(26, 1300, 13));
00198 input1.bitmapSize = 4;
00199 inputData.push_back(input1);
00200
00201
00202 LbmNumberStreamInput input2;
00203 input2.pStream =
00204 SharedNumberStream(new SkipNumberStream(340, 1700, 17));
00205 input2.bitmapSize = 8;
00206 inputData.push_back(input2);
00207
00208
00209 LbmNumberStreamInput input3;
00210 input3.pStream =
00211 SharedNumberStream(new SkipNumberStream(1100, 2200, 11));
00212 input3.bitmapSize = 8;
00213 inputData.push_back(input3);
00214
00215 testUnion(nRows, inputData);
00216 }
00217
00218 void LbmUnionExecStreamTest::testGaps()
00219 {
00220 uint nRows = 2000;
00221 std::vector<LbmNumberStreamInput> inputData;
00222
00223
00224 LbmNumberStreamInput input1;
00225 input1.pStream =
00226 SharedNumberStream(new SkipNumberStream(26, 520, 13));
00227 input1.bitmapSize = 4;
00228 inputData.push_back(input1);
00229
00230
00231 LbmNumberStreamInput input2;
00232 input2.pStream =
00233 SharedNumberStream(new SkipNumberStream(680, 1020, 17));
00234 input2.bitmapSize = 8;
00235 inputData.push_back(input2);
00236
00237
00238 LbmNumberStreamInput input3;
00239 input3.pStream =
00240 SharedNumberStream(new SkipNumberStream(1320, 1540, 11));
00241 input3.bitmapSize = 8;
00242 inputData.push_back(input3);
00243
00244
00245 LbmNumberStreamInput input4;
00246 input4.pStream =
00247 SharedNumberStream(new SkipNumberStream(1330, 1900, 11));
00248 input4.bitmapSize = 8;
00249 inputData.push_back(input4);
00250
00251 testUnion(nRows, inputData);
00252 }
00253
00254 SharedByteBuffer LbmUnionExecStreamTest::allocateBuffer(
00255 uint nBuffers, uint bufferSize)
00256 {
00257 ppBuffers.reset(new PBuffer[nBuffers]);
00258 this->nBuffers = nBuffers;
00259 for (uint i = 0; i < nBuffers; i++) {
00260 ppBuffers[i] = new FixedBuffer[bufferSize];
00261 }
00262
00263 SharedByteBuffer pByteBuffer(new ByteBuffer());
00264 pByteBuffer->init(ppBuffers, nBuffers, bufferSize);
00265 return pByteBuffer;
00266 }
00267
00268 void LbmUnionExecStreamTest::deleteBuffer()
00269 {
00270 for (uint i = 0; i < nBuffers; i++) {
00271 delete [] ppBuffers[i];
00272 }
00273 }
00274
00275 void LbmUnionExecStreamTest::testBuffer(SharedByteBuffer pByteBuffer)
00276 {
00277 DECLARE_CONSTANT_ARRAY(zeroes, 0, 16);
00278 DECLARE_CONSTANT_ARRAY(ones, 1, 16);
00279 DECLARE_CONSTANT_ARRAY(twos, 2, 16);
00280 DECLARE_CONSTANT_ARRAY(fours, 4, 16);
00281 DECLARE_CONSTANT_ARRAY(maxByte, 255, 16);
00282
00283 LbmUnionMergeArea mergeArea;
00284 mergeArea.init(pByteBuffer);
00285
00286
00287
00288 mergeArea.advance(10000);
00289 mergeArea.mergeMem(10006, ones, 8);
00290 FixedBuffer result0[16] = { 0,0,0,0, 0,0,1,1, 1,1,1,1, 1,1 };
00291 verifyMerge(mergeArea, result0);
00292
00293
00294 mergeArea.advance(10005);
00295 FixedBuffer result1[16] = { 0,1,1, 1,1,1,1, 1,1 };
00296 verifyMerge(mergeArea, result1);
00297
00298
00299
00300 mergeArea.mergeMem(10008, twos, 13);
00301 FixedBuffer result2[16] = { 0,1,1, 3,3,3,3, 3,3,2,2, 2,2,2,2, 2 };
00302 verifyMerge(mergeArea, result2);
00303
00304
00305 mergeArea.advance(10007);
00306 FixedBuffer result3[16] = { 1, 3,3,3,3, 3,3,2,2, 2,2,2,2, 2 };
00307 verifyMerge(mergeArea, result3);
00308
00309
00310 mergeArea.mergeMem(10010, fours, 9);
00311 FixedBuffer result4[16] = { 1, 3,3,7,7, 7,7,6,6, 6,6,6,2, 2 };
00312 verifyMerge(mergeArea, result4);
00313
00314
00315 mergeArea.advance(10030);
00316 mergeArea.mergeMem(10040, fours, 4);
00317 mergeArea.mergeMem(10036, twos, 4);
00318 mergeArea.mergeMem(10032, ones, 4);
00319 mergeArea.mergeMem(10044, zeroes, 2);
00320 FixedBuffer result5[16] = { 0,0,1,1, 1,1,2,2, 2,2,4,4, 4,4,0,0 };
00321 verifyMerge(mergeArea, result5);
00322
00323
00324 mergeArea.advance(10032);
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337 mergeArea.advance(10032);
00338 }
00339
00340 void LbmUnionExecStreamTest::verifyMerge(
00341 LbmUnionMergeArea area, PConstBuffer reference)
00342 {
00343 LbmByteNumberPrimitive start = area.getStart();
00344 uint size = opaqueToInt(area.getEnd() - start);
00345
00346 for (uint i = 0; i < size; i++) {
00347 BOOST_CHECK_EQUAL(area.getByte(start + i), reference[i]);
00348 }
00349 }
00350
00351 void LbmUnionExecStreamTest::testUnion(
00352 uint nRows, std::vector<LbmNumberStreamInput> inputData)
00353 {
00354 uint nInputs = inputData.size();
00355
00356
00357 UnionNumberStream *pUnion = new UnionNumberStream();
00358 for (uint i = 0; i < nInputs; i++) {
00359 SharedNumberStream pChild(inputData[i].pStream->clone());
00360 pUnion->addChild(pChild);
00361 }
00362 LbmNumberStreamInput expectedData;
00363 expectedData.pStream = SharedNumberStream(pUnion);
00364 expectedData.bitmapSize = resultBitmapSize(nRows);
00365
00366
00367 boost::scoped_array<BitmapInput> bmInputs;
00368 bmInputs.reset(new BitmapInput[nInputs + 1]);
00369
00370
00371 uint totalSize = 0;
00372 uint totalBitmaps = 0;
00373 for (uint i = 0; i < nInputs; i++) {
00374 initBitmapInput(bmInputs[i], nRows, inputData[i]);
00375 totalSize += bmInputs[i].currBufSize;
00376 totalBitmaps += bmInputs[i].nBitmaps;
00377 }
00378
00379 BitmapInput bmCombined;
00380 bmCombined.bufArray.reset(new FixedBuffer[totalSize]);
00381 bmCombined.currBufSize = bmCombined.fullBufSize = totalSize;
00382 bmCombined.nBitmaps = totalBitmaps;
00383 PBuffer pCurrent = bmCombined.bufArray.get();
00384 for (uint i = 0; i < nInputs; i++) {
00385 memcpy(pCurrent, bmInputs[i].bufArray.get(), bmInputs[i].currBufSize);
00386 pCurrent += bmInputs[i].currBufSize;
00387 }
00388
00389 ValuesExecStreamParams valuesParams;
00390 ExecStreamEmbryo valuesStreamEmbryo;
00391 initValuesExecStream(
00392 0, valuesParams, valuesStreamEmbryo, bmCombined);
00393
00394
00395 initBitmapInput(bmInputs[nInputs], nRows, expectedData);
00396
00397
00398 std::vector<ExecStreamEmbryo> transformEmbryoList;
00399
00400 LbmChopperExecStreamParams chopperParams;
00401 chopperParams.ridLimitParamId = DynamicParamId(1);
00402 chopperParams.outputTupleDesc = bitmapTupleDesc;
00403 chopperParams.scratchAccessor.pSegment = pRandomSegment;
00404 chopperParams.scratchAccessor.pCacheAccessor = pCache;
00405 ExecStreamEmbryo chopperEmbryo;
00406 chopperEmbryo.init(new LbmChopperExecStream(), chopperParams);
00407 chopperEmbryo.getStream()->setName("ChopperExecStream");
00408 transformEmbryoList.push_back(chopperEmbryo);
00409
00410 ExternalSortExecStreamParams sortParams;
00411 ExecStreamEmbryo sortEmbryo;
00412 initSorterExecStream(sortParams, sortEmbryo, bitmapTupleDesc);
00413 transformEmbryoList.push_back(sortEmbryo);
00414
00415 LbmUnionExecStreamParams unionParams;
00416 unionParams.maxRid = (LcsRid) 0;
00417 unionParams.ridLimitParamId = DynamicParamId(1);
00418 unionParams.startRidParamId = DynamicParamId(0);
00419 unionParams.segmentLimitParamId = DynamicParamId(0);
00420 unionParams.outputTupleDesc = bitmapTupleDesc;
00421 unionParams.scratchAccessor =
00422 pSegmentFactory->newScratchSegment(pCache, 10);
00423 ExecStreamEmbryo unionEmbryo;
00424 unionEmbryo.init(new LbmUnionExecStream(), unionParams);
00425 unionEmbryo.getStream()->setName("UnionExecStream");
00426 transformEmbryoList.push_back(unionEmbryo);
00427
00428 SharedExecStream pOutputStream = prepareTransformGraph(
00429 valuesStreamEmbryo, transformEmbryoList);
00430
00431 if (bmInputs[nInputs].bufArray.get()) {
00432 bitmapTupleAccessor.setCurrentTupleBuf(
00433 bmInputs[nInputs].bufArray.get());
00434 }
00435 verifyBufferedOutput(
00436 *pOutputStream, bitmapTupleDesc, bmInputs[nInputs].nBitmaps,
00437 bmInputs[nInputs].bufArray.get());
00438 }
00439
// Hands LbmUnionExecStreamTest to the project's FENNEL_UNIT_TEST_SUITE macro.
// NOTE(review): presumably this generates the Boost.Test entry point that
// runs the cases registered in the fixture's constructor - confirm against
// the macro's definition.
00440 FENNEL_UNIT_TEST_SUITE(LbmUnionExecStreamTest);
00441
00442