ExecStreamTestSuite.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/test/ExecStreamTestSuite.cpp#26 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/common/FemEnums.h"
00026 #include "fennel/btree/BTreeDescriptor.h"
00027 #include "fennel/btree/BTreeBuilder.h"
00028 #include "fennel/btree/BTreeReader.h"
00029 #include "fennel/test/ExecStreamTestSuite.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/exec/ExecStream.h"
00032 #include "fennel/exec/ExecStreamGraph.h"
00033 #include "fennel/exec/ExecStreamBufAccessor.h"
00034 #include "fennel/exec/MockProducerExecStream.h"
00035 #include "fennel/exec/ScratchBufferExecStream.h"
00036 #include "fennel/exec/DoubleBufferExecStream.h"
00037 #include "fennel/exec/CopyExecStream.h"
00038 #include "fennel/exec/MergeExecStream.h"
00039 #include "fennel/exec/SegBufferExecStream.h"
00040 #include "fennel/exec/SegBufferReaderExecStream.h"
00041 #include "fennel/exec/SegBufferWriterExecStream.h"
00042 #include "fennel/exec/CartesianJoinExecStream.h"
00043 #include "fennel/exec/SortedAggExecStream.h"
00044 #include "fennel/exec/ReshapeExecStream.h"
00045 #include "fennel/exec/SplitterExecStream.h"
00046 #include "fennel/exec/BarrierExecStream.h"
00047 #include "fennel/exec/ValuesExecStream.h"
00048 #include "fennel/exec/NestedLoopJoinExecStream.h"
00049 #include "fennel/exec/ExecStreamEmbryo.h"
00050 #include "fennel/tuple/StandardTypeDescriptor.h"
00051 #include "fennel/ftrs/BTreeInsertExecStream.h"
00052 
00053 using namespace fennel;
00054 
00055 uint ExecStreamTestSuite::getDegreeOfParallelism()
00056 {
00057     return 1;
00058 }
00059 
00060 void ExecStreamTestSuite::testScratchBufferExecStream()
00061 {
00062     StandardTypeDescriptorFactory stdTypeFactory;
00063     TupleAttributeDescriptor attrDesc(
00064         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00065 
00066     MockProducerExecStreamParams mockParams;
00067     mockParams.outputTupleDesc.push_back(attrDesc);
00068     mockParams.nRows = 5000;     // at least two buffers
00069     mockParams.pGenerator.reset(new RampExecStreamGenerator());
00070 
00071     ExecStreamEmbryo mockStreamEmbryo;
00072     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00073     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00074 
00075     ScratchBufferExecStreamParams bufParams;
00076     bufParams.scratchAccessor =
00077         pSegmentFactory->newScratchSegment(pCache,1);
00078 
00079     ExecStreamEmbryo bufStreamEmbryo;
00080     bufStreamEmbryo.init(new ScratchBufferExecStream(),bufParams);
00081     bufStreamEmbryo.getStream()->setName("ScratchBufferExecStream");
00082 
00083     SharedExecStream pOutputStream = prepareTransformGraph(
00084         mockStreamEmbryo, bufStreamEmbryo);
00085 
00086     verifyOutput(
00087         *pOutputStream,
00088         mockParams.nRows,
00089         *(mockParams.pGenerator));
00090 }
00091 
00092 
00093 void ExecStreamTestSuite::testDoubleBufferExecStream()
00094 {
00095     StandardTypeDescriptorFactory stdTypeFactory;
00096     TupleAttributeDescriptor attrDesc(
00097         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00098 
00099     MockProducerExecStreamParams mockParams;
00100     mockParams.outputTupleDesc.push_back(attrDesc);
00101     mockParams.nRows = 25000;     // cycle through a few buffers
00102     mockParams.pGenerator.reset(new RampExecStreamGenerator());
00103 
00104     ExecStreamEmbryo mockStreamEmbryo;
00105     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00106     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00107 
00108     DoubleBufferExecStreamParams bufParams;
00109     bufParams.scratchAccessor =
00110         pSegmentFactory->newScratchSegment(pCache,1);
00111 
00112     ExecStreamEmbryo bufStreamEmbryo;
00113     bufStreamEmbryo.init(new DoubleBufferExecStream(),bufParams);
00114     bufStreamEmbryo.getStream()->setName("DoubleBufferExecStream");
00115 
00116     SharedExecStream pOutputStream = prepareTransformGraph(
00117         mockStreamEmbryo, bufStreamEmbryo);
00118 
00119     verifyOutput(
00120         *pOutputStream,
00121         mockParams.nRows,
00122         *(mockParams.pGenerator));
00123 }
00124 
00125 void ExecStreamTestSuite::testCopyExecStream()
00126 {
00127     StandardTypeDescriptorFactory stdTypeFactory;
00128     TupleAttributeDescriptor attrDesc(
00129         stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00130 
00131     MockProducerExecStreamParams mockParams;
00132     mockParams.outputTupleDesc.push_back(attrDesc);
00133     mockParams.nRows = 10000;   // at least two buffers
00134 
00135     ExecStreamEmbryo mockStreamEmbryo;
00136     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00137     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00138 
00139     CopyExecStreamParams copyParams;
00140     copyParams.outputTupleDesc.push_back(attrDesc);
00141 
00142     ExecStreamEmbryo copyStreamEmbryo;
00143     copyStreamEmbryo.init(new CopyExecStream(),copyParams);
00144     copyStreamEmbryo.getStream()->setName("CopyExecStream");
00145 
00146     SharedExecStream pOutputStream = prepareTransformGraph(
00147         mockStreamEmbryo,copyStreamEmbryo);
00148 
00149     int32_t zero = 0;
00150     TupleDescriptor expectedDesc;
00151     expectedDesc.push_back(attrDesc);
00152     TupleData expectedTuple;
00153     expectedTuple.compute(expectedDesc);
00154     expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00155     verifyConstantOutput(
00156         *pOutputStream,
00157         expectedTuple,
00158         mockParams.nRows);
00159 }
00160 
00161 void ExecStreamTestSuite::testMergeExecStream()
00162 {
00163     // simulate SELECT * FROM t10k UNION ALL SELECT * FROM 10k;
00164 
00165     StandardTypeDescriptorFactory stdTypeFactory;
00166     TupleAttributeDescriptor attrDesc(
00167         stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00168 
00169     MockProducerExecStreamParams paramsMock;
00170     paramsMock.outputTupleDesc.push_back(attrDesc);
00171     paramsMock.nRows = 10000; // at least two buffers
00172 
00173     ExecStreamEmbryo mockStreamEmbryo1;
00174     mockStreamEmbryo1.init(new MockProducerExecStream(),paramsMock);
00175     mockStreamEmbryo1.getStream()->setName("MockProducerExecStream1");
00176 
00177     ExecStreamEmbryo mockStreamEmbryo2;
00178     mockStreamEmbryo2.init(new MockProducerExecStream(),paramsMock);
00179     mockStreamEmbryo2.getStream()->setName("MockProducerExecStream2");
00180 
00181     MergeExecStreamParams paramsMerge;
00182     paramsMerge.outputTupleDesc.push_back(attrDesc);
00183     if (getDegreeOfParallelism() != 1) {
00184         paramsMerge.isParallel = true;
00185     }
00186 
00187     ExecStreamEmbryo mergeStreamEmbryo;
00188     mergeStreamEmbryo.init(new MergeExecStream(),paramsMerge);
00189     mergeStreamEmbryo.getStream()->setName("MergeExecStream");
00190 
00191     SharedExecStream pOutputStream = prepareConfluenceGraph(
00192         mockStreamEmbryo1,
00193         mockStreamEmbryo2,
00194         mergeStreamEmbryo);
00195 
00196     int32_t zero = 0;
00197     TupleDescriptor expectedDesc;
00198     expectedDesc.push_back(attrDesc);
00199     TupleData expectedTuple;
00200     expectedTuple.compute(expectedDesc);
00201     expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00202     verifyConstantOutput(
00203         *pOutputStream,
00204         expectedTuple,
00205         2*paramsMock.nRows);
00206 }
00207 
00208 void ExecStreamTestSuite::testSegBufferExecStream()
00209 {
00210     StandardTypeDescriptorFactory stdTypeFactory;
00211     TupleAttributeDescriptor attrDesc(
00212         stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00213 
00214     MockProducerExecStreamParams mockParams;
00215     mockParams.outputTupleDesc.push_back(attrDesc);
00216     mockParams.nRows = 10000;     // at least two buffers
00217 
00218     ExecStreamEmbryo mockStreamEmbryo;
00219     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00220     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00221 
00222     SegBufferExecStreamParams bufParams;
00223     bufParams.scratchAccessor.pSegment = pRandomSegment;
00224     bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
00225     bufParams.multipass = false;
00226 
00227     ExecStreamEmbryo bufStreamEmbryo;
00228     bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
00229     bufStreamEmbryo.getStream()->setName("SegBufferExecStream");
00230 
00231     SharedExecStream pOutputStream = prepareTransformGraph(
00232         mockStreamEmbryo, bufStreamEmbryo);
00233 
00234     int32_t zero = 0;
00235     TupleDescriptor expectedDesc;
00236     expectedDesc.push_back(attrDesc);
00237     TupleData expectedTuple;
00238     expectedTuple.compute(expectedDesc);
00239     expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00240     verifyConstantOutput(
00241         *pOutputStream,
00242         expectedTuple,
00243         mockParams.nRows);
00244 }
00245 
00246 void ExecStreamTestSuite::testCartesianJoinExecStream(
00247     uint nRowsOuter,uint nRowsInner)
00248 {
00249     // simulate SELECT * FROM t1, t2
00250 
00251     StandardTypeDescriptorFactory stdTypeFactory;
00252     TupleAttributeDescriptor attrDesc(
00253         stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00254 
00255     MockProducerExecStreamParams paramsMockOuter;
00256     paramsMockOuter.outputTupleDesc.push_back(attrDesc);
00257     paramsMockOuter.nRows = nRowsOuter;
00258 
00259     ExecStreamEmbryo outerStreamEmbryo;
00260     outerStreamEmbryo.init(new MockProducerExecStream(),paramsMockOuter);
00261     outerStreamEmbryo.getStream()->setName("OuterProducerExecStream");
00262 
00263     MockProducerExecStreamParams paramsMockInner(paramsMockOuter);
00264     paramsMockInner.nRows = nRowsInner;
00265 
00266     ExecStreamEmbryo innerStreamEmbryo;
00267     innerStreamEmbryo.init(new MockProducerExecStream(),paramsMockInner);
00268     innerStreamEmbryo.getStream()->setName("InnerProducerExecStream");
00269 
00270     CartesianJoinExecStreamParams paramsJoin;
00271     paramsJoin.leftOuter = false;
00272 
00273     ExecStreamEmbryo joinStreamEmbryo;
00274     joinStreamEmbryo.init(new CartesianJoinExecStream(),paramsJoin);
00275     joinStreamEmbryo.getStream()->setName("CartesianJoinExecStream");
00276 
00277     SharedExecStream pOutputStream = prepareConfluenceGraph(
00278         outerStreamEmbryo,
00279         innerStreamEmbryo,
00280         joinStreamEmbryo);
00281 
00282     int32_t zero = 0;
00283     TupleDescriptor expectedDesc;
00284     expectedDesc.push_back(attrDesc);
00285     expectedDesc.push_back(attrDesc);
00286     TupleData expectedTuple;
00287     expectedTuple.compute(expectedDesc);
00288     expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
00289     expectedTuple[1].pData = reinterpret_cast<PBuffer>(&zero);
00290     verifyConstantOutput(
00291         *pOutputStream,
00292         expectedTuple,
00293         nRowsOuter*nRowsInner);
00294 }
00295 
00296 void ExecStreamTestSuite::testCountAggExecStream()
00297 {
00298     StandardTypeDescriptorFactory stdTypeFactory;
00299     TupleAttributeDescriptor attrDesc(
00300         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00301 
00302     MockProducerExecStreamParams mockParams;
00303     mockParams.outputTupleDesc.push_back(attrDesc);
00304     mockParams.nRows = 10000;   // at least two buffers
00305 
00306     ExecStreamEmbryo mockStreamEmbryo;
00307     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00308     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00309 
00310     // simulate SELECT COUNT(*) FROM t10k
00311     SortedAggExecStreamParams aggParams;
00312     aggParams.groupByKeyCount = 0;
00313     aggParams.outputTupleDesc.push_back(attrDesc);
00314     AggInvocation countInvocation;
00315     countInvocation.aggFunction = AGG_FUNC_COUNT;
00316     countInvocation.iInputAttr = -1; // interpreted as COUNT(*)
00317     aggParams.aggInvocations.push_back(countInvocation);
00318 
00319     ExecStreamEmbryo aggStreamEmbryo;
00320     aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00321     aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00322 
00323     SharedExecStream pOutputStream = prepareTransformGraph(
00324         mockStreamEmbryo,aggStreamEmbryo);
00325 
00326     // set up a generator which can produce the expected output
00327     // (a count of 10000)
00328     RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00329 
00330     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00331 }
00332 
00333 void ExecStreamTestSuite::testSumAggExecStream()
00334 {
00335     StandardTypeDescriptorFactory stdTypeFactory;
00336     TupleAttributeDescriptor attrDesc(
00337         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00338 
00339     MockProducerExecStreamParams mockParams;
00340     mockParams.outputTupleDesc.push_back(attrDesc);
00341     mockParams.nRows = 10000;   // at least two buffers
00342     mockParams.pGenerator.reset(new RampExecStreamGenerator());
00343 
00344     ExecStreamEmbryo mockStreamEmbryo;
00345     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00346     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00347 
00348     // simulate SELECT SUM(x) FROM t10k with x iterating from 0 to 9999
00349     SortedAggExecStreamParams aggParams;
00350     aggParams.groupByKeyCount = 0;
00351     attrDesc.isNullable = true;
00352     aggParams.outputTupleDesc.push_back(attrDesc);
00353     AggInvocation sumInvocation;
00354     sumInvocation.aggFunction = AGG_FUNC_SUM;
00355     sumInvocation.iInputAttr = 0;
00356     aggParams.aggInvocations.push_back(sumInvocation);
00357 
00358     ExecStreamEmbryo aggStreamEmbryo;
00359     aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00360     aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00361 
00362     SharedExecStream pOutputStream = prepareTransformGraph(
00363         mockStreamEmbryo,aggStreamEmbryo);
00364 
00365     // set up a generator which can produce the expected output
00366     // (a count of 5000*9999)
00367     RampExecStreamGenerator expectedResultGenerator(
00368         (mockParams.nRows-1)*mockParams.nRows/2);
00369 
00370     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00371 }
00372 
00373 void ExecStreamTestSuite::testGroupAggExecStreamNrows(uint nrows)
00374 {
00375     StandardTypeDescriptorFactory stdTypeFactory;
00376     TupleAttributeDescriptor attrDesc(
00377         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00378 
00379     // Create two columns, both with two duplicates per column.
00380     MockProducerExecStreamParams mockParams;
00381     mockParams.outputTupleDesc.push_back(attrDesc);
00382     mockParams.outputTupleDesc.push_back(attrDesc);
00383     mockParams.nRows = nrows;   // at least two buffers
00384     mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator());
00385 
00386     ExecStreamEmbryo mockStreamEmbryo;
00387     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00388     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00389 
00390     // simulate SELECT col, COUNT(*) FROM t10k GROUP BY col;
00391     SortedAggExecStreamParams aggParams;
00392     aggParams.groupByKeyCount = 1;
00393     aggParams.outputTupleDesc.push_back(attrDesc);
00394     aggParams.outputTupleDesc.push_back(attrDesc);
00395     AggInvocation countInvocation;
00396     countInvocation.aggFunction = AGG_FUNC_COUNT;
00397     countInvocation.iInputAttr = -1; // interpreted as COUNT(*)
00398     aggParams.aggInvocations.push_back(countInvocation);
00399 
00400     ExecStreamEmbryo aggStreamEmbryo;
00401 
00402     aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00403     aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00404 
00405     SharedExecStream pOutputStream = prepareTransformGraph(
00406         mockStreamEmbryo,aggStreamEmbryo);
00407 
00408     // Result should be a sequence of values in the first column
00409     // and 2 for the second column
00410     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00411 
00412     SharedInt64ColumnGenerator col =
00413         SharedInt64ColumnGenerator(new SeqColumnGenerator());
00414     columnGenerators.push_back(col);
00415 
00416     col = SharedInt64ColumnGenerator(new ConstColumnGenerator(2));
00417     columnGenerators.push_back(col);
00418 
00419     CompositeExecStreamGenerator expectedResultGenerator(columnGenerators);
00420 
00421     verifyOutput(*pOutputStream, mockParams.nRows/2, expectedResultGenerator);
00422 }
00423 
00424 void ExecStreamTestSuite::testReshapeExecStream(
00425     bool filter, bool cast, uint expectedNRows, int expectedStart,
00426     bool compareParam,
00427     std::hash_set<int64_t> const &outputParams)
00428 {
00429     assert(!compareParam || filter == compareParam);
00430     StandardTypeDescriptorFactory stdTypeFactory;
00431     TupleAttributeDescriptor nullAttrDesc(
00432         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00433         true, sizeof(int64_t));
00434     TupleAttributeDescriptor notNullAttrDesc(
00435         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00436 
00437     // Input consists of 6 not nullable columns
00438     // - the first 4 columns will consist of sequential values, the 0th
00439     //   column starting at 0, the first at 1, 2nd at 2, 3rd at 3
00440     // - the 4th column will consist of sequential values starting at 0, each
00441     //   value repeating 25 times
00442     // - the 5th column will also consist of sequential values starting at 0,
00443     //   each value repeating 10 times
00444     MockProducerExecStreamParams mockParams;
00445     for (int i = 0; i < 6; i++) {
00446         mockParams.outputTupleDesc.push_back(notNullAttrDesc);
00447     }
00448     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00449     SharedInt64ColumnGenerator colGen;
00450     for (int i = 0; i < 4; i++) {
00451         colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(i));
00452         columnGenerators.push_back(colGen);
00453     }
00454     colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(25, 0));
00455     columnGenerators.push_back(colGen);
00456     colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(10, 0));
00457     columnGenerators.push_back(colGen);
00458     mockParams.nRows = 1000;
00459     mockParams.pGenerator.reset(
00460         new CompositeExecStreamGenerator(columnGenerators));
00461 
00462     ExecStreamEmbryo mockStreamEmbryo;
00463     mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
00464     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00465 
00466     // Setup stream parameters as follows:
00467     // 1. If filtering is specified, filter columns 4 and 5 against values 20
00468     //    and 50.  If filtering on dynamic parameters, filter column 5 against
00469     //    the dynamic parameter value 50.
00470     // 2. Project columns 3, 0, and 2; if casting is specified, project them
00471     //    into nullable columns; else not nullable
00472     // 3. If outputting dynamic parameters, append them to the end of the
00473     //    output tuple
00474     ReshapeExecStreamParams rsParams;
00475     boost::shared_array<FixedBuffer> pBuffer;
00476     std::vector<int64_t> paramVals;
00477     paramVals.push_back(10);
00478     paramVals.push_back(20);
00479     paramVals.push_back(50);
00480     if (!filter) {
00481         rsParams.compOp = COMP_NOOP;
00482     } else {
00483         rsParams.compOp = COMP_EQ;
00484         TupleDescriptor compareDesc;
00485         // comparison type needs to be nullable to allow filtering of nulls
00486         compareDesc.push_back(nullAttrDesc);
00487         if (!compareParam) {
00488             compareDesc.push_back(nullAttrDesc);
00489         }
00490         TupleData compareData;
00491         compareData.compute(compareDesc);
00492         if (compareParam) {
00493             compareData[0].pData = (PConstBuffer) &paramVals[1];
00494         } else {
00495             compareData[0].pData = (PConstBuffer) &paramVals[1];
00496             compareData[1].pData = (PConstBuffer) &paramVals[2];
00497         }
00498         TupleAccessor tupleAccessor;
00499         tupleAccessor.compute(compareDesc);
00500         pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00501         tupleAccessor.marshal(compareData, pBuffer.get());
00502     }
00503     rsParams.pCompTupleBuffer = pBuffer;
00504 
00505     TupleProjection tupleProj;
00506     tupleProj.push_back(4);
00507     tupleProj.push_back(5);
00508     rsParams.inputCompareProj = tupleProj;
00509 
00510     tupleProj.clear();
00511     tupleProj.push_back(3);
00512     tupleProj.push_back(0);
00513     tupleProj.push_back(2);
00514     rsParams.outputProj = tupleProj;
00515 
00516     for (int i = 0; i < 3; i++) {
00517         if (cast) {
00518             rsParams.outputTupleDesc.push_back(nullAttrDesc);
00519         } else {
00520             rsParams.outputTupleDesc.push_back(notNullAttrDesc);
00521         }
00522     }
00523 
00524     // Setup the dynamic parameters, marking the ones that will be
00525     // included in the output stream.  If comparing against a dynamic
00526     // parameter, compare the third dynamic parameter against column 5.
00527     std::vector<ReshapeParameter> dynamicParams;
00528     if (compareParam || outputParams.size() > 0) {
00529         for (uint i = 1; i < paramVals.size() + 1; i++) {
00530             SharedDynamicParamManager pDynamicParamManager =
00531             pGraph->getDynamicParamManager();
00532             pDynamicParamManager->createParam(
00533                 DynamicParamId(i),
00534                 notNullAttrDesc);
00535             TupleDatum paramValDatum;
00536             paramValDatum.pData = (PConstBuffer) &(paramVals[i - 1]);
00537             paramValDatum.cbData = sizeof(int64_t);
00538             pDynamicParamManager->writeParam(
00539                 DynamicParamId(i),
00540                 paramValDatum);
00541             dynamicParams.push_back(
00542                 ReshapeParameter(
00543                     DynamicParamId(i),
00544                     ((i == 3) && compareParam) ? uint(5) : MAXU,
00545                     (outputParams.find(i - 1) != outputParams.end())));
00546         }
00547     }
00548     rsParams.dynamicParameters = dynamicParams;
00549 
00550     // Setup the expected result
00551     columnGenerators.clear();
00552     colGen = SharedInt64ColumnGenerator(
00553         new SeqColumnGenerator(expectedStart + 3));
00554     columnGenerators.push_back(colGen);
00555     colGen = SharedInt64ColumnGenerator(
00556         new SeqColumnGenerator(expectedStart));
00557     columnGenerators.push_back(colGen);
00558     colGen = SharedInt64ColumnGenerator(
00559         new SeqColumnGenerator(expectedStart + 2));
00560     columnGenerators.push_back(colGen);
00561     for (uint i = 0; i < dynamicParams.size(); i++) {
00562         if (dynamicParams[i].outputParam) {
00563             colGen =
00564                 SharedInt64ColumnGenerator(
00565                     new ConstColumnGenerator(
00566                         paramVals[opaqueToInt(
00567                             dynamicParams[i].dynamicParamId) - 1]));
00568             columnGenerators.push_back(colGen);
00569             rsParams.outputTupleDesc.push_back(notNullAttrDesc);
00570         }
00571     }
00572 
00573     ExecStreamEmbryo rsStreamEmbryo;
00574     rsStreamEmbryo.init(new ReshapeExecStream(),rsParams);
00575     rsStreamEmbryo.getStream()->setName("ReshapeExecStream");
00576     SharedExecStream pOutputStream = prepareTransformGraph(
00577         mockStreamEmbryo, rsStreamEmbryo);
00578 
00579     CompositeExecStreamGenerator resultGenerator(columnGenerators);
00580     verifyOutput(*pOutputStream, expectedNRows, resultGenerator);
00581 }
00582 
00583 void ExecStreamTestSuite::testSingleValueAggExecStream()
00584 {
00585     StandardTypeDescriptorFactory stdTypeFactory;
00586     TupleAttributeDescriptor attrDesc(
00587         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00588     TupleAttributeDescriptor attrDescNullable(
00589         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), true,
00590         sizeof(int64_t));
00591 
00592     // Result should be a sequence of values in the first column
00593     // and 2 for the second column
00594     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGeneratorsIn;
00595 
00596     SharedInt64ColumnGenerator col =
00597         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00598     columnGeneratorsIn.push_back(col);
00599 
00600     // Create two columns, both with two duplicates per column.
00601     MockProducerExecStreamParams mockParams;
00602     mockParams.outputTupleDesc.push_back(attrDesc);
00603     mockParams.nRows = 10;
00604     mockParams.pGenerator.reset(
00605         new CompositeExecStreamGenerator(columnGeneratorsIn));
00606 
00607     ExecStreamEmbryo mockStreamEmbryo;
00608     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00609     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00610 
00611     // simulate SELECT col, SINGLE_VALUE(col) FROM t10k GROUP BY col;
00612     SortedAggExecStreamParams aggParams;
00613     aggParams.groupByKeyCount = 1;
00614     aggParams.outputTupleDesc.push_back(attrDesc);
00615     aggParams.outputTupleDesc.push_back(attrDescNullable);
00616     AggInvocation singleValueInvocation;
00617     singleValueInvocation.aggFunction = AGG_FUNC_SINGLE_VALUE;
00618     singleValueInvocation.iInputAttr = 0;
00619     aggParams.aggInvocations.push_back(singleValueInvocation);
00620 
00621     ExecStreamEmbryo aggStreamEmbryo;
00622 
00623     aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00624     aggStreamEmbryo.getStream()->setName("SortedAggExecStream");
00625 
00626     SharedExecStream pOutputStream = prepareTransformGraph(
00627         mockStreamEmbryo,aggStreamEmbryo);
00628 
00629     // Result should be a sequence of values in both columns
00630     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGeneratorsOut;
00631 
00632     col =
00633         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00634     columnGeneratorsOut.push_back(col);
00635 
00636     col =
00637         SharedInt64ColumnGenerator(new DupColumnGenerator(1));
00638     columnGeneratorsOut.push_back(col);
00639 
00640     CompositeExecStreamGenerator expectedResultGenerator(columnGeneratorsOut);
00641 
00642     verifyOutput(*pOutputStream, mockParams.nRows, expectedResultGenerator);
00643 }
00644 
00645 void ExecStreamTestSuite::testMergeImplicitPullInputs()
00646 {
00647     // This testcase exercises the case where production of rows from inputs
00648     // into the MergeExecStream needs to occur independent of explicit requests
00649     // made by the MergeExecStream.  The initial input is a single stream
00650     // that is split by a SplitterExecStream and then brought back together by
00651     // the MergeExecSteam in a different order from the original input.
00652 
00653     StandardTypeDescriptorFactory stdTypeFactory;
00654     TupleAttributeDescriptor attrDesc(
00655         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00656     TupleAttributeDescriptor nullAttrDesc(
00657         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64),
00658         true, sizeof(int64_t));
00659 
00660     // Initial input stream is a repeating sequence of
00661     // 0, 1, 2, ... nInputs - 1, 0, 1, 2, ..., nInputs - 1, 0, 1, 2, ...
00662     // Note that we're using a column generator here because there's already a
00663     // column generator class that produces the kind of input stream we need.
00664 
00665     MockProducerExecStreamParams mockParams;
00666     mockParams.outputTupleDesc.push_back(attrDesc);
00667     // needs to fill up at least 2 buffers per input into the merge
00668     uint nInputs = 5;
00669     uint nRows = nInputs * 4000;
00670     mockParams.nRows = nRows;
00671     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerator;
00672     columnGenerator.push_back(
00673         SharedInt64ColumnGenerator(
00674             new DupRepeatingSeqColumnGenerator(nInputs, 1)));
00675     mockParams.pGenerator.reset(
00676         new CompositeExecStreamGenerator(columnGenerator));
00677 
00678     ExecStreamEmbryo mockStreamEmbryo;
00679     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00680     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00681 
00682     // split the mock data stream with each value being redirected to
00683     // one of the merge inputs, based on its value, by using reshape streams
00684     // to filter the input values
00685 
00686     SplitterExecStreamParams splitterParams;
00687     ExecStreamEmbryo splitterStreamEmbryo;
00688     splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00689     splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00690 
00691     vector<vector<ExecStreamEmbryo> > reshapeEmbryoStreamList;
00692     for (int i = 0; i < nInputs; i++) {
00693         ReshapeExecStreamParams rsParams;
00694         boost::shared_array<FixedBuffer> pBuffer;
00695         rsParams.compOp = COMP_EQ;
00696         int64_t key = i;
00697         TupleDescriptor compareDesc;
00698         // comparison type needs to be nullable to allow filtering of nulls
00699         compareDesc.push_back(nullAttrDesc);
00700         TupleData compareData;
00701         compareData.compute(compareDesc);
00702         compareData[0].pData = (PConstBuffer) &key;
00703         TupleAccessor tupleAccessor;
00704         tupleAccessor.compute(compareDesc);
00705         pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00706         tupleAccessor.marshal(compareData, pBuffer.get());
00707         rsParams.pCompTupleBuffer = pBuffer;
00708         TupleProjection tupleProj;
00709         tupleProj.push_back(0);
00710         rsParams.inputCompareProj = tupleProj;
00711         rsParams.outputProj = tupleProj;
00712 
00713         ExecStreamEmbryo rsStreamEmbryo;
00714         rsStreamEmbryo.init(new ReshapeExecStream(), rsParams);
00715         std::ostringstream oss;
00716         oss << "ReshapeExecStream" << "#" << i;
00717         rsStreamEmbryo.getStream()->setName(oss.str());
00718 
00719         vector<ExecStreamEmbryo> reshapeStreamEmbryo;
00720         reshapeStreamEmbryo.push_back(rsStreamEmbryo);
00721 
00722         // since the splitter needs to pass through all rows but the merge
00723         // needs to return each input in the order of the inputs, we need to
00724         // buffer all but the first input to prevent the splitter from blocking
00725         if (i != 0) {
00726             SegBufferExecStreamParams bufParams;
00727             bufParams.scratchAccessor.pSegment = pRandomSegment;
00728             bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
00729             bufParams.multipass = false;
00730 
00731             ExecStreamEmbryo bufStreamEmbryo;
00732             bufStreamEmbryo.init(new SegBufferExecStream(), bufParams);
00733             std::ostringstream oss;
00734             oss << "SegBufferExecStream" << "#" << i;
00735             bufStreamEmbryo.getStream()->setName(oss.str());
00736 
00737             reshapeStreamEmbryo.push_back(bufStreamEmbryo);
00738         }
00739         reshapeEmbryoStreamList.push_back(reshapeStreamEmbryo);
00740     }
00741 
00742     // merge the inputs with each input being read in sequence
00743     MergeExecStreamParams mergeParams;
00744     mergeParams.outputTupleDesc.push_back(attrDesc);
00745 
00746     ExecStreamEmbryo mergeStreamEmbryo;
00747     mergeStreamEmbryo.init(new MergeExecStream(), mergeParams);
00748     mergeStreamEmbryo.getStream()->setName("MergeExecStream");
00749 
00750     SharedExecStream pOutputStream =
00751         prepareDAG(
00752             mockStreamEmbryo,
00753             splitterStreamEmbryo,
00754             reshapeEmbryoStreamList,
00755             mergeStreamEmbryo);
00756 
00757     // setup the generator for the expected result -- 0's, followed by 1's,
00758     // followed by 2's, etc.
00759     StairCaseExecStreamGenerator expectedResultGenerator(1, nRows / nInputs);
00760 
00761     verifyOutput(*pOutputStream, nRows, expectedResultGenerator);
00762 }
00763 
00764 void ExecStreamTestSuite::testBTreeInsertExecStream(
00765     bool useDynamicBTree,
00766     uint nRows)
00767 {
00768     StandardTypeDescriptorFactory stdTypeFactory;
00769     TupleAttributeDescriptor attrDesc(
00770         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00771 
00772     // Use a MockProducer to generate the btree records with a single key
00773     // and single value for each record.  The key sequences from
00774     // 0 ... nRows-1 while the value sequences from nRows to nRows*2-1
00775     MockProducerExecStreamParams mockParams;
00776     mockParams.nRows = nRows;
00777     mockParams.outputTupleDesc.push_back(attrDesc);
00778     mockParams.outputTupleDesc.push_back(attrDesc);
00779     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00780     columnGenerators.push_back(
00781         SharedInt64ColumnGenerator(new SeqColumnGenerator(0)));
00782     columnGenerators.push_back(
00783         SharedInt64ColumnGenerator(new SeqColumnGenerator(nRows)));
00784     mockParams.pGenerator.reset(
00785         new CompositeExecStreamGenerator(columnGenerators));
00786 
00787     ExecStreamEmbryo mockStreamEmbryo;
00788     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00789     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00790 
00791     // Setup the BTreeInsert stream
00792     BTreeDescriptor descriptor;
00793     BTreeInsertExecStreamParams bTreeInsertParams;
00794     descriptor.tupleDescriptor.push_back(attrDesc);
00795     descriptor.tupleDescriptor.push_back(attrDesc);
00796     descriptor.keyProjection.push_back(0);
00797     descriptor.segmentAccessor.pSegment = pRandomSegment;
00798     descriptor.segmentAccessor.pCacheAccessor = pCacheAccessor;
00799     BTreeBuilder builder(descriptor, pRandomSegment);
00800     if (!useDynamicBTree) {
00801         builder.createEmptyRoot();
00802         descriptor.rootPageId = builder.getRootPageId();
00803         bTreeInsertParams.rootPageIdParamId = DynamicParamId(0);
00804     } else {
00805         descriptor.rootPageId = NULL_PAGE_ID;
00806         bTreeInsertParams.rootPageIdParamId = DynamicParamId(1);
00807     }
00808 
00809     bTreeInsertParams.scratchAccessor =
00810         pSegmentFactory->newScratchSegment(pCache, 10);
00811     bTreeInsertParams.pCacheAccessor = pCacheAccessor;
00812     bTreeInsertParams.distinctness = DUP_FAIL;
00813     bTreeInsertParams.monotonic = true;
00814     bTreeInsertParams.pSegment = pRandomSegment;
00815     bTreeInsertParams.pCacheAccessor = pCacheAccessor;
00816     bTreeInsertParams.rootPageId = descriptor.rootPageId;
00817     bTreeInsertParams.segmentId = descriptor.segmentId;
00818     bTreeInsertParams.pageOwnerId = descriptor.pageOwnerId;
00819     bTreeInsertParams.tupleDesc = descriptor.tupleDescriptor;
00820     bTreeInsertParams.keyProj = descriptor.keyProjection;
00821     bTreeInsertParams.pRootMap = 0;
00822     bTreeInsertParams.outputTupleDesc.push_back(attrDesc);
00823 
00824     ExecStreamEmbryo bTreeInsertEmbryo;
00825     bTreeInsertEmbryo.init(new BTreeInsertExecStream(), bTreeInsertParams);
00826     bTreeInsertEmbryo.getStream()->setName("BTreeInsertExecStream");
00827 
00828     SharedExecStream pOutputStream =
00829         prepareTransformGraph(mockStreamEmbryo, bTreeInsertEmbryo);
00830 
00831     ConstExecStreamGenerator expectedResultGenerator(0);
00832     verifyOutput(*pOutputStream, 0, expectedResultGenerator);
00833 
00834     // Get the rootPageId created by the stream so we can read from the tree
00835     if (useDynamicBTree) {
00836         descriptor.rootPageId =
00837             *reinterpret_cast<PageId const *>(
00838                 pGraph->getDynamicParamManager()->getParam(
00839                     DynamicParamId(1)).getDatum().pData);
00840     }
00841 
00842     // Now that we've loaded the btree, verify the contents by directly
00843     // reading from the btree using a BTreeReader
00844     BTreeReader reader(descriptor);
00845     bool found = reader.searchFirst();
00846     if (!found) {
00847         BOOST_FAIL("searchFirst found nothing");
00848     }
00849     TupleData tupleData;
00850     tupleData.compute(descriptor.tupleDescriptor);
00851     for (uint i = 0; i < nRows; i++) {
00852         if (!found) {
00853             BOOST_FAIL("Could not searchNext for key #" << i);
00854         }
00855         reader.getTupleAccessorForRead().unmarshal(tupleData);
00856         uint64_t key = *reinterpret_cast<uint64_t const *>(tupleData[0].pData);
00857         uint64_t val = *reinterpret_cast<uint64_t const *>(tupleData[1].pData);
00858         BOOST_CHECK_EQUAL(key, i);
00859         BOOST_CHECK_EQUAL(val, i + nRows);
00860         found = reader.searchNext();
00861     }
00862     if (!reader.isSingular()) {
00863         BOOST_FAIL("Should have reached end of tree");
00864     }
00865     reader.endSearch();
00866 }
00867 
00868 void ExecStreamTestSuite::testNestedLoopJoinExecStream(
00869     uint nRowsLeft,
00870     uint nRowsRight)
00871 {
00872     // simulate SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a
00873 
00874     StandardTypeDescriptorFactory stdTypeFactory;
00875     TupleAttributeDescriptor attrDesc(
00876         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00877 
00878     // 1st input is a mock producer that generates values 0 ... nRowsLeft - 1
00879     MockProducerExecStreamParams paramsMockOuter;
00880     paramsMockOuter.outputTupleDesc.push_back(attrDesc);
00881     paramsMockOuter.nRows = nRowsLeft;
00882     paramsMockOuter.pGenerator.reset(new RampExecStreamGenerator(0, 1));
00883 
00884     ExecStreamEmbryo outerStreamEmbryo;
00885     outerStreamEmbryo.init(new MockProducerExecStream(), paramsMockOuter);
00886     outerStreamEmbryo.getStream()->setName("OuterProducerExecStream");
00887 
00888     // 2nd input consists of a mock producer that generates values
00889     // 0 ... nRowsRight - 1, but those values are then filtered by a
00890     // ReshapeExecStream that only outputs rows matching the current left hand
00891     // side row.  The current left hand side row is passed to the right via
00892     // a dynamic parameter
00893     MockProducerExecStreamParams paramsMockInner;
00894     paramsMockInner.outputTupleDesc.push_back(attrDesc);
00895     paramsMockInner.nRows = nRowsRight;
00896     paramsMockInner.pGenerator.reset(new RampExecStreamGenerator(0, 1));
00897 
00898     ExecStreamEmbryo innerStreamEmbryo;
00899     innerStreamEmbryo.init(new MockProducerExecStream(), paramsMockInner);
00900     innerStreamEmbryo.getStream()->setName("InnerProducerExecStream");
00901 
00902     ReshapeExecStreamParams paramsReshape;
00903     paramsReshape.compOp = COMP_EQ;
00904     paramsReshape.outputProj.push_back(0);
00905     paramsReshape.inputCompareProj.push_back(0);
00906     paramsReshape.dynamicParameters.push_back(
00907         ReshapeParameter(DynamicParamId(1), 0, false));
00908     paramsReshape.outputTupleDesc.push_back(attrDesc);
00909 
00910     ExecStreamEmbryo reshapeStreamEmbryo;
00911     reshapeStreamEmbryo.init(new ReshapeExecStream(), paramsReshape);
00912     reshapeStreamEmbryo.getStream()->setName("ReshapeExecStream");
00913 
00914     // For the 3rd input, just create a an empty values stream.  This doesn't
00915     // do anything, but is just there to make sure the stream handles reading
00916     // from it.
00917     ValuesExecStreamParams paramsValues;
00918     paramsValues.bufSize = 0;
00919     paramsValues.outputTupleDesc.push_back(attrDesc);
00920     ExecStreamEmbryo valuesStreamEmbryo;
00921     valuesStreamEmbryo.init(new ValuesExecStream(), paramsValues);
00922     valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00923 
00924     // String together the inputs
00925     std::vector<std::vector<ExecStreamEmbryo> > sourceStreamEmbryosList;
00926     std::vector<ExecStreamEmbryo> sourceStreamEmbryos;
00927     sourceStreamEmbryos.push_back(outerStreamEmbryo);
00928     sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00929 
00930     sourceStreamEmbryos.clear();
00931     sourceStreamEmbryos.push_back(innerStreamEmbryo);
00932     sourceStreamEmbryos.push_back(reshapeStreamEmbryo);
00933     sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00934 
00935     sourceStreamEmbryos.clear();
00936     sourceStreamEmbryos.push_back(valuesStreamEmbryo);
00937     sourceStreamEmbryosList.push_back(sourceStreamEmbryos);
00938 
00939     NestedLoopJoinExecStreamParams paramsJoin;
00940     paramsJoin.leftOuter = false;
00941     paramsJoin.leftJoinKeys.push_back(
00942         NestedLoopJoinKey(DynamicParamId(1), 0));
00943 
00944     ExecStreamEmbryo joinStreamEmbryo;
00945     joinStreamEmbryo.init(new NestedLoopJoinExecStream(), paramsJoin);
00946     joinStreamEmbryo.getStream()->setName("NestedLoopJoinExecStream");
00947 
00948     SharedExecStream pOutputStream =
00949         prepareConfluenceGraph(sourceStreamEmbryosList, joinStreamEmbryo);
00950 
00951     vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00952     SharedInt64ColumnGenerator colGen =
00953         SharedInt64ColumnGenerator(new SeqColumnGenerator(0));
00954     columnGenerators.push_back(colGen);
00955     colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(0));
00956     columnGenerators.push_back(colGen);
00957 
00958     CompositeExecStreamGenerator resultGenerator(columnGenerators);
00959     verifyOutput(
00960         *pOutputStream, std::min(nRowsLeft, nRowsRight), resultGenerator);
00961 }
00962 
00963 void ExecStreamTestSuite::testSplitterPlusBarrier()
00964 {
00965     StandardTypeDescriptorFactory stdTypeFactory;
00966     TupleAttributeDescriptor attrDesc(
00967         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00968     MockProducerExecStreamParams mockParams;
00969     mockParams.outputTupleDesc.push_back(attrDesc);
00970     uint nRows = 10000;
00971     mockParams.nRows = nRows;
00972     ExecStreamEmbryo mockStreamEmbryo;
00973     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00974     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00975 
00976     SplitterExecStreamParams splitterParams;
00977     ExecStreamEmbryo splitterStreamEmbryo;
00978     splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00979     splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00980 
00981     vector<vector<ExecStreamEmbryo> > aggEmbryoStreamList;
00982     for (int i = 0; i < 10; i++) {
00983         SortedAggExecStreamParams aggParams;
00984         aggParams.groupByKeyCount = 0;
00985         aggParams.outputTupleDesc.push_back(attrDesc);
00986         AggInvocation countInvocation;
00987         countInvocation.aggFunction = AGG_FUNC_COUNT;
00988         countInvocation.iInputAttr = -1;
00989         aggParams.aggInvocations.push_back(countInvocation);
00990 
00991         ExecStreamEmbryo aggStreamEmbryo;
00992         aggStreamEmbryo.init(new SortedAggExecStream(),aggParams);
00993         std::ostringstream oss;
00994         oss << "AggExecStream" << "#" << i;
00995         aggStreamEmbryo.getStream()->setName(oss.str());
00996         vector<ExecStreamEmbryo> v;
00997         v.push_back(aggStreamEmbryo);
00998         aggEmbryoStreamList.push_back(v);
00999     }
01000 
01001     BarrierExecStreamParams barrierParams;
01002     barrierParams.outputTupleDesc.push_back(attrDesc);
01003     barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
01004     ExecStreamEmbryo barrierStreamEmbryo;
01005     barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
01006     barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
01007 
01008     SharedExecStream pOutputStream =
01009         prepareDAG(
01010             mockStreamEmbryo,
01011             splitterStreamEmbryo,
01012             aggEmbryoStreamList,
01013             barrierStreamEmbryo);
01014 
01015     ConstExecStreamGenerator expectedResultGenerator(nRows);
01016     verifyOutput(*pOutputStream, 1, expectedResultGenerator);
01017 }
01018 
01019 void ExecStreamTestSuite::testSegBufferReaderWriterExecStream(
01020     bool restartable,
01021     bool earlyClose)
01022 {
01023     // This testcase exercises the SegBufferReaderExecStream and
01024     // SegBufferWriterExecStream classes using either a MergeExecStream
01025     // (inputs read serially) or a CartesianJoinExecStream (restartable
01026     // input), depending on the "restartable" parameter.
01027     //
01028     // The stream graph consists of a MergeExecStream or a
01029     // CartesianJoinExecStream with two inputs that are
01030     // SegBufferReaderExecStreams.  A SegBufferWriterExecStream buffers a
01031     // mock producer so it can be read by SegBufferReaderExecStreams.
01032     //
01033     // If the earlyClose parameter is set, then the
01034     // SegBufferReaderExecStreams output their data to a SegBufferExecStream
01035     // which will execute an early close once it has read all of its input.
01036 
01037     StandardTypeDescriptorFactory stdTypeFactory;
01038     TupleAttributeDescriptor attrDesc(
01039         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
01040 
01041     MockProducerExecStreamParams mockParams;
01042     mockParams.outputTupleDesc.push_back(attrDesc);
01043     uint nRows;
01044     if (!restartable) {
01045         nRows = 10000;
01046     } else {
01047         nRows = 200;
01048     }
01049     mockParams.nRows = nRows;
01050 
01051     ExecStreamEmbryo mockStreamEmbryo;
01052     mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
01053     mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
01054 
01055     SegBufferWriterExecStreamParams writerParams;
01056     writerParams.scratchAccessor.pSegment = pRandomSegment;
01057     writerParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01058     writerParams.readerRefCountParamId = DynamicParamId(1);
01059     writerParams.outputTupleDesc.push_back(attrDesc);
01060 
01061     ExecStreamEmbryo writerStreamEmbryo;
01062     writerStreamEmbryo.init(
01063         new SegBufferWriterExecStream(),
01064         writerParams);
01065     writerStreamEmbryo.getStream()->setName("SegBufferWriterExecStream");
01066 
01067     SegBufferReaderExecStreamParams readerParams;
01068     readerParams.scratchAccessor.pSegment = pRandomSegment;
01069     readerParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01070     readerParams.readerRefCountParamId = DynamicParamId(1);
01071     readerParams.outputTupleDesc.push_back(attrDesc);
01072 
01073     ExecStreamEmbryo readerStreamEmbryo1;
01074     readerStreamEmbryo1.init(
01075         new SegBufferReaderExecStream(),
01076         readerParams);
01077     readerStreamEmbryo1.getStream()->setName("SegBufferReaderExecStream1");
01078 
01079     ExecStreamEmbryo readerStreamEmbryo2;
01080     readerStreamEmbryo2.init(
01081         new SegBufferReaderExecStream(),
01082         readerParams);
01083     readerStreamEmbryo2.getStream()->setName("SegBufferReaderExecStream2");
01084 
01085     std::vector<std::vector<ExecStreamEmbryo> > interStreamEmbryosList;
01086     std::vector<ExecStreamEmbryo> readerInput;
01087     readerInput.push_back(readerStreamEmbryo1);
01088     if (earlyClose) {
01089         SegBufferExecStreamParams bufParams;
01090         bufParams.scratchAccessor.pSegment = pRandomSegment;
01091         bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01092         bufParams.multipass = false;
01093 
01094         ExecStreamEmbryo bufStreamEmbryo;
01095         bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
01096         bufStreamEmbryo.getStream()->setName("SegBufferExecStream1");
01097         readerInput.push_back(bufStreamEmbryo);
01098     }
01099     interStreamEmbryosList.push_back(readerInput);
01100 
01101     readerInput.clear();
01102     readerInput.push_back(readerStreamEmbryo2);
01103     if (earlyClose) {
01104         SegBufferExecStreamParams bufParams;
01105         bufParams.scratchAccessor.pSegment = pRandomSegment;
01106         bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor;
01107         if (restartable) {
01108             bufParams.multipass = true;
01109         }
01110 
01111         ExecStreamEmbryo bufStreamEmbryo;
01112         bufStreamEmbryo.init(new SegBufferExecStream(),bufParams);
01113         bufStreamEmbryo.getStream()->setName("SegBufferExecStream2");
01114         readerInput.push_back(bufStreamEmbryo);
01115     }
01116     interStreamEmbryosList.push_back(readerInput);
01117 
01118     MergeExecStreamParams mergeParams;
01119     CartesianJoinExecStreamParams joinParams;
01120     if (restartable) {
01121         joinParams.leftOuter = false;
01122     } else {
01123         mergeParams.outputTupleDesc.push_back(attrDesc);
01124         if (getDegreeOfParallelism() != 1) {
01125             mergeParams.isParallel = true;
01126         }
01127     }
01128 
01129     ExecStreamEmbryo execStreamEmbryo;
01130     if (restartable) {
01131         execStreamEmbryo.init(new CartesianJoinExecStream(), joinParams);
01132         execStreamEmbryo.getStream()->setName("CartesianJoinExecStream");
01133     } else {
01134         execStreamEmbryo.init(new MergeExecStream(), mergeParams);
01135         execStreamEmbryo.getStream()->setName("MergeExecStream");
01136     }
01137 
01138     SharedExecStream pOutputStream =
01139         prepareDAG(
01140             mockStreamEmbryo,
01141             writerStreamEmbryo,
01142             interStreamEmbryosList,
01143             execStreamEmbryo);
01144 
01145     int64_t zero = 0;
01146     TupleDescriptor expectedDesc;
01147     expectedDesc.push_back(attrDesc);
01148     if (restartable) {
01149         expectedDesc.push_back(attrDesc);
01150     }
01151     TupleData expectedTuple;
01152     expectedTuple.compute(expectedDesc);
01153     expectedTuple[0].pData = reinterpret_cast<PBuffer>(&zero);
01154     if (restartable) {
01155         expectedTuple[1].pData = reinterpret_cast<PBuffer>(&zero);
01156     }
01157 
01158     verifyConstantOutput(
01159         *pOutputStream,
01160         expectedTuple,
01161         restartable ? nRows * nRows : 2 * nRows);
01162 }
01163 
01164 // End ExecStreamTestSuite.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1