00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/common/FemEnums.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00026 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00027 #include "fennel/btree/BTreeBuilder.h"
00028 #include "fennel/ftrs/BTreeInsertExecStream.h"
00029 #include "fennel/ftrs/BTreeSearchExecStream.h"
00030 #include "fennel/ftrs/BTreeExecStream.h"
00031 #include "fennel/tuple/StandardTypeDescriptor.h"
00032 #include "fennel/tuple/TupleDescriptor.h"
00033 #include "fennel/exec/MockProducerExecStream.h"
00034 #include "fennel/exec/ValuesExecStream.h"
00035 #include "fennel/exec/ExecStreamEmbryo.h"
00036 #include "fennel/exec/SplitterExecStream.h"
00037 #include "fennel/exec/BarrierExecStream.h"
00038 #include "fennel/exec/DynamicParam.h"
00039 #include "fennel/cache/Cache.h"
00040 #include <stdarg.h>
00041
00042 #include <boost/test/test_tools.hpp>
00043
00044 using namespace fennel;
00045
00049 class LcsMultiClusterAppendTest : public ExecStreamUnitTestBase
00050 {
00051 protected:
00052 StandardTypeDescriptorFactory stdTypeFactory;
00053 TupleAttributeDescriptor attrDesc_int64;
00054 TupleAttributeDescriptor attrDesc_bitmap;
00055
00056 vector<boost::shared_ptr<BTreeDescriptor> > bTreeClusters;
00057
00069 void loadClusters(uint nRows, uint nCols, uint nClusters);
00070
00076 void scanCols(
00077 uint nRows, uint nCols, uint nClusters, TupleProjection proj);
00078
00079 public:
00080 explicit LcsMultiClusterAppendTest()
00081 {
00082 FENNEL_UNIT_TEST_CASE(LcsMultiClusterAppendTest, testLoad);
00083 }
00084
00085 void testCaseSetUp();
00086 void testCaseTearDown();
00087
00088 void testLoad();
00089 };
00090
00091 void LcsMultiClusterAppendTest::testLoad()
00092 {
00093
00094
00095
00096 uint nRows = 100000;
00097 uint nCols = 5;
00098 uint nClusters = 7;
00099 TupleProjection proj;
00100
00101 loadClusters(nRows, nCols, nClusters);
00102 resetExecStreamTest();
00103
00104
00105 for (uint i = 0; i < nClusters * nCols; i++) {
00106 proj.push_back(i);
00107 }
00108 scanCols(nRows, nCols, nClusters, proj);
00109 }
00110
00111 void LcsMultiClusterAppendTest::loadClusters(
00112 uint nRows,
00113 uint nCols,
00114 uint nClusters)
00115 {
00116
00117
00118 MockProducerExecStreamParams mockParams;
00119 for (uint i = 0; i < nCols * nClusters; i++) {
00120 mockParams.outputTupleDesc.push_back(attrDesc_int64);
00121 }
00122 mockParams.nRows = nRows;
00123
00124 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00125 for (uint i = 0; i < nCols * nClusters; i++) {
00126 SharedInt64ColumnGenerator col =
00127 SharedInt64ColumnGenerator(new DupColumnGenerator(i + 1));
00128 columnGenerators.push_back(col);
00129 }
00130 mockParams.pGenerator.reset(
00131 new CompositeExecStreamGenerator(columnGenerators));
00132
00133 ExecStreamEmbryo mockStreamEmbryo;
00134 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00135 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00136
00137
00138
00139 SplitterExecStreamParams splitterParams;
00140 ExecStreamEmbryo splitterStreamEmbryo;
00141
00142 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams);
00143 splitterStreamEmbryo.getStream()->setName("SplitterExecStream");
00144
00145
00146
00147 vector<ExecStreamEmbryo> lcsAppendEmbryos;
00148 for (uint i = 0; i < nClusters; i++) {
00149 LcsClusterAppendExecStreamParams lcsAppendParams;
00150 lcsAppendParams.scratchAccessor =
00151 pSegmentFactory->newScratchSegment(pCache, 10);
00152 lcsAppendParams.pCacheAccessor = pCache;
00153 lcsAppendParams.pSegment = pRandomSegment;
00154
00155
00156
00157 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00158 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64);
00159
00160
00161 (lcsAppendParams.keyProj).push_back(0);
00162
00163
00164 lcsAppendParams.outputTupleDesc.push_back(attrDesc_int64);
00165
00166 for (uint j = 0; j < nCols; j++) {
00167 lcsAppendParams.inputProj.push_back(i * nCols + j);
00168 }
00169 lcsAppendParams.pRootMap = 0;
00170 lcsAppendParams.rootPageIdParamId = DynamicParamId(0);
00171
00172
00173
00174
00175 boost::shared_ptr<BTreeDescriptor> pBTreeDesc =
00176 boost::shared_ptr<BTreeDescriptor> (new BTreeDescriptor());
00177 bTreeClusters.push_back(pBTreeDesc);
00178 pBTreeDesc->segmentAccessor.pSegment = lcsAppendParams.pSegment;
00179 pBTreeDesc->segmentAccessor.pCacheAccessor = pCache;
00180 pBTreeDesc->tupleDescriptor = lcsAppendParams.tupleDesc;
00181 pBTreeDesc->keyProjection = lcsAppendParams.keyProj;
00182 pBTreeDesc->rootPageId = NULL_PAGE_ID;
00183 lcsAppendParams.pageOwnerId = pBTreeDesc->pageOwnerId;
00184 lcsAppendParams.segmentId = pBTreeDesc->segmentId;
00185
00186 BTreeBuilder builder(*pBTreeDesc, pRandomSegment);
00187 builder.createEmptyRoot();
00188 lcsAppendParams.rootPageId = pBTreeDesc->rootPageId =
00189 builder.getRootPageId();
00190
00191
00192
00193 LcsClusterAppendExecStream *lcsStream =
00194 new LcsClusterAppendExecStream();
00195
00196 ExecStreamEmbryo lcsAppendStreamEmbryo;
00197 lcsAppendStreamEmbryo.init(lcsStream, lcsAppendParams);
00198 std::ostringstream oss;
00199 oss << "LcsClusterAppendExecStream" << "#" << i;
00200 lcsAppendStreamEmbryo.getStream()-> setName(oss.str());
00201 lcsAppendEmbryos.push_back(lcsAppendStreamEmbryo);
00202 }
00203
00204
00205
00206 BarrierExecStreamParams barrierParams;
00207 barrierParams.outputTupleDesc.push_back(attrDesc_int64);
00208 barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
00209
00210 ExecStreamEmbryo barrierStreamEmbryo;
00211 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
00212 barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
00213
00214
00215 SharedExecStream pOutputStream = prepareDAG(
00216 mockStreamEmbryo, splitterStreamEmbryo, lcsAppendEmbryos,
00217 barrierStreamEmbryo);
00218
00219
00220 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows);
00221
00222 verifyOutput(*pOutputStream, 1, expectedResultGenerator);
00223 }
00224
00225 void LcsMultiClusterAppendTest::scanCols(
00226 uint nRows,
00227 uint nCols,
00228 uint nClusters,
00229 TupleProjection proj)
00230 {
00231
00232
00233
00234 LcsRowScanExecStreamParams scanParams;
00235 scanParams.hasExtraFilter = false;
00236 scanParams.isFullScan = true;
00237 scanParams.samplingMode = SAMPLING_OFF;
00238 for (uint i = 0; i < nClusters; i++) {
00239 struct LcsClusterScanDef clusterScanDef;
00240
00241 for (uint j = 0; j < nCols; j++) {
00242 clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64);
00243 }
00244
00245 clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment;
00246 clusterScanDef.pCacheAccessor =
00247 bTreeClusters[i]->segmentAccessor.pCacheAccessor;
00248 clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor;
00249 clusterScanDef.keyProj = bTreeClusters[i]->keyProjection;
00250 clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId;
00251 clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId;
00252 clusterScanDef.segmentId = bTreeClusters[i]->segmentId;
00253
00254 scanParams.lcsClusterScanDefs.push_back(clusterScanDef);
00255 }
00256
00257
00258
00259 scanParams.outputProj = proj;
00260 for (uint i = 0; i < proj.size(); i++) {
00261 scanParams.outputTupleDesc.push_back(attrDesc_int64);
00262 }
00263
00264
00265
00266
00267 ValuesExecStreamParams valuesParams;
00268 ExecStreamEmbryo valuesStreamEmbryo;
00269 boost::shared_array<FixedBuffer> pBuffer;
00270
00271 valuesParams.outputTupleDesc.push_back(attrDesc_int64);
00272 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00273 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap);
00274
00275 uint bufferSize = 16;
00276 pBuffer.reset(new FixedBuffer[bufferSize]);
00277 valuesParams.pTupleBuffer = pBuffer;
00278 valuesParams.bufSize = 0;
00279 valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams);
00280 valuesStreamEmbryo.getStream()->setName("ValuesExecStream");
00281
00282 ExecStreamEmbryo scanStreamEmbryo;
00283
00284 scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams);
00285 scanStreamEmbryo.getStream()->setName("RowScanExecStream");
00286
00287 SharedExecStream pOutputStream =
00288 prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo);
00289
00290
00291
00292 vector<boost::shared_ptr<ColumnGenerator<int64_t> > > columnGenerators;
00293 for (uint i = 0; i < proj.size(); i++) {
00294 SharedInt64ColumnGenerator col =
00295 SharedInt64ColumnGenerator(new DupColumnGenerator(proj[i] + 1));
00296 columnGenerators.push_back(col);
00297 }
00298
00299 CompositeExecStreamGenerator resultGenerator(columnGenerators);
00300 verifyOutput(*pOutputStream, nRows, resultGenerator);
00301 }
00302
00303 void LcsMultiClusterAppendTest::testCaseSetUp()
00304 {
00305 ExecStreamUnitTestBase::testCaseSetUp();
00306
00307 attrDesc_int64 = TupleAttributeDescriptor(
00308 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00309 attrDesc_bitmap = TupleAttributeDescriptor(
00310 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
00311 true, pRandomSegment->getUsablePageSize() / 8);
00312 }
00313
00314 void LcsMultiClusterAppendTest::testCaseTearDown()
00315 {
00316 for (uint i = 0; i < bTreeClusters.size(); i++) {
00317 bTreeClusters[i]->segmentAccessor.reset();
00318 }
00319 ExecStreamUnitTestBase::testCaseTearDown();
00320 }
00321
// Suite-level hook for LcsMultiClusterAppendTest.
// NOTE(review): presumably expands to the test-runner entry point that
// instantiates the fixture and runs its registered cases; confirm against
// the macro definition.
00322 FENNEL_UNIT_TEST_SUITE(LcsMultiClusterAppendTest);
00323
00324
00325