00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/common/FemEnums.h"
00026 #include "fennel/btree/BTreeDescriptor.h"
00027 #include "fennel/btree/BTreeBuilder.h"
00028 #include "fennel/btree/BTreeReader.h"
00029 #include "fennel/test/ExecStreamTestSuite.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/exec/ExecStream.h"
00032 #include "fennel/exec/ExecStreamGraph.h"
00033 #include "fennel/exec/ExecStreamBufAccessor.h"
00034 #include "fennel/exec/MockProducerExecStream.h"
00035 #include "fennel/exec/ScratchBufferExecStream.h"
00036 #include "fennel/exec/DoubleBufferExecStream.h"
00037 #include "fennel/exec/CopyExecStream.h"
00038 #include "fennel/exec/MergeExecStream.h"
00039 #include "fennel/exec/SegBufferExecStream.h"
00040 #include "fennel/exec/SegBufferReaderExecStream.h"
00041 #include "fennel/exec/SegBufferWriterExecStream.h"
00042 #include "fennel/exec/CartesianJoinExecStream.h"
00043 #include "fennel/exec/SortedAggExecStream.h"
00044 #include "fennel/exec/ReshapeExecStream.h"
00045 #include "fennel/exec/SplitterExecStream.h"
00046 #include "fennel/exec/BarrierExecStream.h"
00047 #include "fennel/exec/ValuesExecStream.h"
00048 #include "fennel/exec/NestedLoopJoinExecStream.h"
00049 #include "fennel/exec/ExecStreamEmbryo.h"
00050 #include "fennel/tuple/StandardTypeDescriptor.h"
00051 #include "fennel/ftrs/BTreeInsertExecStream.h"
00052
00053 using namespace fennel;
00054
00055 uint ExecStreamTestSuite::getDegreeOfParallelism()
00056 {
00057 return 1;
00058 }
00059
00060 void ExecStreamTestSuite::testScratchBufferExecStream()
00061 {
00062 StandardTypeDescriptorFactory stdTypeFactory;
00063 TupleAttributeDescriptor attrDesc(
00064 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00065
00066 MockProducerExecStreamParams mockParams;
00067 mockParams.outputTupleDesc.push_back(attrDesc);
00068 mockParams.nRows = 5000;
00069 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00070
00071 ExecStreamEmbryo mockStreamEmbryo;
00072 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00073 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00074
00075 ScratchBufferExecStreamParams bufParams;
00076 bufParams.scratchAccessor =
00077 pSegmentFactory->newScratchSegment(pCache,1);
00078
00079 ExecStreamEmbryo bufStreamEmbryo;
00080 bufStreamEmbryo.init(new ScratchBufferExecStream(),bufParams);
00081 bufStreamEmbryo.getStream()->setName("ScratchBufferExecStream");
00082
00083 SharedExecStream pOutputStream = prepareTransformGraph(
00084 mockStreamEmbryo, bufStreamEmbryo);
00085
00086 verifyOutput(
00087 *pOutputStream,
00088 mockParams.nRows,
00089 *(mockParams.pGenerator));
00090 }
00091
00092
00093 void ExecStreamTestSuite::testDoubleBufferExecStream()
00094 {
00095 StandardTypeDescriptorFactory stdTypeFactory;
00096 TupleAttributeDescriptor attrDesc(
00097 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00098
00099 MockProducerExecStreamParams mockParams;
00100 mockParams.outputTupleDesc.push_back(attrDesc);
00101 mockParams.nRows = 25000;
00102 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00103
00104 ExecStreamEmbryo mockStreamEmbryo;
00105 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00106 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00107
00108 DoubleBufferExecStreamParams bufParams;
00109 bufParams.scratchAccessor =
00110 pSegmentFactory->newScratchSegment(pCache,1);
00111
00112 ExecStreamEmbryo bufStreamEmbryo;
00113 bufStreamEmbryo.init(new DoubleBufferExecStream(),bufParams);
00114 bufStreamEmbryo.getStream()->setName("DoubleBufferExecStream");
00115
00116 SharedExecStream pOutputStream = prepareTransformGraph(
00117 mockStreamEmbryo, bufStreamEmbryo);
00118
00119 verifyOutput(
00120 *pOutputStream,
00121 mockParams.nRows,
00122 *(mockParams.pGenerator));
00123 }
00124
00125 void ExecStreamTestSuite::testCopyExecStream()
00126 {
00127 StandardTypeDescriptorFactory stdTypeFactory;
00128 TupleAttributeDescriptor attrDesc(
00129 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00130
00131 MockProducerExecStreamParams mockParams;
00132 mockParams.outputTupleDesc.push_back(attrDesc);
00133 mockParams.nRows = 10000;
00134
00135 ExecStreamEmbryo mockStreamEmbryo;
00136 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00137 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00138
00139 CopyExecStreamParams copyParams;
00140 copyParams.outputTupleDesc.push_back(attrDesc);
00141
00142 ExecStreamEmbryo copyStreamEmbryo;
00143 copyStreamEmbryo.init(new CopyExecStream(),copyParams);
00144 copyStreamEmbryo.getStream()->setName("CopyExecStream");
00145
00146 SharedExecStream pOutputStream = prepareTransformGraph(
00147 mockStreamEmbryo,copyStreamEmbryo);
00148
00149 int32_t zero = 0;
00150 TupleDescriptor expectedDesc;
00151 expectedDesc.push_back(attrDesc);
00152 TupleData expectedTuple;
00153 expectedTuple.compute(expectedDesc);
00154 expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00155 verifyConstantOutput(
00156 *pOutputStream,
00157 expectedTuple,
00158 mockParams.nRows);
00159 }
00160
00161 void ExecStreamTestSuite::testMergeExecStream()
00162 {
00163
00164
00165 StandardTypeDescriptorFactory stdTypeFactory;
00166 TupleAttributeDescriptor attrDesc(
00167 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00168
00169 MockProducerExecStreamParams paramsMock;
00170 paramsMock.outputTupleDesc.push_back(attrDesc);
00171 paramsMock.nRows = 10000;
00172
00173 ExecStreamEmbryo mockStreamEmbryo1;
00174 mockStreamEmbryo1.init(new MockProducerExecStream(),paramsMock);
00175 mockStreamEmbryo1.getStream()->setName("MockProducerExecStream1");
00176
00177 ExecStreamEmbryo mockStreamEmbryo2;
00178 mockStreamEmbryo2.init(new MockProducerExecStream(),paramsMock);
00179 mockStreamEmbryo2.getStream()->setName("MockProducerExecStream2");
00180
00181 MergeExecStreamParams paramsMerge;
00182 paramsMerge.outputTupleDesc.push_back(attrDesc);
00183 if (getDegreeOfParallelism() != 1) {
00184 paramsMerge.isParallel = true;
00185 }
00186
00187 ExecStreamEmbryo mergeStreamEmbryo;
00188 mergeStreamEmbryo.init(new MergeExecStream(),paramsMerge);
00189 mergeStreamEmbryo.getStream()->setName("MergeExecStream");
00190
00191 SharedExecStream pOutputStream = prepareConfluenceGraph(
00192 mockStreamEmbryo1,
00193 mockStreamEmbryo2,
00194 mergeStreamEmbryo);
00195
00196 int32_t zero = 0;
00197 TupleDescriptor expectedDesc;
00198 expectedDesc.push_back(attrDesc);
00199 TupleData expectedTuple;
00200 expectedTuple.compute(expectedDesc);
00201 expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00202 verifyConstantOutput(
00203 *pOutputStream,
00204 expectedTuple,
00205 2*paramsMock.nRows);
00206 }
00207
00208 void ExecStreamTestSuite::testSegBufferExecStream()
00209 {
00210 StandardTypeDescriptorFactory stdTypeFactory;
00211 TupleAttributeDescriptor attrDesc(
00212 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00213
00214 MockProducerExecStreamParams mockParams;
00215 mockParams.outputTupleDesc.push_back(attrDesc);
00216 mockParams.nRows = 10000;
00217
00218 ExecStreamEmbryo mockStreamEmbryo;
00219 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00220 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00221
00222 SegBufferExecStreamParams bufParams;
00223 bufParams.scratchAccessor.pSegment = pRandomSegment;
00224 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
00225 bufParams.multipass = false;
00226
00227 ExecStreamEmbryo bufStreamEmbryo;
00228 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
00229 bufStreamEmbryo.getStream()->setName("SegBufferExecStream");
00230
00231 SharedExecStream pOutputStream = prepareTransformGraph(
00232 mockStreamEmbryo, bufStreamEmbryo);
00233
00234 int32_t zero = 0;
00235 TupleDescriptor expectedDesc;
00236 expectedDesc.push_back(attrDesc);
00237 TupleData expectedTuple;
00238 expectedTuple.compute(expectedDesc);
00239 expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00240 verifyConstantOutput(
00241 *pOutputStream,
00242 expectedTuple,
00243 mockParams.nRows);
00244 }
00245
00246 void ExecStreamTestSuite::testCartesianJoinExecStream(
00247 uint nRowsOuter,uint nRowsInner)
00248 {
00249
00250
00251 StandardTypeDescriptorFactory stdTypeFactory;
00252 TupleAttributeDescriptor attrDesc(
00253 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00254
00255 MockProducerExecStreamParams paramsMockOuter;
00256 paramsMockOuter.outputTupleDesc.push_back(attrDesc);
00257 paramsMockOuter.nRows = nRowsOuter;
00258
00259 ExecStreamEmbryo outerStreamEmbryo;
00260 outerStreamEmbryo.init(new MockProducerExecStream(),paramsMockOuter);
00261 outerStreamEmbryo.getStream()->setName("OuterProducerExecStream");
00262
00263 MockProducerExecStreamParams paramsMockInner(paramsMockOuter);
00264 paramsMockInner.nRows = nRowsInner;
00265
00266 ExecStreamEmbryo innerStreamEmbryo;
00267 innerStreamEmbryo.init(new MockProducerExecStream(),paramsMockInner);
00268 innerStreamEmbryo.getStream()->setName("InnerProducerExecStream");
00269
00270 CartesianJoinExecStreamParams paramsJoin;
00271 paramsJoin.leftOuter = false;
00272
00273 ExecStreamEmbryo joinStreamEmbryo;
00274 joinStreamEmbryo.init(new CartesianJoinExecStream(),paramsJoin);
00275 joinStreamEmbryo.getStream()->setName("CartesianJoinExecStream");
00276
00277 SharedExecStream pOutputStream = prepareConfluenceGraph(
00278 outerStreamEmbryo,
00279 innerStreamEmbryo,
00280 joinStreamEmbryo);
00281
00282 int32_t zero = 0;
00283 TupleDescriptor expectedDesc;
00284 expectedDesc.push_back(attrDesc);
00285 expectedDesc.push_back(attrDesc);
00286 TupleData expectedTuple;
00287 expectedTuple.compute(expectedDesc);
00288 expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00289 expectedTuple[1].pData = reinterpret_cast<PBuffer>(&zero);
00290 verifyConstantOutput(
00291 *pOutputStream,
00292 expectedTuple,
00293 nRowsOuter*nRowsInner);
00294 }
00295
00296 void ExecStreamTestSuite::testCountAggExecStream()
00297 {
00298 StandardTypeDescriptorFactory stdTypeFactory;
00299 TupleAttributeDescriptor attrDesc(
00300 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00301
00302 MockProducerExecStreamParams mockParams;
00303 mockParams.outputTupleDesc.push_back(attrDesc);
00304 mockParams.nRows = 10000;
00305
00306 ExecStreamEmbryo mockStreamEmbryo;
00307 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00308 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00309
00310
00311 SortedAggExecStreamParams aggParams;
00312 aggParams.groupByKeyCount = 0;
00313 aggParams.outputTupleDesc.push_back(attrDesc);
00314 AggInvocation countInvocation;
00315 countInvocation.aggFunction = AGG_FUNC_COUNT;
00316 countInvocation.iInputAttr = -1;
00317 aggParams.aggInvocations.push_back(countInvocation);
00318
00319 ExecStreamEmbryo aggStreamEmbryo;
00320 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00321 aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00322
00323 SharedExecStream pOutputStream = prepareTransformGraph(
00324 mockStreamEmbryo,aggStreamEmbryo);
00325
00326
00327
00328 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00329
00330 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00331 }
00332
00333 void ExecStreamTestSuite::testSumAggExecStream()
00334 {
00335 StandardTypeDescriptorFactory stdTypeFactory;
00336 TupleAttributeDescriptor attrDesc(
00337 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00338
00339 MockProducerExecStreamParams mockParams;
00340 mockParams.outputTupleDesc.push_back(attrDesc);
00341 mockParams.nRows = 10000;
00342 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00343
00344 ExecStreamEmbryo mockStreamEmbryo;
00345 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00346 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00347
00348
00349 SortedAggExecStreamParams aggParams;
00350 aggParams.groupByKeyCount = 0;
00351 attrDesc.isNullable = true;
00352 aggParams.outputTupleDesc.push_back(attrDesc);
00353 AggInvocation sumInvocation;
00354 sumInvocation.aggFunction = AGG_FUNC_SUM;
00355 sumInvocation.iInputAttr = 0;
00356 aggParams.aggInvocations.push_back(sumInvocation);
00357
00358 ExecStreamEmbryo aggStreamEmbryo;
00359 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00360 aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00361
00362 SharedExecStream pOutputStream = prepareTransformGraph(
00363 mockStreamEmbryo,aggStreamEmbryo);
00364
00365
00366
00367 RampExecStreamGenerator expectedResultGenerator(
00368 (mockParams.nRows-1)*mockParams.nRows/2);
00369
00370 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00371 }
00372
00373 void ExecStreamTestSuite::testGroupAggExecStreamNrows(uint nrows)
00374 {
00375 StandardTypeDescriptorFactory stdTypeFactory;
00376 TupleAttributeDescriptor attrDesc(
00377 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00378
00379
00380 MockProducerExecStreamParams mockParams;
00381 mockParams.outputTupleDesc.push_back(attrDesc);
00382 mockParams.outputTupleDesc.push_back(attrDesc);
00383 mockParams.nRows = nrows;
00384 mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator());
00385
00386 ExecStreamEmbryo mockStreamEmbryo;
00387 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00388 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00389
00390
00391 SortedAggExecStreamParams aggParams;
00392 aggParams.groupByKeyCount = 1;
00393 aggParams.outputTupleDesc.push_back(attrDesc);
00394 aggParams.outputTupleDesc.push_back(attrDesc);
00395 AggInvocation countInvocation;
00396 countInvocation.aggFunction = AGG_FUNC_COUNT;
00397 countInvocation.iInputAttr = -1;
00398 aggParams.aggInvocations.push_back(countInvocation);
00399
00400 ExecStreamEmbryo aggStreamEmbryo;
00401
00402 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00403 aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00404
00405 SharedExecStream pOutputStream = prepareTransformGraph(
00406 mockStreamEmbryo,aggStreamEmbryo);
00407
00408
00409
00410 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00411
00412 SharedInt64ColumnGenerator col =
00413 SharedInt64ColumnGenerator(new SeqColumnGenerator());
00414 columnGenerators.push_back(col);
00415
00416 col = SharedInt64ColumnGenerator(new ConstColumnGenerator(2));
00417 columnGenerators.push_back(col);
00418
00419 CompositeExecStreamGenerator expectedResultGenerator(columnGenerators);
00420
00421 verifyOutput(*pOutputStream, mockParams.nRows/2, expectedResultGenerator);
00422 }
00423
00424 void ExecStreamTestSuite::testReshapeExecStream(
00425 bool filter, bool cast, uint expectedNRows, int expectedStart,
00426 bool compareParam,
00427 std::hash_set<int64_t> const &outputParams)
00428 {
00429 assert(!compareParam || filter == compareParam);
00430 StandardTypeDescriptorFactory stdTypeFactory;
00431 TupleAttributeDescriptor nullAttrDesc(
00432 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00433 true, sizeof(int64_t));
00434 TupleAttributeDescriptor notNullAttrDesc(
00435 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00436
00437
00438
00439
00440
00441
00442
00443
00444 MockProducerExecStreamParams mockParams;
00445 for (int i = 0; i < 6; i++) {
00446 mockParams.outputTupleDesc.push_back(notNullAttrDesc);
00447 }
00448 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00449 SharedInt64ColumnGenerator colGen;
00450 for (int i = 0; i < 4; i++) {
00451 colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(i));
00452 columnGenerators.push_back(colGen);
00453 }
00454 colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(25, 0));
00455 columnGenerators.push_back(colGen);
00456 colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(10, 0));
00457 columnGenerators.push_back(colGen);
00458 mockParams.nRows = 1000;
00459 mockParams.pGenerator.reset(
00460 new CompositeExecStreamGenerator(columnGenerators));
00461
00462 ExecStreamEmbryo mockStreamEmbryo;
00463 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00464 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474 ReshapeExecStreamParams rsParams;
00475 boost::shared_array<FixedBuffer> pBuffer;
00476 std::vector<int64_t> paramVals;
00477 paramVals.push_back(10);
00478 paramVals.push_back(20);
00479 paramVals.push_back(50);
00480 if (!filter) {
00481 rsParams.compOp = COMP_NOOP;
00482 } else {
00483 rsParams.compOp = COMP_EQ;
00484 TupleDescriptor compareDesc;
00485
00486 compareDesc.push_back(nullAttrDesc);
00487 if (!compareParam) {
00488 compareDesc.push_back(nullAttrDesc);
00489 }
00490 TupleData compareData;
00491 compareData.compute(compareDesc);
00492 if (compareParam) {
00493 compareData[0].pData = (PConstBuffer) ¶mVals[1];
00494 } else {
00495 compareData[0].pData = (PConstBuffer) ¶mVals[1];
00496 compareData[1].pData = (PConstBuffer) ¶mVals[2];
00497 }
00498 TupleAccessor tupleAccessor;
00499 tupleAccessor.compute(compareDesc);
00500 pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00501 tupleAccessor.marshal(compareData, pBuffer.get());
00502 }
00503 rsParams.pCompTupleBuffer = pBuffer;
00504
00505 TupleProjection tupleProj;
00506 tupleProj.push_back(4);
00507 tupleProj.push_back(5);
00508 rsParams.inputCompareProj = tupleProj;
00509
00510 tupleProj.clear();
00511 tupleProj.push_back(3);
00512 tupleProj.push_back(0);
00513 tupleProj.push_back(2);
00514 rsParams.outputProj = tupleProj;
00515
00516 for (int i = 0; i < 3; i++) {
00517 if (cast) {
00518 rsParams.outputTupleDesc.push_back(nullAttrDesc);
00519 } else {
00520 rsParams.outputTupleDesc.push_back(notNullAttrDesc);
00521 }
00522 }
00523
00524
00525
00526
00527 std::vector<ReshapeParameter> dynamicParams;
00528 if (compareParam || outputParams.size() > 0) {
00529 for (uint i = 1; i < paramVals.size() + 1; i++) {
00530 SharedDynamicParamManager pDynamicParamManager =
00531 pGraph->getDynamicParamManager();
00532 pDynamicParamManager->createParam(
00533 DynamicParamId(i),
00534 notNullAttrDesc);
00535 TupleDatum paramValDatum;
00536 paramValDatum.pData = (PConstBuffer) &(paramVals[i - 1]);
00537 paramValDatum.cbData = sizeof(int64_t);
00538 pDynamicParamManager->writeParam(
00539 DynamicParamId(i),
00540 paramValDatum);
00541 dynamicParams.push_back(
00542 ReshapeParameter(
00543 DynamicParamId(i),
00544 ((i == 3) && compareParam) ? uint(5) : MAXU,
00545 (outputParams.find(i - 1) != outputParams.end())));
00546 }
00547 }
00548 rsParams.dynamicParameters = dynamicParams;
00549
00550
00551 columnGenerators.clear();
00552 colGen = SharedInt64ColumnGenerator(
00553 new SeqColumnGenerator(expectedStart + 3));
00554 columnGenerators.push_back(colGen);
00555 colGen = SharedInt64ColumnGenerator(
00556 new SeqColumnGenerator(expectedStart));
00557 columnGenerators.push_back(colGen);
00558 colGen = SharedInt64ColumnGenerator(
00559 new SeqColumnGenerator(expectedStart + 2));
00560 columnGenerators.push_back(colGen);
00561 for (uint i = 0; i < dynamicParams.size(); i++) {
00562 if (dynamicParams[i].outputParam) {
00563 colGen =
00564 SharedInt64ColumnGenerator(
00565 new ConstColumnGenerator(
00566 paramVals[opaqueToInt(
00567 dynamicParams[i].dynamicParamId) - 1]));
00568 columnGenerators.push_back(colGen);
00569 rsParams.outputTupleDesc.push_back(notNullAttrDesc);
00570 }
00571 }
00572
00573 ExecStreamEmbryo rsStreamEmbryo;
00574 rsStreamEmbryo.init(new ReshapeExecStream(),rsParams);
00575 rsStreamEmbryo.getStream()->setName("ReshapeExecStream");
00576 SharedExecStream pOutputStream = prepareTransformGraph(
00577 mockStreamEmbryo, rsStreamEmbryo);
00578
00579 CompositeExecStreamGenerator resultGenerator(columnGenerators);
00580 verifyOutput(*pOutputStream, expectedNRows, resultGenerator);
00581 }
00582
00583 void ExecStreamTestSuite::testSingleValueAggExecStream()
00584 {
00585 StandardTypeDescriptorFactory stdTypeFactory;
00586 TupleAttributeDescriptor attrDesc(
00587 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00588 TupleAttributeDescriptor attrDescNullable(
00589 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), true,
00590 sizeof(int64_t));
00591
00592
00593
00594 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGeneratorsIn;
00595
00596 SharedInt64ColumnGenerator col =
00597 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00598 columnGeneratorsIn.push_back(col);
00599
00600
00601 MockProducerExecStreamParams mockParams;
00602 mockParams.outputTupleDesc.push_back(attrDesc);
00603 mockParams.nRows = 10;
00604 mockParams.pGenerator.reset(
00605 new CompositeExecStreamGenerator(columnGeneratorsIn));
00606
00607 ExecStreamEmbryo mockStreamEmbryo;
00608 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00609 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00610
00611
00612 SortedAggExecStreamParams aggParams;
00613 aggParams.groupByKeyCount = 1;
00614 aggParams.outputTupleDesc.push_back(attrDesc);
00615 aggParams.outputTupleDesc.push_back(attrDescNullable);
00616 AggInvocation singleValueInvocation;
00617 singleValueInvocation.aggFunction = AGG_FUNC_SINGLE_VALUE;
00618 singleValueInvocation.iInputAttr = 0;
00619 aggParams.aggInvocations.push_back(singleValueInvocation);
00620
00621 ExecStreamEmbryo aggStreamEmbryo;
00622
00623 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00624 aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00625
00626 SharedExecStream pOutputStream = prepareTransformGraph(
00627 mockStreamEmbryo,aggStreamEmbryo);
00628
00629
00630 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGeneratorsOut;
00631
00632 col =
00633 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00634 columnGeneratorsOut.push_back(col);
00635
00636 col =
00637 SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00638 columnGeneratorsOut.push_back(col);
00639
00640 CompositeExecStreamGenerator expectedResultGenerator(columnGeneratorsOut);
00641
00642 verifyOutput(*pOutputStream, mockParams.nRows, expectedResultGenerator);
00643 }
00644
00645 void ExecStreamTestSuite::testMergeImplicitPullInputs()
00646 {
00647
00648
00649
00650
00651
00652
00653 StandardTypeDescriptorFactory stdTypeFactory;
00654 TupleAttributeDescriptor attrDesc(
00655 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00656 TupleAttributeDescriptor nullAttrDesc(
00657 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00658 true, sizeof(int64_t));
00659
00660
00661
00662
00663
00664
00665 MockProducerExecStreamParams mockParams;
00666 mockParams.outputTupleDesc.push_back(attrDesc);
00667
00668 uint nInputs = 5;
00669 uint nRows = nInputs * 4000;
00670 mockParams.nRows = nRows;
00671 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerator;
00672 columnGenerator.push_back(
00673 SharedInt64ColumnGenerator(
00674 new DupRepeatingSeqColumnGenerator(nInputs, 1)));
00675 mockParams.pGenerator.reset(
00676 new CompositeExecStreamGenerator(columnGenerator));
00677
00678 ExecStreamEmbryo mockStreamEmbryo;
00679 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00680 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00681
00682
00683
00684
00685
00686 SplitterExecStreamParams splitterParams;
00687 ExecStreamEmbryo splitterStreamEmbryo;
00688 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00689 splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00690
00691 vector<vector<ExecStreamEmbryo> > reshapeEmbryoStreamList;
00692 for (int i = 0; i < nInputs; i++) {
00693 ReshapeExecStreamParams rsParams;
00694 boost::shared_array<FixedBuffer> pBuffer;
00695 rsParams.compOp = COMP_EQ;
00696 int64_t key = i;
00697 TupleDescriptor compareDesc;
00698
00699 compareDesc.push_back(nullAttrDesc);
00700 TupleData compareData;
00701 compareData.compute(compareDesc);
00702 compareData[0].pData = (PConstBuffer) &key;
00703 TupleAccessor tupleAccessor;
00704 tupleAccessor.compute(compareDesc);
00705 pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00706 tupleAccessor.marshal(compareData, pBuffer.get());
00707 rsParams.pCompTupleBuffer = pBuffer;
00708 TupleProjection tupleProj;
00709 tupleProj.push_back(0);
00710 rsParams.inputCompareProj = tupleProj;
00711 rsParams.outputProj = tupleProj;
00712
00713 ExecStreamEmbryo rsStreamEmbryo;
00714 rsStreamEmbryo.init(new ReshapeExecStream(), rsParams);
00715 std::ostringstream oss;
00716 oss << "ReshapeExecStream" << "#" << i;
00717 rsStreamEmbryo.getStream()->setName(oss.str());
00718
00719 vector<ExecStreamEmbryo> reshapeStreamEmbryo;
00720 reshapeStreamEmbryo.push_back(rsStreamEmbryo);
00721
00722
00723
00724
00725 if (i != 0) {
00726 SegBufferExecStreamParams bufParams;
00727 bufParams.scratchAccessor.pSegment = pRandomSegment;
00728 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
00729 bufParams.multipass = false;
00730
00731 ExecStreamEmbryo bufStreamEmbryo;
00732 bufStreamEmbryo.init(new SegBufferExecStream(), bufParams);
00733 std::ostringstream oss;
00734 oss << "SegBufferExecStream" << "#" << i;
00735 bufStreamEmbryo.getStream()->setName(oss.str());
00736
00737 reshapeStreamEmbryo.push_back(bufStreamEmbryo);
00738 }
00739 reshapeEmbryoStreamList.push_back(reshapeStreamEmbryo);
00740 }
00741
00742
00743 MergeExecStreamParams mergeParams;
00744 mergeParams.outputTupleDesc.push_back(attrDesc);
00745
00746 ExecStreamEmbryo mergeStreamEmbryo;
00747 mergeStreamEmbryo.init(new MergeExecStream(), mergeParams);
00748 mergeStreamEmbryo.getStream()->setName("MergeExecStream");
00749
00750 SharedExecStream pOutputStream =
00751 prepareDAG(
00752 mockStreamEmbryo,
00753 splitterStreamEmbryo,
00754 reshapeEmbryoStreamList,
00755 mergeStreamEmbryo);
00756
00757
00758
00759 StairCaseExecStreamGenerator expectedResultGenerator(1, nRows / nInputs);
00760
00761 verifyOutput(*pOutputStream, nRows, expectedResultGenerator);
00762 }
00763
00764 void ExecStreamTestSuite::testBTreeInsertExecStream(
00765 bool useDynamicBTree,
00766 uint nRows)
00767 {
00768 StandardTypeDescriptorFactory stdTypeFactory;
00769 TupleAttributeDescriptor attrDesc(
00770 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00771
00772
00773
00774
00775 MockProducerExecStreamParams mockParams;
00776 mockParams.nRows = nRows;
00777 mockParams.outputTupleDesc.push_back(attrDesc);
00778 mockParams.outputTupleDesc.push_back(attrDesc);
00779 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00780 columnGenerators.push_back(
00781 SharedInt64ColumnGenerator(new SeqColumnGenerator(0)));
00782 columnGenerators.push_back(
00783 SharedInt64ColumnGenerator(new SeqColumnGenerator(nRows)));
00784 mockParams.pGenerator.reset(
00785 new CompositeExecStreamGenerator(columnGenerators));
00786
00787 ExecStreamEmbryo mockStreamEmbryo;
00788 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00789 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00790
00791
00792 BTreeDescriptor descriptor;
00793 BTreeInsertExecStreamParams bTreeInsertParams;
00794 descriptor.tupleDescriptor.push_back(attrDesc);
00795 descriptor.tupleDescriptor.push_back(attrDesc);
00796 descriptor.keyProjection.push_back(0);
00797 descriptor.segmentAccessor.pSegment = pRandomSegment;
00798 descriptor.segmentAccessor.pCacheAccessor = pCacheAccessor;
00799 BTreeBuilder builder(descriptor, pRandomSegment);
00800 if (!useDynamicBTree) {
00801 builder.createEmptyRoot();
00802 descriptor.rootPageId = builder.getRootPageId();
00803 bTreeInsertParams.rootPageIdParamId = DynamicParamId(0);
00804 } else {
00805 descriptor.rootPageId = NULL_PAGE_ID;
00806 bTreeInsertParams.rootPageIdParamId = DynamicParamId(1);
00807 }
00808
00809 bTreeInsertParams.scratchAccessor =
00810 pSegmentFactory->newScratchSegment(pCache, 10);
00811 bTreeInsertParams.pCacheAccessor = pCacheAccessor;
00812 bTreeInsertParams.distinctness = DUP_FAIL;
00813 bTreeInsertParams.monotonic = true;
00814 bTreeInsertParams.pSegment = pRandomSegment;
00815 bTreeInsertParams.pCacheAccessor = pCacheAccessor;
00816 bTreeInsertParams.rootPageId = descriptor.rootPageId;
00817 bTreeInsertParams.segmentId = descriptor.segmentId;
00818 bTreeInsertParams.pageOwnerId = descriptor.pageOwnerId;
00819 bTreeInsertParams.tupleDesc = descriptor.tupleDescriptor;
00820 bTreeInsertParams.keyProj = descriptor.keyProjection;
00821 bTreeInsertParams.pRootMap = 0;
00822 bTreeInsertParams.outputTupleDesc.push_back(attrDesc);
00823
00824 ExecStreamEmbryo bTreeInsertEmbryo;
00825 bTreeInsertEmbryo.init(new BTreeInsertExecStream(), bTreeInsertParams);
00826 bTreeInsertEmbryo.getStream()->setName("BTreeInsertExecStream");
00827
00828 SharedExecStream pOutputStream =
00829 prepareTransformGraph(mockStreamEmbryo, bTreeInsertEmbryo);
00830
00831 ConstExecStreamGenerator expectedResultGenerator(0);
00832 verifyOutput(*pOutputStream, 0, expectedResultGenerator);
00833
00834
00835 if (useDynamicBTree) {
00836 descriptor.rootPageId =
00837 *reinterpret_cast<PageId const *>(
00838 pGraph->getDynamicParamManager()->getParam(
00839 DynamicParamId(1)).getDatum().pData);
00840 }
00841
00842
00843
00844 BTreeReader reader(descriptor);
00845 bool found = reader.searchFirst();
00846 if (!found) {
00847 BOOST_FAIL("searchFirst found nothing");
00848 }
00849 TupleData tupleData;
00850 tupleData.compute(descriptor.tupleDescriptor);
00851 for (uint i = 0; i < nRows; i++) {
00852 if (!found) {
00853 BOOST_FAIL("Could not searchNext for key #" << i);
00854 }
00855 reader.getTupleAccessorForRead().unmarshal(tupleData);
00856 uint64_t key = *reinterpret_cast<uint64_t const *>(tupleData[0].pData);
00857 uint64_t val = *reinterpret_cast<uint64_t const *>(tupleData[1].pData);
00858 BOOST_CHECK_EQUAL(key, i);
00859 BOOST_CHECK_EQUAL(val, i + nRows);
00860 found = reader.searchNext();
00861 }
00862 if (!reader.isSingular()) {
00863 BOOST_FAIL("Should have reached end of tree");
00864 }
00865 reader.endSearch();
00866 }
00867
00868 void ExecStreamTestSuite::testNestedLoopJoinExecStream(
00869 uint nRowsLeft,
00870 uint nRowsRight)
00871 {
00872
00873
00874 StandardTypeDescriptorFactory stdTypeFactory;
00875 TupleAttributeDescriptor attrDesc(
00876 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00877
00878
00879 MockProducerExecStreamParams paramsMockOuter;
00880 paramsMockOuter.outputTupleDesc.push_back(attrDesc);
00881 paramsMockOuter.nRows = nRowsLeft;
00882 paramsMockOuter.pGenerator.reset(new RampExecStreamGenerator(0, 1));
00883
00884 ExecStreamEmbryo outerStreamEmbryo;
00885 outerStreamEmbryo.init(new MockProducerExecStream(), paramsMockOuter);
00886 outerStreamEmbryo.getStream()->setName("OuterProducerExecStream");
00887
00888
00889
00890
00891
00892
00893 MockProducerExecStreamParams paramsMockInner;
00894 paramsMockInner.outputTupleDesc.push_back(attrDesc);
00895 paramsMockInner.nRows = nRowsRight;
00896 paramsMockInner.pGenerator.reset(new RampExecStreamGenerator(0, 1));
00897
00898 ExecStreamEmbryo innerStreamEmbryo;
00899 innerStreamEmbryo.init(new MockProducerExecStream(), paramsMockInner);
00900 innerStreamEmbryo.getStream()->setName("InnerProducerExecStream");
00901
00902 ReshapeExecStreamParams paramsReshape;
00903 paramsReshape.compOp = COMP_EQ;
00904 paramsReshape.outputProj.push_back(0);
00905 paramsReshape.inputCompareProj.push_back(0);
00906 paramsReshape.dynamicParameters.push_back(
00907 ReshapeParameter(DynamicParamId(1), 0, false));
00908 paramsReshape.outputTupleDesc.push_back(attrDesc);
00909
00910 ExecStreamEmbryo reshapeStreamEmbryo;
00911 reshapeStreamEmbryo.init(new ReshapeExecStream(), paramsReshape);
00912 reshapeStreamEmbryo.getStream()->setName("ReshapeExecStream");
00913
00914
00915
00916
00917 ValuesExecStreamParams paramsValues;
00918 paramsValues.bufSize = 0;
00919 paramsValues.outputTupleDesc.push_back(attrDesc);
00920 ExecStreamEmbryo valuesStreamEmbryo;
00921 valuesStreamEmbryo.init(new ValuesExecStream(), paramsValues);
00922 valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00923
00924
00925 std::vector<std::vector<ExecStreamEmbryo> > sourceStreamEmbryosList;
00926 std::vector<ExecStreamEmbryo> sourceStreamEmbryos;
00927 sourceStreamEmbryos.push_back(outerStreamEmbryo);
00928 sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00929
00930 sourceStreamEmbryos.clear();
00931 sourceStreamEmbryos.push_back(innerStreamEmbryo);
00932 sourceStreamEmbryos.push_back(reshapeStreamEmbryo);
00933 sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00934
00935 sourceStreamEmbryos.clear();
00936 sourceStreamEmbryos.push_back(valuesStreamEmbryo);
00937 sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00938
00939 NestedLoopJoinExecStreamParams paramsJoin;
00940 paramsJoin.leftOuter = false;
00941 paramsJoin.leftJoinKeys.push_back(
00942 NestedLoopJoinKey(DynamicParamId(1), 0));
00943
00944 ExecStreamEmbryo joinStreamEmbryo;
00945 joinStreamEmbryo.init(new NestedLoopJoinExecStream(), paramsJoin);
00946 joinStreamEmbryo.getStream()->setName("NestedLoopJoinExecStream");
00947
00948 SharedExecStream pOutputStream =
00949 prepareConfluenceGraph(sourceStreamEmbryosList, joinStreamEmbryo);
00950
00951 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00952 SharedInt64ColumnGenerator colGen =
00953 SharedInt64ColumnGenerator(new SeqColumnGenerator(0));
00954 columnGenerators.push_back(colGen);
00955 colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(0));
00956 columnGenerators.push_back(colGen);
00957
00958 CompositeExecStreamGenerator resultGenerator(columnGenerators);
00959 verifyOutput(
00960 *pOutputStream, std::min(nRowsLeft, nRowsRight), resultGenerator);
00961 }
00962
00963 void ExecStreamTestSuite::testSplitterPlusBarrier()
00964 {
00965 StandardTypeDescriptorFactory stdTypeFactory;
00966 TupleAttributeDescriptor attrDesc(
00967 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00968 MockProducerExecStreamParams mockParams;
00969 mockParams.outputTupleDesc.push_back(attrDesc);
00970 uint nRows = 10000;
00971 mockParams.nRows = nRows;
00972 ExecStreamEmbryo mockStreamEmbryo;
00973 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00974 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00975
00976 SplitterExecStreamParams splitterParams;
00977 ExecStreamEmbryo splitterStreamEmbryo;
00978 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00979 splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00980
00981 vector<vector<ExecStreamEmbryo> > aggEmbryoStreamList;
00982 for (int i = 0; i < 10; i++) {
00983 SortedAggExecStreamParams aggParams;
00984 aggParams.groupByKeyCount = 0;
00985 aggParams.outputTupleDesc.push_back(attrDesc);
00986 AggInvocation countInvocation;
00987 countInvocation.aggFunction = AGG_FUNC_COUNT;
00988 countInvocation.iInputAttr = -1;
00989 aggParams.aggInvocations.push_back(countInvocation);
00990
00991 ExecStreamEmbryo aggStreamEmbryo;
00992 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00993 std::ostringstream oss;
00994 oss << "AggExecStream" << "#" << i;
00995 aggStreamEmbryo.getStream()->setName(oss.str());
00996 vector<ExecStreamEmbryo> v;
00997 v.push_back(aggStreamEmbryo);
00998 aggEmbryoStreamList.push_back(v);
00999 }
01000
01001 BarrierExecStreamParams barrierParams;
01002 barrierParams.outputTupleDesc.push_back(attrDesc);
01003 barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
01004 ExecStreamEmbryo barrierStreamEmbryo;
01005 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
01006 barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
01007
01008 SharedExecStream pOutputStream =
01009 prepareDAG(
01010 mockStreamEmbryo,
01011 splitterStreamEmbryo,
01012 aggEmbryoStreamList,
01013 barrierStreamEmbryo);
01014
01015 ConstExecStreamGenerator expectedResultGenerator(nRows);
01016 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
01017 }
01018
01019 void ExecStreamTestSuite::testSegBufferReaderWriterExecStream(
01020 bool restartable,
01021 bool earlyClose)
01022 {
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037 StandardTypeDescriptorFactory stdTypeFactory;
01038 TupleAttributeDescriptor attrDesc(
01039 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
01040
01041 MockProducerExecStreamParams mockParams;
01042 mockParams.outputTupleDesc.push_back(attrDesc);
01043 uint nRows;
01044 if (!restartable) {
01045 nRows = 10000;
01046 } else {
01047 nRows = 200;
01048 }
01049 mockParams.nRows = nRows;
01050
01051 ExecStreamEmbryo mockStreamEmbryo;
01052 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
01053 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
01054
01055 SegBufferWriterExecStreamParams writerParams;
01056 writerParams.scratchAccessor.pSegment = pRandomSegment;
01057 writerParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01058 writerParams.readerRefCountParamId = DynamicParamId(1);
01059 writerParams.outputTupleDesc.push_back(attrDesc);
01060
01061 ExecStreamEmbryo writerStreamEmbryo;
01062 writerStreamEmbryo.init(
01063 new SegBufferWriterExecStream(),
01064 writerParams);
01065 writerStreamEmbryo.getStream()->setName("SegBufferWriterExecStream");
01066
01067 SegBufferReaderExecStreamParams readerParams;
01068 readerParams.scratchAccessor.pSegment = pRandomSegment;
01069 readerParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01070 readerParams.readerRefCountParamId = DynamicParamId(1);
01071 readerParams.outputTupleDesc.push_back(attrDesc);
01072
01073 ExecStreamEmbryo readerStreamEmbryo1;
01074 readerStreamEmbryo1.init(
01075 new SegBufferReaderExecStream(),
01076 readerParams);
01077 readerStreamEmbryo1.getStream()->setName("SegBufferReaderExecStream1");
01078
01079 ExecStreamEmbryo readerStreamEmbryo2;
01080 readerStreamEmbryo2.init(
01081 new SegBufferReaderExecStream(),
01082 readerParams);
01083 readerStreamEmbryo2.getStream()->setName("SegBufferReaderExecStream2");
01084
01085 std::vector<std::vector<ExecStreamEmbryo> > interStreamEmbryosList;
01086 std::vector<ExecStreamEmbryo> readerInput;
01087 readerInput.push_back(readerStreamEmbryo1);
01088 if (earlyClose) {
01089 SegBufferExecStreamParams bufParams;
01090 bufParams.scratchAccessor.pSegment = pRandomSegment;
01091 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01092 bufParams.multipass = false;
01093
01094 ExecStreamEmbryo bufStreamEmbryo;
01095 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
01096 bufStreamEmbryo.getStream()->setName("SegBufferExecStream1");
01097 readerInput.push_back(bufStreamEmbryo);
01098 }
01099 interStreamEmbryosList.push_back(readerInput);
01100
01101 readerInput.clear();
01102 readerInput.push_back(readerStreamEmbryo2);
01103 if (earlyClose) {
01104 SegBufferExecStreamParams bufParams;
01105 bufParams.scratchAccessor.pSegment = pRandomSegment;
01106 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01107 if (restartable) {
01108 bufParams.multipass = true;
01109 }
01110
01111 ExecStreamEmbryo bufStreamEmbryo;
01112 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
01113 bufStreamEmbryo.getStream()->setName("SegBufferExecStream2");
01114 readerInput.push_back(bufStreamEmbryo);
01115 }
01116 interStreamEmbryosList.push_back(readerInput);
01117
01118 MergeExecStreamParams mergeParams;
01119 CartesianJoinExecStreamParams joinParams;
01120 if (restartable) {
01121 joinParams.leftOuter = false;
01122 } else {
01123 mergeParams.outputTupleDesc.push_back(attrDesc);
01124 if (getDegreeOfParallelism() != 1) {
01125 mergeParams.isParallel = true;
01126 }
01127 }
01128
01129 ExecStreamEmbryo execStreamEmbryo;
01130 if (restartable) {
01131 execStreamEmbryo.init(new CartesianJoinExecStream(), joinParams);
01132 execStreamEmbryo.getStream()->setName("CartesianJoinExecStream");
01133 } else {
01134 execStreamEmbryo.init(new MergeExecStream(), mergeParams);
01135 execStreamEmbryo.getStream()->setName("MergeExecStream");
01136 }
01137
01138 SharedExecStream pOutputStream =
01139 prepareDAG(
01140 mockStreamEmbryo,
01141 writerStreamEmbryo,
01142 interStreamEmbryosList,
01143 execStreamEmbryo);
01144
01145 int64_t zero = 0;
01146 TupleDescriptor expectedDesc;
01147 expectedDesc.push_back(attrDesc);
01148 if (restartable) {
01149 expectedDesc.push_back(attrDesc);
01150 }
01151 TupleData expectedTuple;
01152 expectedTuple.compute(expectedDesc);
01153 expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
01154 if (restartable) {
01155 expectedTuple[1].pData = reinterpret_cast<PBuffer>(&zero);
01156 }
01157
01158 verifyConstantOutput(
01159 *pOutputStream,
01160 expectedTuple,
01161 restartable ? nRows * nRows : 2 * nRows);
01162 }
01163
01164