00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/SegStorageTestBase.h"
00025 #include "fennel/hashexe/LhxHashTable.h"
00026 #include "fennel/hashexe/LhxHashTableDump.h"
00027 #include "fennel/hashexe/LhxPartition.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029 #include "fennel/tuple/TupleDescriptor.h"
00030 #include "fennel/tuple/TuplePrinter.h"
00031 #include "fennel/cache/Cache.h"
00032
00033 #include <boost/scoped_array.hpp>
00034 #include <boost/test/test_tools.hpp>
00035
00036 using namespace fennel;
00037
00041 class LhxHashTableTest : virtual public SegStorageTestBase
00042 {
00043 StandardTypeDescriptorFactory stdTypeFactory;
00044 LhxHashInfo hashInfo;
00045 uint buildInputIndex;
00046
00047 uint writeHashTable(
00048 LhxHashInfo const &hashInfo,
00049 LhxHashTable &hashTable,
00050 SharedLhxPartition destPartition);
00051
00052 uint readPartition(
00053 LhxHashInfo &hashInfo,
00054 SharedLhxPartition srcPartition,
00055 ostringstream &dataTrace);
00056
00057 void testInsert(
00058 uint numRows,
00059 uint maxBlockCount,
00060 uint partitionLevel,
00061 VectorOfUint &repeatSeqValues,
00062 uint numKeyCols,
00063 uint numAggs,
00064 uint numDataCols,
00065 bool dumpHashTable,
00066 bool writeToPartition,
00067 uint recursivePartitioning,
00068 string testName);
00069
00070 public:
00071 explicit LhxHashTableTest()
00072 {
00073 FENNEL_UNIT_TEST_CASE(LhxHashTableTest, testInsert1Ka);
00074 FENNEL_UNIT_TEST_CASE(LhxHashTableTest, testInsert1Kb);
00075 }
00076
00077 virtual void testCaseSetUp();
00078 virtual void testCaseTearDown();
00079
00080 void testInsert1Ka();
00081 void testInsert1Kb();
00082 };
00083
00084 void LhxHashTableTest::testCaseSetUp()
00085 {
00086 openStorage(DeviceMode::createNew);
00087 openRandomSegment();
00088 hashInfo.externalSegmentAccessor.pSegment = pRandomSegment;
00089 hashInfo.externalSegmentAccessor.pCacheAccessor = pCache;
00090 hashInfo.memSegmentAccessor =
00091 pSegmentFactory->newScratchSegment(pCache, 100);
00092 }
00093
00094 void LhxHashTableTest::testCaseTearDown()
00095 {
00096 hashInfo.inputDesc.clear();
00097 hashInfo.keyProj.clear();
00098 hashInfo.isKeyColVarChar.clear();
00099
00100 hashInfo.aggsProj.clear();
00101 hashInfo.dataProj.clear();
00102
00103 hashInfo.memSegmentAccessor.pSegment->deallocatePageRange(
00104 NULL_PAGE_ID,
00105 NULL_PAGE_ID);
00106 hashInfo.externalSegmentAccessor.reset();
00107 hashInfo.memSegmentAccessor.reset();
00108 SegStorageTestBase::testCaseTearDown();
00109 }
00110
00111 uint LhxHashTableTest::writeHashTable(
00112 LhxHashInfo const &hashInfo,
00113 LhxHashTable &hashTable,
00114 SharedLhxPartition destPartition)
00115 {
00116 uint tuplesWritten = 0;
00117 LhxPartitionWriter writer;
00118
00119 LhxHashTableReader hashTableReader;
00120 hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
00121 hashTableReader.bindKey(NULL);
00122 TupleData outputTuple;
00123
00124 outputTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00125
00126
00127 writer.open(destPartition, (LhxHashInfo const &)hashInfo);
00128 while (hashTableReader.getNext(outputTuple)) {
00129 writer.marshalTuple(outputTuple);
00130 tuplesWritten ++;
00131 }
00132 writer.close();
00133
00134 return tuplesWritten;
00135 }
00136
00137 uint LhxHashTableTest::readPartition(
00138 LhxHashInfo &hashInfo,
00139 SharedLhxPartition srcPartition,
00140 ostringstream &dataTrace)
00141 {
00142 LhxPartitionReader reader;
00143 uint tuplesRead = 0;
00144 TupleData outputTuple;
00145 TuplePrinter tuplePrinter;
00146 TupleDescriptor &inputTupleDesc = hashInfo.inputDesc[1];
00147
00148 outputTuple.compute(hashInfo.inputDesc[srcPartition->inputIndex]);
00149
00150 reader.open(srcPartition, (LhxHashInfo const &)hashInfo);
00151
00152 for (;;) {
00153 if (!reader.isTupleConsumptionPending()) {
00154 if (reader.getState() == EXECBUF_EOS) {
00155 break;
00156 }
00157 if (!reader.demandData()) {
00158 break;
00159 }
00160 reader.unmarshalTuple(outputTuple);
00161
00162 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple);
00163 dataTrace << "\n";
00164
00165 tuplesRead ++;
00166 }
00167
00168 reader.consumeTuple();
00169 }
00170 reader.close();
00171 return tuplesRead;
00172 }
00173
00174 void LhxHashTableTest::testInsert(
00175 uint numRows,
00176 uint maxBlockCount,
00177 uint partitionLevel,
00178 VectorOfUint &repeatSeqValues,
00179 uint numKeyCols,
00180 uint numAggs,
00181 uint numDataCols,
00182 bool dumpHashTable,
00183 bool writeToPartition,
00184 uint recursivePartitioning,
00185 string testName)
00186 {
00187 LhxHashTable hashTable;
00188
00189 hashInfo.numCachePages = maxBlockCount;
00190
00191 TupleAttributeDescriptor attrDesc_int32 =
00192 TupleAttributeDescriptor(
00193 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32));
00194 TupleData inputTuple;
00195
00196 uint i, j;
00197
00198
00199
00200
00201 uint numCols = numKeyCols + numAggs + numDataCols;
00202
00203 boost::scoped_array<uint> colValues(new uint[numCols]);
00204
00205 TupleDescriptor inputDesc;
00206 TupleProjection keyProj;
00207 TupleProjection dataProj;
00208 std::vector<LhxHashTrim> isKeyVarChar;
00209
00210 for (i = 0; i < numCols; i ++) {
00211 inputDesc.push_back(attrDesc_int32);
00212
00213 if (i < numKeyCols) {
00214 keyProj.push_back(i);
00215 isKeyVarChar.push_back(HASH_TRIM_NONE);
00216 } else if (i < numKeyCols + numAggs) {
00217 hashInfo.aggsProj.push_back(i);
00218 } else {
00219 dataProj.push_back(i);
00220 }
00221 }
00222
00223
00224
00225
00226
00227 uint cndKeys = 1;
00228 for (i = 0; i < numKeyCols; i ++) {
00229 cndKeys *= repeatSeqValues[i];
00230 }
00231
00232 uint numInputs = 2;
00233 buildInputIndex = numInputs - 1;
00234 for (i = 0; i < numInputs; i ++) {
00235 hashInfo.inputDesc.push_back(inputDesc);
00236 hashInfo.keyProj.push_back(keyProj);
00237 hashInfo.isKeyColVarChar.push_back(isKeyVarChar);
00238 hashInfo.dataProj.push_back(dataProj);
00239 hashInfo.useJoinFilter.push_back(false);
00240 hashInfo.filterNull.push_back(false);
00241
00242 TupleProjection filterNullKeyProj;
00243 hashInfo.filterNullKeyProj.push_back(filterNullKeyProj);
00244 hashInfo.removeDuplicate.push_back(false);
00245 hashInfo.numRows.push_back(numRows);
00246 hashInfo.cndKeys.push_back(cndKeys);
00247 }
00248
00249 TupleDescriptor &inputTupleDesc = hashInfo.inputDesc.back();
00250 TupleProjection &keyColsProj = hashInfo.keyProj.back();
00251
00252 hashTable.init(partitionLevel, hashInfo, buildInputIndex);
00253
00254 uint usablePageSize =
00255 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize();
00256
00257 hashTable.calculateNumSlots(
00258 cndKeys, usablePageSize, hashInfo.numCachePages);
00259
00260 bool status = hashTable.allocateResources();
00261
00262 assert(status);
00263
00264 inputTuple.compute(inputTupleDesc);
00265
00266
00267
00268
00269 for (i = 0; i < numRows; i ++) {
00270 for (j = 0; j < numCols; j++) {
00271 colValues[j] = i % repeatSeqValues[j];
00272 inputTuple[j].pData = (PBuffer)&(colValues[j]);
00273 }
00274
00275 status =
00276 hashTable.addTuple(inputTuple);
00277
00278 assert(status);
00279 }
00280
00281 LhxHashTableReader hashTableReader;
00282 hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
00283 TupleData outputTuple;
00284
00285 outputTuple.compute(inputTupleDesc);
00286
00287 TuplePrinter tuplePrinter;
00288 ostringstream dataTrace;
00289 dataTrace << "All Inserted Tuples:\n";
00290 uint numTuples = 0;
00291
00292
00293
00294
00295 while (hashTableReader.getNext(outputTuple)) {
00296 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple);
00297 dataTrace << "\n";
00298 numTuples ++;
00299 }
00300 assert (numTuples == numRows);
00301
00302
00303
00304
00305 dataTrace << "All Matched Tuples:\n";
00306 numTuples = 0;
00307
00308
00309
00310
00311
00312
00313 for (i = 0; i < numRows; i ++) {
00314 for (j = 0; j < numCols; j++) {
00315 colValues[j] = i % repeatSeqValues[j];
00316 inputTuple[j].pData = (PBuffer)&(colValues[j]);
00317 }
00318
00319 PBuffer matchingKey =
00320 hashTable.findKey(inputTuple, keyColsProj, true);
00321
00322 if (matchingKey) {
00323 hashTableReader.bindKey(matchingKey);
00324 while (hashTableReader.getNext(outputTuple)) {
00325 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple);
00326 dataTrace << "\n";
00327 numTuples ++;
00328 }
00329 }
00330 }
00331
00332 assert (numTuples == numRows);
00333
00334 if (dumpHashTable) {
00335 LhxHashTableDump hashTableDump(
00336 TRACE_INFO,
00337 shared_from_this(),
00338 "LhxHashTableTest");
00339 hashTableDump.dump(hashTable);
00340 hashTableDump.dump(dataTrace.str());
00341 }
00342
00343 if (writeToPartition) {
00344 SharedLhxPartition partition =
00345 SharedLhxPartition(new LhxPartition(NULL));
00346 partition->inputIndex = 1;
00347
00348
00349 uint tuplesWritten =
00350 writeHashTable(
00351 (LhxHashInfo const &)hashInfo,
00352 hashTable,
00353 partition);
00354
00355
00356 ostringstream dataTrace;
00357
00358 dataTrace << "[Tuples read from partitions-1]\n";
00359
00360 uint tuplesRead =
00361 readPartition(hashInfo, partition, dataTrace);
00362
00363 if (dumpHashTable) {
00364 LhxHashTableDump hashTableDump(
00365 TRACE_INFO,
00366 shared_from_this(),
00367 "LhxHashTableTest");
00368 hashTableDump.dump(dataTrace.str());
00369 }
00370
00371 assert (tuplesWritten == numRows && tuplesRead == tuplesWritten);
00372 }
00373
00374 if (recursivePartitioning > 0) {
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384 std::vector<SharedLhxPartition> partitions;
00385 uint tuplesWritten[2];
00386
00387
00388 for (int j = 0; j < 2; j ++) {
00389 partitions.push_back(
00390 SharedLhxPartition(new LhxPartition(NULL)));
00391 partitions[j]->inputIndex = 1;
00392 tuplesWritten[j] =
00393 writeHashTable(
00394 (LhxHashInfo const &)hashInfo,
00395 hashTable,
00396 partitions[j]);
00397 }
00398
00399 assert (tuplesWritten[0] == numRows &&
00400 tuplesWritten[0] == tuplesWritten[1]);
00401
00402 uint tuplesRead[2];
00403 SharedLhxPlan plan = SharedLhxPlan(new LhxPlan());
00404
00405 plan->init(WeakLhxPlan(), 0, partitions, false);
00406
00407 LhxPlan *leafPlan;
00408 uint numLeafPlanCreated = 1;
00409 uint numLeafPlanRead = 0;
00410
00411 for (int i = 0; i < recursivePartitioning; i ++) {
00412 numLeafPlanCreated *= LhxPlan::LhxChildPartCount;
00413 numLeafPlanRead = 0;
00414 leafPlan = plan->getFirstLeaf();
00415
00416
00417 while (leafPlan) {
00418 leafPlan->createChildren(hashInfo, false);
00419
00420
00421 leafPlan = leafPlan->getFirstLeaf();
00422 for (int k = 0; k < LhxPlan::LhxChildPartCount; k ++) {
00423 leafPlan = leafPlan->getNextLeaf();
00424 }
00425 }
00426 }
00427
00428
00429
00430
00431
00432 tuplesRead[0] = 0;
00433 tuplesRead[1] = 0;
00434
00435
00436 leafPlan = plan->getFirstLeaf();
00437
00438 while (leafPlan) {
00439 numLeafPlanRead ++;
00440 for (int j = 0; j < 2; j ++) {
00441 ostringstream dataTrace;
00442 dataTrace << "[Tuples read from partitions-2]"
00443 << "recursion depth" << recursivePartitioning
00444 << "inputindex " << j << "\n";
00445 tuplesRead[j] +=
00446 readPartition(
00447 hashInfo,
00448 leafPlan->getPartition(j),
00449 dataTrace);
00450 if (dumpHashTable) {
00451 LhxHashTableDump hashTableDump(
00452 TRACE_INFO,
00453 shared_from_this(),
00454 "LhxHashTableTest");
00455 hashTableDump.dump(dataTrace.str());
00456 }
00457 }
00458 leafPlan = leafPlan->getNextLeaf();
00459 }
00460
00461 assert (numLeafPlanRead == numLeafPlanCreated);
00462 assert ((tuplesRead[0] == tuplesRead[1]) &&
00463 (tuplesRead[0] == numRows));
00464 }
00465
00466 hashTable.releaseResources();
00467 colValues.reset();
00468 }
00469
00470 void LhxHashTableTest::testInsert1Ka()
00471 {
00472 uint numRows = 100;
00473 uint maxBlockCount = 10;
00474 uint partitionLevel = 0;
00475 VectorOfUint values;
00476 uint numKeyCols = 1;
00477 uint numAggs = 0;
00478 uint numDataCols = 0;
00479 bool dumpHashTable = true;
00480 bool writeToPartition = true;
00481 uint recursivePartitioning = 3;
00482 string testName = "testInsert1K";
00483 uint i;
00484
00485 for (i = 0; i < numKeyCols; i ++) {
00486
00487
00488
00489 values.push_back(i + 10);
00490 }
00491
00492 for (i = 0; i < numAggs; i ++) {
00493 values.push_back(10);
00494 }
00495
00496 for (i = 0; i < numDataCols; i ++) {
00497 values.push_back(i + 1);
00498 }
00499
00500 testInsert(
00501 numRows, maxBlockCount, partitionLevel,
00502 values, numKeyCols, numAggs, numDataCols,
00503 dumpHashTable, writeToPartition, recursivePartitioning,
00504 testName);
00505 }
00506
00507 void LhxHashTableTest::testInsert1Kb()
00508 {
00509 uint numRows = 1000;
00510 uint maxBlockCount = 10;
00511 uint partitionLevel = 0;
00512 VectorOfUint values;
00513 uint numKeyCols = 2;
00514 uint numAggs = 0;
00515 uint numDataCols = 4;
00516 bool dumpHashTable = true;
00517 bool writeToPartition = true;
00518 uint recursivePartitioning = 3;
00519 string testName = "testInsert1K";
00520 uint i;
00521
00522 for (i = 0; i < numKeyCols; i ++) {
00523
00524
00525
00526 values.push_back(i + 10);
00527 }
00528
00529 for (i = 0; i < numAggs; i ++) {
00530 values.push_back(10);
00531 }
00532
00533 for (i = 0; i < numDataCols; i ++) {
00534 values.push_back(i + 1);
00535 }
00536
00537 testInsert(
00538 numRows, maxBlockCount, partitionLevel,
00539 values, numKeyCols, numAggs, numDataCols,
00540 dumpHashTable, writeToPartition, recursivePartitioning,
00541 testName);
00542 }
00543
// Hands the LhxHashTableTest fixture to the Fennel unit-test harness.  The
// macro is defined outside this file; presumably it supplies the test
// program's entry point -- TODO confirm against the test base headers.
00544 FENNEL_UNIT_TEST_SUITE(LhxHashTableTest);
00545
00546
00547