00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/hashexe/LhxPartition.h"
00025 #include "fennel/hashexe/LhxHashGenerator.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $");
00029
00030 void LhxPartitionWriter::open(
00031 SharedLhxPartition destPartitionInit,
00032 LhxHashInfo const &hashInfo)
00033 {
00034 destPartition = destPartitionInit;
00035
00036 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00037
00038 pSegOutputStream = SegOutputStream::newSegOutputStream(
00039 hashInfo.externalSegmentAccessor);
00040 destPartition->segStream =
00041 SegStreamAllocation::newSegStreamAllocation();
00042 destPartition->segStream->beginWrite(pSegOutputStream);
00043
00044 isAggregate = false;
00045 }
00046
00047 void LhxPartitionWriter::open(
00048 SharedLhxPartition destPartitionInit,
00049 LhxHashInfo &hashInfo,
00050 AggComputerList *aggList,
00051 uint numWriterCachePages)
00052 {
00053 destPartition = destPartitionInit;
00054 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00055
00056 pSegOutputStream = SegOutputStream::newSegOutputStream(
00057 hashInfo.externalSegmentAccessor);
00058 destPartition->segStream =
00059 SegStreamAllocation::newSegStreamAllocation();
00060 destPartition->segStream->beginWrite(pSegOutputStream);
00061
00062 isAggregate = true;
00063
00064
00065
00066 uint partitionLevel = 0;
00067 uint savedNumCachePages = hashInfo.numCachePages;
00068 hashInfo.numCachePages = numWriterCachePages;
00069
00070 hashTable.init(
00071 partitionLevel,
00072 hashInfo,
00073 aggList,
00074 destPartition->inputIndex);
00075 hashTableReader.init(&hashTable, hashInfo, destPartition->inputIndex);
00076
00077 hashInfo.numCachePages = savedNumCachePages;
00078
00079 uint cndKeys = hashInfo.cndKeys.back();
00080 uint usablePageSize =
00081 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize();
00082
00083 hashTable.calculateNumSlots(cndKeys, usablePageSize, numWriterCachePages);
00084
00085 partialAggTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00086 }
00087
00088 void LhxPartitionWriter::close()
00089 {
00090 if (isAggregate) {
00091
00092
00093
00094 while (hashTableReader.getNext(partialAggTuple)) {
00095 uint tupleStorageLength =
00096 tupleAccessor.getByteCount(partialAggTuple);
00097 PBuffer pDestBuf =
00098 pSegOutputStream->getWritePointer(tupleStorageLength);
00099 tupleAccessor.marshal(partialAggTuple, pDestBuf);
00100 pSegOutputStream->consumeWritePointer(tupleStorageLength);
00101 }
00102 }
00103 destPartition->segStream->endWrite();
00104 pSegOutputStream->close();
00105 }
00106
00107 void LhxPartitionWriter::marshalTuple(TupleData const &inputTuple)
00108 {
00109 uint tupleStorageLength = tupleAccessor.getByteCount(inputTuple);
00110 PBuffer pDestBuf = pSegOutputStream->getWritePointer(tupleStorageLength);
00111 tupleAccessor.marshal(inputTuple, pDestBuf);
00112 pSegOutputStream->consumeWritePointer(tupleStorageLength);
00113 }
00114
00115 void LhxPartitionWriter::aggAndMarshalTuple(TupleData const &inputTuple)
00116 {
00117 while (!hashTable.addTuple(inputTuple)) {
00118
00119
00120
00121 while (hashTableReader.getNext(partialAggTuple)) {
00122 uint tupleStorageLength =
00123 tupleAccessor.getByteCount(partialAggTuple);
00124 PBuffer pDestBuf =
00125 pSegOutputStream->getWritePointer(tupleStorageLength);
00126 tupleAccessor.marshal(partialAggTuple, pDestBuf);
00127 pSegOutputStream->consumeWritePointer(tupleStorageLength);
00128 }
00129 bool reuse = true;
00130
00131
00132
00133 bool status = hashTable.allocateResources(reuse);
00134 assert(status);
00135
00136
00137
00138
00139 hashTableReader.bindKey(NULL);
00140 }
00141 }
00142
00143 void LhxPartitionReader::open(
00144 SharedLhxPartition srcPartitionInit,
00145 LhxHashInfo const &hashInfo)
00146 {
00147 bufState = EXECBUF_NONEMPTY;
00148 srcPartition = srcPartitionInit;
00149
00150 if (!srcPartition->segStream) {
00151
00152
00153
00154
00155 srcIsInputStream = true;
00156 } else {
00157 srcIsInputStream = false;
00158 }
00159
00160 if (srcIsInputStream) {
00161 streamBufAccessor =
00162 hashInfo.streamBufAccessor[srcPartition->inputIndex];
00163 outputTupleDesc = streamBufAccessor->getTupleDesc();
00164 } else {
00165 outputTupleDesc = hashInfo.inputDesc[srcPartition->inputIndex];
00166 tupleAccessor.compute(outputTupleDesc);
00167 tupleAccessor.resetCurrentTupleBuf();
00168
00169
00170
00171
00172
00173
00174 pSegInputStream = srcPartition->segStream->getInputStream();
00175 pSegInputStream->startPrefetch();
00176 }
00177 }
00178
00179 void LhxPartitionReader::close()
00180 {
00181 if (srcIsInputStream) {
00182
00183
00184
00185 } else {
00186 pSegInputStream->close();
00187 }
00188 }
00189
00190 void LhxPartitionReader::unmarshalTuple(TupleData &outputTuple)
00191 {
00192 if (srcIsInputStream) {
00193
00194
00195
00196 streamBufAccessor->unmarshalTuple(outputTuple);
00197 } else {
00198 tupleAccessor.unmarshal(outputTuple);
00199 }
00200 }
00201
00202 void LhxPartitionReader::consumeTuple()
00203 {
00204 if (srcIsInputStream) {
00205 streamBufAccessor->consumeTuple();
00206 } else {
00207 tupleAccessor.resetCurrentTupleBuf();
00208 pSegInputStream->consumeReadPointer(tupleStorageLength);
00209 }
00210 }
00211
00212 bool LhxPartitionReader::isTupleConsumptionPending()
00213 {
00214 if (srcIsInputStream) {
00215 return streamBufAccessor->isTupleConsumptionPending();
00216 } else {
00217 if (tupleAccessor.getCurrentTupleBuf()) {
00218 return true;
00219 } else {
00220 return false;
00221 }
00222 }
00223 }
00224
00225 bool LhxPartitionReader::demandData()
00226 {
00227 if (srcIsInputStream) {
00228 return streamBufAccessor->demandData();
00229 } else {
00230
00231
00232
00233 uint bytesReadable = 0;
00234 PConstBuffer pSrcBuf =
00235 pSegInputStream->getReadPointer(1, &bytesReadable);
00236
00237
00238
00239
00240
00241 if (!pSrcBuf) {
00242 bufState = EXECBUF_EOS;
00243 return false;
00244 } else {
00245 tupleStorageLength = tupleAccessor.getBufferByteCount(pSrcBuf);
00246 assert(bytesReadable >= tupleStorageLength);
00247 if (bytesReadable == tupleStorageLength) {
00248
00249
00250 if (srcPartition->pExecStream) {
00251 srcPartition->pExecStream->checkAbort();
00252 }
00253 }
00254 tupleAccessor.setCurrentTupleBuf(pSrcBuf);
00255 return true;
00256 }
00257 }
00258 }
00259
00260 void LhxPartitionInfo::init(LhxHashInfo *hashInfoInit)
00261 {
00262 hashInfo = hashInfoInit;
00263 numInputs = (hashInfo->inputDesc).size();
00264
00265 writerList.clear();
00266
00267
00268
00269
00270 for (uint i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00271 writerList.push_back(
00272 SharedLhxPartitionWriter(new LhxPartitionWriter()));
00273 }
00274
00275 filteredRowCountList.reset(
00276 new uint[numInputs * LhxPlan::LhxChildPartCount]);
00277 }
00278
00279 void LhxPartitionInfo::open(
00280 LhxHashTableReader *hashTableReaderInit,
00281 LhxPartitionReader *buildReader,
00282 TupleData &buildTupleInit,
00283 SharedLhxPartition probePartition,
00284 uint buildInputIndex)
00285 {
00286 uint i, j;
00287
00288 probeReader.open(probePartition, *hashInfo);
00289
00290
00291
00292
00293 curInputIndex = buildInputIndex;
00294
00295 hashTableReader = hashTableReaderInit;
00296
00297
00298
00299
00300 hashTableReader->bindKey(NULL);
00301
00302
00303
00304
00305 reader = buildReader;
00306
00307
00308
00309
00310 buildTuple = buildTupleInit;
00311
00312 destPartitionList.clear();
00313 subPartStatList.clear();
00314 joinFilterList.clear();
00315 shared_array<uint> curSubPartStat;
00316
00317 for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00318 destPartitionList.push_back(
00319 SharedLhxPartition(new LhxPartition(probePartition->pExecStream)));
00320 destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount);
00321 subPartStatList.push_back(
00322 shared_array<uint>(new uint[LhxPlan::LhxSubPartCount]));
00323
00324 curSubPartStat = subPartStatList[i];
00325
00326 for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) {
00327 curSubPartStat[j] = 0;
00328 }
00329
00330
00331
00332
00333
00334 joinFilterList.push_back(shared_ptr<dynamic_bitset<> >());
00335
00336 writerList[i]->open(destPartitionList[i], *hashInfo);
00337 filteredRowCountList[i] = 0;
00338 }
00339
00340
00341
00342
00343 partitionMemory = true;
00344 }
00345
00346 void LhxPartitionInfo::open(
00347 LhxHashTableReader *hashTableReaderInit,
00348 LhxPartitionReader *buildReader,
00349 TupleData &buildTupleInit,
00350 AggComputerList *aggList)
00351 {
00352 uint i, j;
00353 assert (numInputs == 1);
00354 uint buildIndex = numInputs - 1;
00355
00356 curInputIndex = buildIndex;
00357
00358 hashTableReader = hashTableReaderInit;
00359
00360
00361
00362
00363 hashTableReader->bindKey(NULL);
00364
00365
00366
00367
00368
00369 reader = buildReader;
00370
00371
00372
00373
00374 buildTuple = buildTupleInit;
00375
00376
00377
00378
00379
00380 uint numWriterCachePages =
00381 hashInfo->numCachePages / LhxPlan::LhxChildPartCount;
00382
00383 destPartitionList.clear();
00384 subPartStatList.clear();
00385 joinFilterList.clear();
00386 shared_array<uint> curSubPartStat;
00387
00388 for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00389 destPartitionList.push_back(
00390 SharedLhxPartition(
00391 new LhxPartition(reader->getSourcePartition()->pExecStream)));
00392 destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount);
00393 subPartStatList.push_back(
00394 shared_array<uint>(new uint[LhxPlan::LhxSubPartCount]));
00395
00396 curSubPartStat = subPartStatList[i];
00397
00398 for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) {
00399 curSubPartStat[j] = 0;
00400 }
00401
00402
00403
00404
00405
00406
00407 joinFilterList.push_back(shared_ptr<dynamic_bitset<> >());
00408
00409 writerList[i]->open(
00410 destPartitionList[i],
00411 *hashInfo,
00412 aggList,
00413 numWriterCachePages);
00414 filteredRowCountList[i] = 0;
00415 }
00416
00417
00418
00419
00420 partitionMemory = true;
00421 }
00422
00423 void LhxPartitionInfo::close()
00424 {
00425 reader->close();
00426
00427 uint numWriters = writerList.size();
00428
00429 for (uint i = 0; i < numWriters; i ++) {
00430 writerList[i]->close();
00431 }
00432
00433
00434
00435
00436
00437
00438
00439
00440 for (uint i = 0; i < numWriters; i ++) {
00441 writerList[i]->releaseResources();
00442 }
00443 }
00444
00445 void LhxPlan::init(
00446 WeakLhxPlan parentPlanInit,
00447 uint partitionLevelInit,
00448 vector<SharedLhxPartition> &partitionsInit,
00449 bool enableSubPartStat)
00450 {
00451
00452
00453
00454 shared_ptr<dynamic_bitset<> > joinFilterInit =
00455 shared_ptr<dynamic_bitset<> >();
00456 vector<shared_array<uint> > subPartStatsInit;
00457 VectorOfUint filteredRows;
00458
00459 for (uint i = 0; i < partitionsInit.size(); i ++) {
00460 subPartStatsInit.push_back(shared_array<uint>());
00461 filteredRows.push_back(0);
00462 }
00463
00464 init(
00465 parentPlanInit, partitionLevelInit, partitionsInit,
00466 subPartStatsInit, joinFilterInit, filteredRows,
00467 enableSubPartStat, false);
00468 }
00469
00470 void LhxPlan::init(
00471 WeakLhxPlan parentPlanInit,
00472 uint partitionLevelInit,
00473 vector<SharedLhxPartition> &partitionsInit,
00474 vector<shared_array<uint> > &subPartStats,
00475 shared_ptr<dynamic_bitset<> > joinFilterInit,
00476 VectorOfUint &filteredRowsInit,
00477 bool enableSubPartStat,
00478 bool enableSwing)
00479 {
00480 uint numInputs = partitionsInit.size();
00481
00482 partitionLevel = partitionLevelInit;
00483 parentPlan = parentPlanInit;
00484
00485
00486
00487
00488
00489
00490
00491 joinFilter = joinFilterInit;
00492
00493 filteredRowCount.reset(new uint[numInputs]);
00494 inputSize.reset(new uint[numInputs]);
00495 joinSideToInputMap.reset(new uint[numInputs]);
00496 subPartToChildMap.reset();
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 for (int i = 0; i < numInputs; i ++) {
00507 partitions.push_back(partitionsInit[i]);
00508 filteredRowCount[i] = filteredRowsInit[i];
00509 joinSideToInputMap[i] = i;
00510
00511 inputSize[i] = 0;
00512 shared_array<uint> inputSubPartStat = subPartStats[i];
00513
00514 if (inputSubPartStat) {
00515 for (int k = 0; k < LhxSubPartCount; k ++) {
00516 inputSize[i] += inputSubPartStat[k];
00517 }
00518 }
00519 }
00520
00521
00522
00523
00524
00525
00526 if (enableSwing &&
00527 (numInputs == 2) && (inputSize[0] < inputSize[1])) {
00528 joinSideToInputMap[0] = 1;
00529 joinSideToInputMap[1] = 0;
00530 }
00531
00532 subPartToChildMap.reset();
00533
00534 if (enableSubPartStat) {
00535
00536
00537
00538
00539
00540 mapSubPartToChild(subPartStats);
00541 }
00542 }
00543
00544 void LhxPlan::mapSubPartToChild(
00545 vector<shared_array<uint> > &subPartStats)
00546 {
00547 uint numInputs = partitions.size();
00548 uint buildIndex = getBuildInput();
00549 shared_array<uint> buildSubPartStat = subPartStats[buildIndex];
00550
00551 if (!buildSubPartStat) {
00552 return;
00553 }
00554
00555 uint i, j, k;
00556
00557 subPartToChildMap.reset(new uint[LhxSubPartCount]);
00558
00559 for (i = 0; i < numInputs; i ++) {
00560 childPartSize.push_back(
00561 shared_array<uint>(new uint[LhxChildPartCount]));
00562 }
00563
00564 shared_array<uint> buildChildPartSize = childPartSize[buildIndex];
00565
00566 for (i = 0; i < LhxChildPartCount; i ++) {
00567 buildChildPartSize[i] = 0;
00568 }
00569
00570 j = 0;
00571 for (i = 0; i < LhxSubPartCount; i ++) {
00572 buildChildPartSize[j] += buildSubPartStat[i];
00573 subPartToChildMap[i] = j;
00574
00575 k = 1;
00576 while (
00577 (buildChildPartSize[j]
00578 > buildChildPartSize[(j + k) % LhxChildPartCount])
00579 && k < LhxChildPartCount)
00580 {
00581 k ++;
00582 }
00583
00584 if (k == LhxChildPartCount) {
00585
00586
00587 j = (j + 1) % LhxChildPartCount;
00588 }
00589 }
00590
00591
00592
00593
00594 if (numInputs == 2) {
00595 uint probeIndex = getProbeInput();
00596 shared_array<uint> probeChildPartSize = childPartSize[probeIndex];
00597 shared_array<uint> probeSubPartStat = subPartStats[probeIndex];
00598
00599 for (i = 0; i < LhxChildPartCount; i ++) {
00600 probeChildPartSize[i] = 0;
00601 }
00602
00603 for (i = 0; i < LhxSubPartCount; i ++) {
00604 probeChildPartSize[subPartToChildMap[i]] += probeSubPartStat[i];
00605 }
00606 }
00607 }
00608
00609 uint LhxPlan::calculateChildIndex(uint hashKey, uint curInputIndex)
00610 {
00611 if (subPartToChildMap) {
00612 return (subPartToChildMap[hashKey % LhxSubPartCount] +
00613 curInputIndex * LhxChildPartCount);
00614 } else {
00615 return (hashKey % LhxChildPartCount +
00616 curInputIndex * LhxChildPartCount);
00617 }
00618 }
00619
00620 LhxPartitionState LhxPlan::generatePartitions(
00621 LhxHashInfo const &hashInfo,
00622 LhxPartitionInfo &partInfo)
00623 {
00624
00625
00626 uint filterSize = 4096;
00627 bool isAggregate = (partInfo.numInputs == 1);
00628
00629 LhxHashGenerator hashGenPrev;
00630 LhxHashGenerator hashGen;
00631 LhxHashGenerator hashGenNext;
00632
00633 hashGenPrev.init(partitionLevel);
00634 hashGen.init(partitionLevel + 1);
00635 hashGenNext.init(partitionLevel + 2);
00636
00637 LhxPartitionReader *&reader = partInfo.reader;
00638 vector<SharedLhxPartitionWriter> &writerList = partInfo.writerList;
00639 vector<shared_ptr<dynamic_bitset<> > > &joinFilterList =
00640 partInfo.joinFilterList;
00641 vector<shared_array<uint> > &subPartStatList = partInfo.subPartStatList;
00642 shared_array<uint> &filteredRowCountList = partInfo.filteredRowCountList;
00643
00644 uint &curInputIndex = partInfo.curInputIndex;
00645 uint otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00646
00647 TupleData inputTuple;
00648 TupleDescriptor inputTupleDesc = reader->getTupleDesc();
00649 inputTuple.compute(inputTupleDesc);
00650
00651 uint prevHashKey;
00652 uint hashKey;
00653 uint nextHashKey;
00654
00655 uint childPartIndex;
00656 bool writeToPartition;
00657
00658 uint statIndex;
00659 shared_array<uint> curSubPartStat;
00660
00661
00662
00663
00664
00665 if (partInfo.partitionMemory) {
00666 TupleData hashTableTuple;
00667 TupleDescriptor hashTableTupleDesc = hashInfo.inputDesc[curInputIndex];
00668
00669 hashTableTuple.compute(hashTableTupleDesc);
00670
00671 while ((partInfo.hashTableReader)->getNext(hashTableTuple)) {
00672 writeToPartition = false;
00673
00674 hashKey = hashGen.hash(
00675 hashTableTuple,
00676 hashInfo.keyProj[curInputIndex],
00677 hashInfo.isKeyColVarChar[curInputIndex]);
00678
00679 childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00680
00681 if (hashInfo.useJoinFilter[curInputIndex]) {
00682
00683
00684
00685
00686 if (partitionLevel == 0) {
00687 writeToPartition = true;
00688 } else {
00689 prevHashKey =
00690 hashGenPrev.hash(
00691 hashTableTuple,
00692 hashInfo.keyProj[curInputIndex],
00693 hashInfo.isKeyColVarChar[curInputIndex]);
00694 if (joinFilter &&
00695 joinFilter->test(prevHashKey % filterSize)) {
00696 writeToPartition = true;
00697 } else {
00698 filteredRowCountList[childPartIndex]++;
00699 }
00700 }
00701 } else {
00702
00703
00704
00705 writeToPartition = true;
00706 }
00707
00708 if (writeToPartition) {
00709 writerList[childPartIndex]->marshalTuple(hashTableTuple);
00710
00711 nextHashKey = hashGenNext.hash(
00712 hashTableTuple,
00713 hashInfo.keyProj[curInputIndex],
00714 hashInfo.isKeyColVarChar[curInputIndex]);
00715
00716 statIndex = nextHashKey % LhxSubPartCount;
00717 curSubPartStat = subPartStatList[childPartIndex];
00718 curSubPartStat[statIndex]++;
00719
00720
00721
00722
00723
00724
00725 if (!joinFilterList[childPartIndex]) {
00726
00727
00728
00729 joinFilterList[childPartIndex].reset(
00730 new dynamic_bitset<>(filterSize));
00731 }
00732 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00733 }
00734 }
00735
00736 if (isAggregate) {
00737
00738
00739
00740
00741 ((partInfo.hashTableReader)->getHashTable())->releaseResources();
00742 for (int i = 0; i < writerList.size();i ++) {
00743 writerList[i]->allocateResources();
00744 }
00745 }
00746
00747
00748
00749
00750
00751 partInfo.partitionMemory = false;
00752
00753
00754
00755
00756
00757 inputTuple = partInfo.buildTuple;
00758 }
00759
00760 for (;;) {
00761
00762
00763
00764
00765
00766 if (!reader->isTupleConsumptionPending()) {
00767 if (reader->getState() == EXECBUF_EOS) {
00768 if (curInputIndex == getProbeInput()) {
00769
00770
00771
00772
00773 return PartitionEndOfData;
00774 } else {
00775 curInputIndex = getProbeInput();
00776 otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00777 reader->close();
00778 reader = &partInfo.probeReader;
00779 inputTupleDesc = reader->getTupleDesc();
00780 inputTuple.compute(inputTupleDesc);
00781 continue;
00782 }
00783 }
00784
00785 if (!reader->demandData()) {
00786 if (partitionLevel == 0) {
00787
00788
00789
00790 return PartitionUnderflow;
00791 } else {
00792 if (curInputIndex == getProbeInput()) {
00793
00794
00795
00796
00797 return PartitionEndOfData;
00798 } else {
00799 curInputIndex = getProbeInput();
00800 reader->close();
00801 reader = &partInfo.probeReader;
00802 inputTupleDesc = reader->getTupleDesc();
00803 inputTuple.compute(inputTupleDesc);
00804 continue;
00805 }
00806 }
00807 }
00808
00809 reader->unmarshalTuple(inputTuple);
00810 }
00811
00812 writeToPartition = false;
00813
00814 hashKey = hashGen.hash(
00815 inputTuple,
00816 hashInfo.keyProj[curInputIndex],
00817 hashInfo.isKeyColVarChar[curInputIndex]);
00818
00819 childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00820
00821 nextHashKey = hashGenNext.hash(
00822 inputTuple,
00823 hashInfo.keyProj[curInputIndex],
00824 hashInfo.isKeyColVarChar[curInputIndex]);
00825
00826 statIndex = nextHashKey % LhxSubPartCount;
00827
00828 if (!isAggregate) {
00829 if (hashInfo.useJoinFilter[curInputIndex]) {
00830
00831
00832
00833 if (isBuildChildPart(childPartIndex)) {
00834
00835
00836
00837
00838
00839 if (partitionLevel == 0) {
00840 writeToPartition = true;
00841 } else {
00842 prevHashKey =
00843 hashGenPrev.hash(
00844 inputTuple,
00845 hashInfo.keyProj[curInputIndex],
00846 hashInfo.isKeyColVarChar[curInputIndex]);
00847 if (joinFilter &&
00848 joinFilter->test(prevHashKey % filterSize)) {
00849 writeToPartition = true;
00850 } else {
00851 filteredRowCountList[childPartIndex]++;
00852 }
00853 }
00854 } else {
00855
00856
00857
00858
00859
00860 if (joinFilterList[getBuildChildPart(childPartIndex)] &&
00861 joinFilterList[getBuildChildPart(childPartIndex)]->
00862 test(hashKey % filterSize)) {
00863 writeToPartition = true;
00864 } else {
00865 filteredRowCountList[childPartIndex]++;
00866 }
00867 }
00868 } else {
00869
00870
00871
00872 writeToPartition = true;
00873 }
00874
00875 if (writeToPartition) {
00876 writerList[childPartIndex]->marshalTuple(inputTuple);
00877 curSubPartStat = subPartStatList[childPartIndex];
00878 curSubPartStat[statIndex]++;
00879
00880
00881
00882
00883
00884
00885 if (!joinFilterList[childPartIndex]) {
00886
00887
00888
00889 joinFilterList[childPartIndex].reset(
00890 new dynamic_bitset<>(filterSize));
00891 }
00892 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00893 }
00894 } else {
00895 writerList[childPartIndex]->aggAndMarshalTuple(inputTuple);
00896 (subPartStatList[childPartIndex])[statIndex]++;
00897 }
00898
00899 reader->consumeTuple();
00900 }
00901 }
00902
00903 void LhxPlan::createChildren(
00904 LhxHashInfo const &hashInfo,
00905 bool enableSubPartStat)
00906 {
00907 LhxHashGenerator hashGen;
00908 hashGen.init(partitionLevel + 1);
00909
00910 uint numInputs = hashInfo.inputDesc.size();
00911
00912 vector<SharedLhxPartition> destPartitionList(LhxChildPartCount * numInputs);
00913
00914 LhxPartitionReader reader;
00915 LhxPartitionWriter writerList[LhxChildPartCount];
00916 uint childNum, i, j;
00917 TupleData outputTuple;
00918
00919
00920
00921
00922 for (j = 0; j < numInputs; j ++) {
00923 reader.open(partitions[j], hashInfo);
00924 outputTuple.compute(hashInfo.inputDesc[j]);
00925
00926 for (i = 0; i < LhxChildPartCount; i ++) {
00927 uint index = j * LhxChildPartCount + i;
00928 destPartitionList[index].reset(
00929 new LhxPartition(partitions[j]->pExecStream));
00930 destPartitionList[index]->inputIndex = j;
00931 writerList[i].open(destPartitionList[index], hashInfo);
00932 }
00933
00934 for (;;) {
00935 if (!reader.isTupleConsumptionPending()) {
00936 if (reader.getState() == EXECBUF_EOS) {
00937
00938
00939
00940 break;
00941 }
00942 if (!reader.demandData()) {
00943 break;
00944 }
00945 reader.unmarshalTuple(outputTuple);
00946 }
00947
00948 childNum =
00949 hashGen.hash(
00950 outputTuple,
00951 hashInfo.keyProj[j],
00952 hashInfo.isKeyColVarChar[j]) % LhxChildPartCount;
00953
00954 writerList[childNum].marshalTuple(outputTuple);
00955 reader.consumeTuple();
00956 }
00957
00958 for (i = 0; i < LhxChildPartCount; i ++) {
00959 writerList[i].close();
00960 }
00961
00962
00963
00964
00965
00966
00967
00968
00969 for (i = 0; i < LhxChildPartCount; i ++) {
00970 writerList[i].releaseResources();
00971 }
00972
00973 reader.close();
00974 }
00975
00976
00977
00978
00979 for (i = 0; i < LhxChildPartCount; i ++) {
00980 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
00981 vector<SharedLhxPartition> partitionList;
00982 partitionList.push_back(destPartitionList[i]);
00983 partitionList.push_back(destPartitionList[i + LhxChildPartCount]);
00984
00985 newChildPlan->init(
00986 WeakLhxPlan(shared_from_this()),
00987 partitionLevel + 1,
00988 partitionList,
00989 enableSubPartStat);
00990
00991 newChildPlan->addSibling(firstChildPlan);
00992 firstChildPlan = newChildPlan;
00993 }
00994 }
00995
00996 void LhxPlan::createChildren(
00997 LhxPartitionInfo &partInfo,
00998 bool enableSubPartStat,
00999 bool enableSwing)
01000 {
01001 uint i, j;
01002
01003 for (i = 0; i < LhxChildPartCount; i ++) {
01004 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
01005 vector<SharedLhxPartition> partitionList;
01006 vector<shared_array<uint> > subPartStats;
01007 VectorOfUint filteredRows;
01008 for (j = 0; j < partInfo.numInputs; j ++) {
01009 partitionList.push_back(
01010 partInfo.destPartitionList[i + LhxChildPartCount * j]);
01011 subPartStats.push_back(
01012 partInfo.subPartStatList[i + LhxChildPartCount * j]);
01013 filteredRows.push_back(
01014 partInfo.filteredRowCountList[i + LhxChildPartCount * j]);
01015 }
01016 newChildPlan->init(
01017 WeakLhxPlan(shared_from_this()),
01018 partitionLevel + 1,
01019 partitionList,
01020 subPartStats,
01021 partInfo.joinFilterList[getProbeChildPart(i)],
01022 filteredRows,
01023 enableSubPartStat,
01024 enableSwing);
01025
01026 newChildPlan->addSibling(firstChildPlan);
01027 firstChildPlan = newChildPlan;
01028 }
01029 partInfo.destPartitionList.clear();
01030 partInfo.joinFilterList.clear();
01031 }
01032
01033 LhxPlan *LhxPlan::getFirstLeaf()
01034 {
01035 if (!firstChildPlan) {
01036 return this;
01037 } else {
01038 return firstChildPlan->getFirstLeaf();
01039 }
01040 }
01041
01042 LhxPlan *LhxPlan::getNextLeaf()
01043 {
01044 if (siblingPlan) {
01045 return siblingPlan->getFirstLeaf();
01046 } else {
01047 WeakLhxPlan parent = this->parentPlan;
01048 SharedLhxPlan shared_parent = parent.lock();
01049
01050 if (shared_parent) {
01051 return shared_parent->getNextLeaf();
01052 } else {
01053 return NULL;
01054 }
01055 }
01056 }
01057
01058 void LhxPlan::close()
01059 {
01060 if (firstChildPlan) {
01061 firstChildPlan->close();
01062 }
01063
01064 if (siblingPlan) {
01065 siblingPlan->close();
01066 }
01067
01068 for (uint i = 0; i < partitions.size(); i ++) {
01069 if (partitions[i] && partitions[i]->segStream) {
01070 partitions[i]->segStream->close();
01071 }
01072 }
01073 }
01074
01075 string LhxPlan::toString()
01076 {
01077 ostringstream planTrace;
01078
01079 planTrace << "\n"
01080 << "[Plan : addr = " << this << "]\n"
01081 << "[ level = " << partitionLevel << "]\n"
01082 << "[ parent = " << parentPlan.lock().get() << "]\n"
01083 << "[ firstChild = " << firstChildPlan.get() << "]\n"
01084 << "[ sibling = " << siblingPlan.get() << "]\n";
01085
01086
01087
01088
01089 planTrace << "[joinFilter = ";
01090 if (joinFilter) {
01091 planTrace << joinFilter.get();
01092 }
01093 planTrace << "]\n";
01094
01095 for (uint i = 0; i < partitions.size(); i ++) {
01096 planTrace << "[Partition(" << i << ")]\n"
01097 << "[ inputIndex = " << partitions[i]->inputIndex << "]\n"
01098 << "[ join side = " << getJoinSide(partitions[i]->inputIndex) << "]\n"
01099 << "[ filteredRows = " << filteredRowCount[i] << "]\n"
01100 << "[ inputSize = " << inputSize[i] << "]\n";
01101 planTrace << "[ childPartSize = ";
01102 if (childPartSize.size() > i) {
01103 shared_array<uint> oneChildPartSize = childPartSize[i];
01104 if (oneChildPartSize) {
01105 for (uint j = 0; j < LhxChildPartCount; j ++) {
01106 planTrace << oneChildPartSize[j] << " ";
01107 }
01108 }
01109 }
01110 planTrace << "]\n";
01111 }
01112
01113 planTrace << "[subPartToChildMap = ";
01114 if (subPartToChildMap) {
01115 for (uint i = 0; i < LhxSubPartCount; i ++) {
01116 planTrace << subPartToChildMap[i] << " ";
01117 }
01118 }
01119 planTrace << "]\n";
01120
01121 SharedLhxPlan childPlan = firstChildPlan;
01122
01123 while (childPlan) {
01124 planTrace << childPlan->toString();
01125 childPlan = childPlan->siblingPlan;
01126 }
01127
01128 return planTrace.str();
01129 }
01130
01131 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $");
01132
01133