#include <LhxPartition.h>
Public Member Functions | |
void | init (WeakLhxPlan parentPlanInit, uint partitionLevelInit, vector< SharedLhxPartition > &partitionsInit, bool enableSubPartStat) |
Initialize a plan, with its input partitions and parent plan. | |
void | init (WeakLhxPlan parentPlanInit, uint partitionLevelInit, vector< SharedLhxPartition > &partitionsInit, vector< shared_array< uint > > &subPartStats, shared_ptr< dynamic_bitset<> > filterInit, VectorOfUint &filteredRowsInit, bool enableSubPartStat, bool enableSwing) |
Initialize a plan. | |
LhxPartitionState | generatePartitions (LhxHashInfo const &hashInfo, LhxPartitionInfo &partInfo) |
Generate partitions for the child plans. | |
void | createChildren (LhxHashInfo const &hashInfo, bool enableSubPartStat) |
Partition this plan and create child plan. | |
void | createChildren (LhxPartitionInfo &partInfo, bool enableSubPartStat, bool enableSwing) |
Create child plan from partitions provided via partInfo. | |
uint | getPartitionLevel () |
Get the partition level of this plan. | |
SharedLhxPartition | getBuildPartition () |
Get the partition of the build input. | |
SharedLhxPartition | getProbePartition () |
SharedLhxPartition | getPartition (uint inputIndex) |
uint | getBuildInput () |
uint | getProbeInput () |
uint | getJoinSide (uint inputIndex) |
SharedLhxPlan | getFirstChild () |
Get the first child plan. | |
LhxPlan * | getFirstLeaf () |
Get first leaf plan in dfs order. | |
LhxPlan * | getNextLeaf () |
Get next leaf plan in dfs order. | |
void | close () |
Close the plan tree. | |
string | toString () |
Print the content of the plan tree rooted at this plan. | |
Static Public Attributes | |
static const uint | LhxSubPartCount = 16 |
static const uint | LhxChildPartCount = 3 |
Private Member Functions | |
void | addSibling (SharedLhxPlan siblingPlan) |
Add sibling plan. | |
void | mapSubPartToChild (vector< shared_array< uint > > &subPartStats) |
Using sub partition stats gathered at the previous partition level, map sub partitions to child partitions. | |
uint | calculateChildIndex (uint hashKey, uint curInputIndex) |
Calculate the target child partition index based on the hashkey of a tuple. | |
bool | isBuildChildPart (uint childPartIndex) |
bool | isProbeChildPart (uint childPartIndex) |
uint | getBuildChildPart (uint childPartIndex) |
uint | getProbeChildPart (uint childPartIndex) |
Private Attributes | |
uint | partitionLevel |
vector< SharedLhxPartition > | partitions |
shared_array< uint > | joinSideToInputMap |
shared_ptr< dynamic_bitset<> > | joinFilter |
shared_array< uint > | filteredRowCount |
shared_array< uint > | subPartToChildMap |
vector< shared_array< uint > > | childPartSize |
shared_array< uint > | inputSize |
WeakLhxPlan | parentPlan |
SharedLhxPlan | firstChildPlan |
SharedLhxPlan | siblingPlan |
Definition at line 291 of file LhxPartition.h.
void LhxPlan::addSibling | ( | SharedLhxPlan | siblingPlanInit | ) | [inline, private] |
Add sibling plan.
Definition at line 495 of file LhxPartition.h.
References siblingPlan.
00496 { 00497 siblingPlan = siblingPlanInit; 00498 }
void LhxPlan::mapSubPartToChild | ( | vector< shared_array< uint > > & | subPartStats | ) | [private] |
Using sub partition stats gathered at the previous partition level, map sub partitions to child partitions.
The objective is to come up with child partitions of similar size.
Definition at line 544 of file LhxPartition.cpp.
References childPartSize, getBuildInput(), getProbeInput(), LhxChildPartCount, LhxSubPartCount, partitions, and subPartToChildMap.
Referenced by init().
00546 { 00547 uint numInputs = partitions.size(); 00548 uint buildIndex = getBuildInput(); 00549 shared_array<uint> buildSubPartStat = subPartStats[buildIndex]; 00550 00551 if (!buildSubPartStat) { 00552 return; 00553 } 00554 00555 uint i, j, k; 00556 00557 subPartToChildMap.reset(new uint[LhxSubPartCount]); 00558 00559 for (i = 0; i < numInputs; i ++) { 00560 childPartSize.push_back( 00561 shared_array<uint>(new uint[LhxChildPartCount])); 00562 } 00563 00564 shared_array<uint> buildChildPartSize = childPartSize[buildIndex]; 00565 00566 for (i = 0; i < LhxChildPartCount; i ++) { 00567 buildChildPartSize[i] = 0; 00568 } 00569 00570 j = 0; 00571 for (i = 0; i < LhxSubPartCount; i ++) { 00572 buildChildPartSize[j] += buildSubPartStat[i]; 00573 subPartToChildMap[i] = j; 00574 00575 k = 1; 00576 while ( 00577 (buildChildPartSize[j] 00578 > buildChildPartSize[(j + k) % LhxChildPartCount]) 00579 && k < LhxChildPartCount) 00580 { 00581 k ++; 00582 } 00583 00584 if (k == LhxChildPartCount) { 00585 // If current child partition is bigger than all other child 00586 // partitions, move to the next child 00587 j = (j + 1) % LhxChildPartCount; 00588 } 00589 } 00590 00591 /* 00592 * This is simply stats keeping for the probe side child partitions. 00593 */ 00594 if (numInputs == 2) { 00595 uint probeIndex = getProbeInput(); 00596 shared_array<uint> probeChildPartSize = childPartSize[probeIndex]; 00597 shared_array<uint> probeSubPartStat = subPartStats[probeIndex]; 00598 00599 for (i = 0; i < LhxChildPartCount; i ++) { 00600 probeChildPartSize[i] = 0; 00601 } 00602 00603 for (i = 0; i < LhxSubPartCount; i ++) { 00604 probeChildPartSize[subPartToChildMap[i]] += probeSubPartStat[i]; 00605 } 00606 } 00607 }
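uint LhxPlan::calculateChildIndex | ( | uint | hashKey, | |
uint | curInputIndex | |||
) | [private] |
Calculate the target child partition index based on the hashkey of a tuple.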
Definition at line 609 of file LhxPartition.cpp.
References LhxChildPartCount, LhxSubPartCount, and subPartToChildMap.
Referenced by generatePartitions().
00610 { 00611 if (subPartToChildMap) { 00612 return (subPartToChildMap[hashKey % LhxSubPartCount] + 00613 curInputIndex * LhxChildPartCount); 00614 } else { 00615 return (hashKey % LhxChildPartCount + 00616 curInputIndex * LhxChildPartCount); 00617 } 00618 }
bool LhxPlan::isBuildChildPart | ( | uint | childPartIndex | ) | [inline, private] |
Definition at line 547 of file LhxPartition.h.
References getBuildInput(), and LhxChildPartCount.
Referenced by generatePartitions().
00548 { 00549 return ((childPartIndex / LhxChildPartCount) == getBuildInput()); 00550 }
bool LhxPlan::isProbeChildPart | ( | uint | childPartIndex | ) | [inline, private] |
Definition at line 552 of file LhxPartition.h.
References getProbeInput(), and LhxChildPartCount.
00553 { 00554 return ((childPartIndex / LhxChildPartCount) == getProbeInput()); 00555 }
uint LhxPlan::getBuildChildPart | ( | uint | childPartIndex | ) | [inline, private] |
Definition at line 557 of file LhxPartition.h.
References getBuildInput(), and LhxChildPartCount.
Referenced by generatePartitions().
00558 { 00559 return ((childPartIndex % LhxChildPartCount) + 00560 getBuildInput() * LhxChildPartCount); 00561 }
uint LhxPlan::getProbeChildPart | ( | uint | childPartIndex | ) | [inline, private] |
Definition at line 563 of file LhxPartition.h.
References getProbeInput(), and LhxChildPartCount.
Referenced by createChildren().
00564 { 00565 return ((childPartIndex % LhxChildPartCount) + 00566 getProbeInput() * LhxChildPartCount); 00567 }
void LhxPlan::init | ( | WeakLhxPlan | parentPlanInit, | |
uint | partitionLevelInit, | |||
vector< SharedLhxPartition > & | partitionsInit, | |||
bool | enableSubPartStat | |||
) |
Initialize a plan, with its input partitions and parent plan.
Definition at line 445 of file LhxPartition.cpp.
00450 { 00451 /* 00452 * No filter for this plan. 00453 */ 00454 shared_ptr<dynamic_bitset<> > joinFilterInit = 00455 shared_ptr<dynamic_bitset<> >(); 00456 vector<shared_array<uint> > subPartStatsInit; 00457 VectorOfUint filteredRows; 00458 00459 for (uint i = 0; i < partitionsInit.size(); i ++) { 00460 subPartStatsInit.push_back(shared_array<uint>()); 00461 filteredRows.push_back(0); 00462 } 00463 00464 init( 00465 parentPlanInit, partitionLevelInit, partitionsInit, 00466 subPartStatsInit, joinFilterInit, filteredRows, 00467 enableSubPartStat, false); 00468 }
void LhxPlan::init | ( | WeakLhxPlan | parentPlanInit, | |
uint | partitionLevelInit, | |||
vector< SharedLhxPartition > & | partitionsInit, | |||
vector< shared_array< uint > > & | subPartStats, | |||
shared_ptr< dynamic_bitset<> > | joinFilterInit, | |||
VectorOfUint & | filteredRowsInit, | |||
bool | enableSubPartStat, | |||
bool | enableSwing | |||
) |
Initialize a plan.
Definition at line 470 of file LhxPartition.cpp.
References filteredRowCount, inputSize, joinFilter, joinSideToInputMap, LhxSubPartCount, mapSubPartToChild(), parentPlan, partitionLevel, partitions, and subPartToChildMap.
00479 { 00480 uint numInputs = partitionsInit.size(); 00481 00482 partitionLevel = partitionLevelInit; 00483 parentPlan = parentPlanInit; 00484 00485 /* 00486 * REVIEW(rchen 2006-08-08): if input sides swing, then the new build could 00487 * be the same as the probe input of the previous partition. This filter 00488 * will not filter any tuple as it tries to filter the very input the 00489 * filter is built on. 00490 */ 00491 joinFilter = joinFilterInit; 00492 00493 filteredRowCount.reset(new uint[numInputs]); 00494 inputSize.reset(new uint[numInputs]); 00495 joinSideToInputMap.reset(new uint[numInputs]); 00496 subPartToChildMap.reset(); 00497 00498 // REVIEW jvs 26-Aug-2006: here "will be used" means once someone 00499 // gets around to true hybrid, right? 00500 /* 00501 * After support is added for repartitioning using stats (gathered during 00502 * previous round of partitioning), a bin-packing algorithm will be used to 00503 * keep in memory some subpartitions and output the rest to child 00504 * partitions of similar size. 00505 */ 00506 for (int i = 0; i < numInputs; i ++) { 00507 partitions.push_back(partitionsInit[i]); 00508 filteredRowCount[i] = filteredRowsInit[i]; 00509 joinSideToInputMap[i] = i; 00510 00511 inputSize[i] = 0; 00512 shared_array<uint> inputSubPartStat = subPartStats[i]; 00513 00514 if (inputSubPartStat) { 00515 for (int k = 0; k < LhxSubPartCount; k ++) { 00516 inputSize[i] += inputSubPartStat[k]; 00517 } 00518 } 00519 } 00520 00521 /* 00522 * Map join side to input, using partitions stats associated with each 00523 * input. The input with the smaller side witll be the build side for the 00524 * join: joinSide for the build will map to the index of this input. 
00525 */ 00526 if (enableSwing && 00527 (numInputs == 2) && (inputSize[0] < inputSize[1])) { 00528 joinSideToInputMap[0] = 1; 00529 joinSideToInputMap[1] = 0; 00530 } 00531 00532 subPartToChildMap.reset(); 00533 00534 if (enableSubPartStat) { 00535 /* 00536 * Use build side sub part stat to divide both inputs into child 00537 * partitions. Needs to be called after the join sides have been 00538 * assigned. 00539 */ 00540 mapSubPartToChild(subPartStats); 00541 } 00542 }
LhxPartitionState LhxPlan::generatePartitions | ( | LhxHashInfo const & | hashInfo, | |
LhxPartitionInfo & | partInfo | |||
) |
Generate partitions for the child plans.
Definition at line 620 of file LhxPartition.cpp.
References LhxPartitionInfo::buildTuple, calculateChildIndex(), LhxPartitionReader::close(), TupleData::compute(), LhxPartitionReader::consumeTuple(), LhxPartitionInfo::curInputIndex, LhxPartitionReader::demandData(), EXECBUF_EOS, LhxPartitionInfo::filteredRowCountList, getBuildChildPart(), getProbeInput(), LhxPartitionReader::getState(), LhxPartitionReader::getTupleDesc(), LhxHashGenerator::hash(), LhxPartitionInfo::hashTableReader, LhxHashGenerator::init(), LhxHashInfo::inputDesc, isBuildChildPart(), LhxHashInfo::isKeyColVarChar, LhxPartitionReader::isTupleConsumptionPending(), joinFilter, LhxPartitionInfo::joinFilterList, LhxHashInfo::keyProj, LhxSubPartCount, LhxPartitionInfo::numInputs, PartitionEndOfData, partitionLevel, LhxPartitionInfo::partitionMemory, PartitionUnderflow, LhxPartitionInfo::probeReader, LhxPartitionInfo::reader, LhxPartitionInfo::subPartStatList, LhxPartitionReader::unmarshalTuple(), LhxHashInfo::useJoinFilter, and LhxPartitionInfo::writerList.
Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().
00623 { 00624 // REVIEW jvs 26-Aug-2006: modulo on this is computed below; make 00625 // sure compiler is optimizing to bitmask, or do it by hand 00626 uint filterSize = 4096; 00627 bool isAggregate = (partInfo.numInputs == 1); 00628 00629 LhxHashGenerator hashGenPrev; 00630 LhxHashGenerator hashGen; 00631 LhxHashGenerator hashGenNext; 00632 00633 hashGenPrev.init(partitionLevel); 00634 hashGen.init(partitionLevel + 1); 00635 hashGenNext.init(partitionLevel + 2); 00636 00637 LhxPartitionReader *&reader = partInfo.reader; 00638 vector<SharedLhxPartitionWriter> &writerList = partInfo.writerList; 00639 vector<shared_ptr<dynamic_bitset<> > > &joinFilterList = 00640 partInfo.joinFilterList; 00641 vector<shared_array<uint> > &subPartStatList = partInfo.subPartStatList; 00642 shared_array<uint> &filteredRowCountList = partInfo.filteredRowCountList; 00643 00644 uint &curInputIndex = partInfo.curInputIndex; 00645 uint otherInputIndex = partInfo.numInputs - curInputIndex - 1; 00646 00647 TupleData inputTuple; 00648 TupleDescriptor inputTupleDesc = reader->getTupleDesc(); 00649 inputTuple.compute(inputTupleDesc); 00650 00651 uint prevHashKey; 00652 uint hashKey; 00653 uint nextHashKey; 00654 00655 uint childPartIndex; 00656 bool writeToPartition; 00657 00658 uint statIndex; 00659 shared_array<uint> curSubPartStat; 00660 00661 /* 00662 * If partition source is from memory, 00663 * i.e. there's hash table to read from. 
00664 */ 00665 if (partInfo.partitionMemory) { 00666 TupleData hashTableTuple; 00667 TupleDescriptor hashTableTupleDesc = hashInfo.inputDesc[curInputIndex]; 00668 00669 hashTableTuple.compute(hashTableTupleDesc); 00670 00671 while ((partInfo.hashTableReader)->getNext(hashTableTuple)) { 00672 writeToPartition = false; 00673 00674 hashKey = hashGen.hash( 00675 hashTableTuple, 00676 hashInfo.keyProj[curInputIndex], 00677 hashInfo.isKeyColVarChar[curInputIndex]); 00678 00679 childPartIndex = calculateChildIndex(hashKey, curInputIndex); 00680 00681 if (hashInfo.useJoinFilter[curInputIndex]) { 00682 /* 00683 * Use input filter if there is one. Note top level build input 00684 * does not have input filter. 00685 */ 00686 if (partitionLevel == 0) { 00687 writeToPartition = true; 00688 } else { 00689 prevHashKey = 00690 hashGenPrev.hash( 00691 hashTableTuple, 00692 hashInfo.keyProj[curInputIndex], 00693 hashInfo.isKeyColVarChar[curInputIndex]); 00694 if (joinFilter && 00695 joinFilter->test(prevHashKey % filterSize)) { 00696 writeToPartition = true; 00697 } else { 00698 filteredRowCountList[childPartIndex]++; 00699 } 00700 } 00701 } else { 00702 /* 00703 * Not using filter. 00704 */ 00705 writeToPartition = true; 00706 } 00707 00708 if (writeToPartition) { 00709 writerList[childPartIndex]->marshalTuple(hashTableTuple); 00710 00711 nextHashKey = hashGenNext.hash( 00712 hashTableTuple, 00713 hashInfo.keyProj[curInputIndex], 00714 hashInfo.isKeyColVarChar[curInputIndex]); 00715 00716 statIndex = nextHashKey % LhxSubPartCount; 00717 curSubPartStat = subPartStatList[childPartIndex]; 00718 curSubPartStat[statIndex]++; 00719 00720 /* 00721 * Set output filter for the other input. 00722 * Note: if the filter is used on the next level, "the other 00723 * input" could be the same if join sides are switched. 00724 */ 00725 if (!joinFilterList[childPartIndex]) { 00726 /* 00727 * Filter not allocated yet. 
00728 */ 00729 joinFilterList[childPartIndex].reset( 00730 new dynamic_bitset<>(filterSize)); 00731 } 00732 joinFilterList[childPartIndex]->set(hashKey % filterSize); 00733 } 00734 } 00735 00736 if (isAggregate) { 00737 /* 00738 * release the agg exec stream hash table scratch pages 00739 * and initialize writer scratch pages. 00740 */ 00741 ((partInfo.hashTableReader)->getHashTable())->releaseResources(); 00742 for (int i = 0; i < writerList.size();i ++) { 00743 writerList[i]->allocateResources(); 00744 } 00745 } 00746 00747 /* 00748 * Done with tuples in the hash table. Next tuples will come from a 00749 * partition (stream or disk). 00750 */ 00751 partInfo.partitionMemory = false; 00752 00753 /* 00754 * The tuple for which hash table full is detected is not in the hash 00755 * table yet. 00756 */ 00757 inputTuple = partInfo.buildTuple; 00758 } 00759 00760 for (;;) { 00761 /* 00762 * Note that partInfo.buildTuple is an unconsumed tuple from the 00763 * reader. So when first time in this loop, isTupleConsumptionPending 00764 * returns true. 00765 */ 00766 if (!reader->isTupleConsumptionPending()) { 00767 if (reader->getState() == EXECBUF_EOS) { 00768 if (curInputIndex == getProbeInput()) { 00769 /* 00770 * Done with partitioning the 0th input. 00771 * The current plan is completely partitioned. 00772 */ 00773 return PartitionEndOfData; 00774 } else { 00775 curInputIndex = getProbeInput(); 00776 otherInputIndex = partInfo.numInputs - curInputIndex - 1; 00777 reader->close(); 00778 reader = &partInfo.probeReader; 00779 inputTupleDesc = reader->getTupleDesc(); 00780 inputTuple.compute(inputTupleDesc); 00781 continue; 00782 } 00783 } 00784 00785 if (!reader->demandData()) { 00786 if (partitionLevel == 0) { 00787 /* 00788 * Reading from a stream: indicate underflow to producer. 00789 */ 00790 return PartitionUnderflow; 00791 } else { 00792 if (curInputIndex == getProbeInput()) { 00793 /* 00794 * Done with partitioning the 0th input. 
00795 * The current plan is completely partitioned. 00796 */ 00797 return PartitionEndOfData; 00798 } else { 00799 curInputIndex = getProbeInput(); 00800 reader->close(); 00801 reader = &partInfo.probeReader; 00802 inputTupleDesc = reader->getTupleDesc(); 00803 inputTuple.compute(inputTupleDesc); 00804 continue; 00805 } 00806 } 00807 } 00808 00809 reader->unmarshalTuple(inputTuple); 00810 } 00811 00812 writeToPartition = false; 00813 00814 hashKey = hashGen.hash( 00815 inputTuple, 00816 hashInfo.keyProj[curInputIndex], 00817 hashInfo.isKeyColVarChar[curInputIndex]); 00818 00819 childPartIndex = calculateChildIndex(hashKey, curInputIndex); 00820 00821 nextHashKey = hashGenNext.hash( 00822 inputTuple, 00823 hashInfo.keyProj[curInputIndex], 00824 hashInfo.isKeyColVarChar[curInputIndex]); 00825 00826 statIndex = nextHashKey % LhxSubPartCount; 00827 00828 if (!isAggregate) { 00829 if (hashInfo.useJoinFilter[curInputIndex]) { 00830 /* 00831 * Use input filter if there exists one 00832 */ 00833 if (isBuildChildPart(childPartIndex)) { 00834 /* 00835 * Build input. Note top level build input does not have 00836 * input filter. Use joinfilter from the probe input of 00837 * the previous level. 00838 */ 00839 if (partitionLevel == 0) { 00840 writeToPartition = true; 00841 } else { 00842 prevHashKey = 00843 hashGenPrev.hash( 00844 inputTuple, 00845 hashInfo.keyProj[curInputIndex], 00846 hashInfo.isKeyColVarChar[curInputIndex]); 00847 if (joinFilter && 00848 joinFilter->test(prevHashKey % filterSize)) { 00849 writeToPartition = true; 00850 } else { 00851 filteredRowCountList[childPartIndex]++; 00852 } 00853 } 00854 } else { 00855 /* 00856 * Probe input. 00857 * Use join filter from build input of the same 00858 * partitioning level. 
00859 */ 00860 if (joinFilterList[getBuildChildPart(childPartIndex)] && 00861 joinFilterList[getBuildChildPart(childPartIndex)]-> 00862 test(hashKey % filterSize)) { 00863 writeToPartition = true; 00864 } else { 00865 filteredRowCountList[childPartIndex]++; 00866 } 00867 } 00868 } else { 00869 /* 00870 * Not using filter. 00871 */ 00872 writeToPartition = true; 00873 } 00874 00875 if (writeToPartition) { 00876 writerList[childPartIndex]->marshalTuple(inputTuple); 00877 curSubPartStat = subPartStatList[childPartIndex]; 00878 curSubPartStat[statIndex]++; 00879 00880 /* 00881 * Set output filter to be used by the other input. 00882 * Note: if the filter is used on the next level, "the other 00883 * input" could be the same if join sides are switched. 00884 */ 00885 if (!joinFilterList[childPartIndex]) { 00886 /* 00887 * Filter not allocated yet. 00888 */ 00889 joinFilterList[childPartIndex].reset( 00890 new dynamic_bitset<>(filterSize)); 00891 } 00892 joinFilterList[childPartIndex]->set(hashKey % filterSize); 00893 } 00894 } else { 00895 writerList[childPartIndex]->aggAndMarshalTuple(inputTuple); 00896 (subPartStatList[childPartIndex])[statIndex]++; 00897 } 00898 00899 reader->consumeTuple(); 00900 } 00901 }
void LhxPlan::createChildren | ( | LhxHashInfo const & | hashInfo, | |
bool | enableSubPartStat | |||
) |
Partition this plan and create child plan.
This is used in testing only.
Definition at line 903 of file LhxPartition.cpp.
References LhxPartitionReader::close(), close(), TupleData::compute(), LhxPartitionReader::consumeTuple(), LhxPartitionReader::demandData(), EXECBUF_EOS, firstChildPlan, LhxPartitionReader::getState(), LhxHashGenerator::hash(), LhxHashGenerator::init(), LhxHashInfo::inputDesc, LhxHashInfo::isKeyColVarChar, LhxPartitionReader::isTupleConsumptionPending(), LhxHashInfo::keyProj, LhxChildPartCount, LhxPartitionWriter::marshalTuple(), LhxPartitionWriter::open(), LhxPartitionReader::open(), partitionLevel, partitions, and LhxPartitionReader::unmarshalTuple().
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxHashTableTest::testInsert().
00906 { 00907 LhxHashGenerator hashGen; 00908 hashGen.init(partitionLevel + 1); 00909 00910 uint numInputs = hashInfo.inputDesc.size(); 00911 00912 vector<SharedLhxPartition> destPartitionList(LhxChildPartCount * numInputs); 00913 00914 LhxPartitionReader reader; 00915 LhxPartitionWriter writerList[LhxChildPartCount]; 00916 uint childNum, i, j; 00917 TupleData outputTuple; 00918 00919 /* 00920 * Generate partitions for each input. 00921 */ 00922 for (j = 0; j < numInputs; j ++) { 00923 reader.open(partitions[j], hashInfo); 00924 outputTuple.compute(hashInfo.inputDesc[j]); 00925 00926 for (i = 0; i < LhxChildPartCount; i ++) { 00927 uint index = j * LhxChildPartCount + i; 00928 destPartitionList[index].reset( 00929 new LhxPartition(partitions[j]->pExecStream)); 00930 destPartitionList[index]->inputIndex = j; 00931 writerList[i].open(destPartitionList[index], hashInfo); 00932 } 00933 00934 for (;;) { 00935 if (!reader.isTupleConsumptionPending()) { 00936 if (reader.getState() == EXECBUF_EOS) { 00937 /* 00938 * The current plan is completely partitioned. 00939 */ 00940 break; 00941 } 00942 if (!reader.demandData()) { 00943 break; 00944 } 00945 reader.unmarshalTuple(outputTuple); 00946 } 00947 00948 childNum = 00949 hashGen.hash( 00950 outputTuple, 00951 hashInfo.keyProj[j], 00952 hashInfo.isKeyColVarChar[j]) % LhxChildPartCount; 00953 00954 writerList[childNum].marshalTuple(outputTuple); 00955 reader.consumeTuple(); 00956 } 00957 00958 for (i = 0; i < LhxChildPartCount; i ++) { 00959 writerList[i].close(); 00960 } 00961 00962 /* 00963 * Partial aggregate hash tables used inside the writers(one HT 00964 * for each writer) share the same scratch buffer space. 00965 * Release the buffer pages used by these hash tables at the end, 00966 * after all writers have been closed(so that there will be no more 00967 * scratch page alloc calls). 
00968 */ 00969 for (i = 0; i < LhxChildPartCount; i ++) { 00970 writerList[i].releaseResources(); 00971 } 00972 00973 reader.close(); 00974 } 00975 00976 /* 00977 * Create child plans consisting of one partition from each input. 00978 */ 00979 for (i = 0; i < LhxChildPartCount; i ++) { 00980 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan()); 00981 vector<SharedLhxPartition> partitionList; 00982 partitionList.push_back(destPartitionList[i]); 00983 partitionList.push_back(destPartitionList[i + LhxChildPartCount]); 00984 00985 newChildPlan->init( 00986 WeakLhxPlan(shared_from_this()), 00987 partitionLevel + 1, 00988 partitionList, 00989 enableSubPartStat); 00990 00991 newChildPlan->addSibling(firstChildPlan); 00992 firstChildPlan = newChildPlan; 00993 } 00994 }
void LhxPlan::createChildren | ( | LhxPartitionInfo & | partInfo, | |
bool | enableSubPartStat, | |||
bool | enableSwing | |||
) |
Create child plan from partitions provided via partInfo.
Definition at line 996 of file LhxPartition.cpp.
References LhxPartitionInfo::destPartitionList, LhxPartitionInfo::filteredRowCountList, firstChildPlan, getProbeChildPart(), LhxPartitionInfo::joinFilterList, LhxChildPartCount, LhxPartitionInfo::numInputs, partitionLevel, and LhxPartitionInfo::subPartStatList.
01000 { 01001 uint i, j; 01002 01003 for (i = 0; i < LhxChildPartCount; i ++) { 01004 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan()); 01005 vector<SharedLhxPartition> partitionList; 01006 vector<shared_array<uint> > subPartStats; 01007 VectorOfUint filteredRows; 01008 for (j = 0; j < partInfo.numInputs; j ++) { 01009 partitionList.push_back( 01010 partInfo.destPartitionList[i + LhxChildPartCount * j]); 01011 subPartStats.push_back( 01012 partInfo.subPartStatList[i + LhxChildPartCount * j]); 01013 filteredRows.push_back( 01014 partInfo.filteredRowCountList[i + LhxChildPartCount * j]); 01015 } 01016 newChildPlan->init( 01017 WeakLhxPlan(shared_from_this()), 01018 partitionLevel + 1, 01019 partitionList, 01020 subPartStats, 01021 partInfo.joinFilterList[getProbeChildPart(i)], 01022 filteredRows, 01023 enableSubPartStat, 01024 enableSwing); 01025 01026 newChildPlan->addSibling(firstChildPlan); 01027 firstChildPlan = newChildPlan; 01028 } 01029 partInfo.destPartitionList.clear(); 01030 partInfo.joinFilterList.clear(); 01031 }
uint LhxPlan::getPartitionLevel | ( | ) | [inline] |
Get the partition level of this plan.
Definition at line 505 of file LhxPartition.h.
References partitionLevel.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxJoinExecStream::open().
00506 { 00507 return partitionLevel; 00508 }
SharedLhxPartition LhxPlan::getBuildPartition | ( | ) | [inline] |
Get the partition of the build input.
Definition at line 525 of file LhxPartition.h.
References getBuildInput(), and partitions.
Referenced by LhxJoinExecStream::execute(), and LhxJoinExecStream::open().
00526 { 00527 return partitions[getBuildInput()]; 00528 }
SharedLhxPartition LhxPlan::getProbePartition | ( | ) | [inline] |
Definition at line 520 of file LhxPartition.h.
References getProbeInput(), and partitions.
Referenced by LhxJoinExecStream::execute().
00521 { 00522 return partitions[getProbeInput()]; 00523 }
SharedLhxPartition LhxPlan::getPartition | ( | uint | inputIndex | ) | [inline] |
Definition at line 530 of file LhxPartition.h.
References partitions.
Referenced by LhxAggExecStream::execute(), LhxAggExecStream::open(), and LhxHashTableTest::testInsert().
00531 { 00532 return partitions[inputIndex]; 00533 }
uint LhxPlan::getBuildInput | ( | ) | [inline] |
Definition at line 515 of file LhxPartition.h.
References joinSideToInputMap, and partitions.
Referenced by LhxJoinExecStream::execute(), getBuildChildPart(), getBuildPartition(), isBuildChildPart(), mapSubPartToChild(), LhxJoinExecStream::open(), LhxJoinExecStream::returnBuildInner(), and LhxJoinExecStream::returnBuildOuter().
00516 { 00517 return joinSideToInputMap[partitions.size() - 1]; 00518 }
uint LhxPlan::getProbeInput | ( | ) | [inline] |
Definition at line 510 of file LhxPartition.h.
References joinSideToInputMap.
Referenced by LhxJoinExecStream::execute(), generatePartitions(), getProbeChildPart(), getProbePartition(), isProbeChildPart(), mapSubPartToChild(), LhxJoinExecStream::returnProbeInner(), and LhxJoinExecStream::returnProbeOuter().
00511 { 00512 return joinSideToInputMap[0]; 00513 }
uint LhxPlan::getJoinSide | ( | uint | inputIndex | ) | [inline] |
Definition at line 535 of file LhxPartition.h.
References joinSideToInputMap, and partitions.
Referenced by toString().
00536 { 00537 uint i = 0; 00538 while ((joinSideToInputMap[i] != inputIndex) 00539 && (i < partitions.size())) 00540 { 00541 i ++; 00542 } 00543 00544 return i; 00545 }
SharedLhxPlan LhxPlan::getFirstChild | ( | ) | [inline] |
Get the first child plan.
Definition at line 500 of file LhxPartition.h.
References firstChildPlan.
Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().
00501 { 00502 return firstChildPlan; 00503 }
LhxPlan * LhxPlan::getFirstLeaf | ( | ) |
Get first leaf plan in dfs order.
Definition at line 1033 of file LhxPartition.cpp.
References firstChildPlan.
Referenced by LhxHashTableTest::testInsert().
01034 { 01035 if (!firstChildPlan) { 01036 return this; 01037 } else { 01038 return firstChildPlan->getFirstLeaf(); 01039 } 01040 }
LhxPlan * LhxPlan::getNextLeaf | ( | ) |
Get next leaf plan in dfs order.
Definition at line 1042 of file LhxPartition.cpp.
References parentPlan, and siblingPlan.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxHashTableTest::testInsert().
01043 { 01044 if (siblingPlan) { 01045 return siblingPlan->getFirstLeaf(); 01046 } else { 01047 WeakLhxPlan parent = this->parentPlan; 01048 SharedLhxPlan shared_parent = parent.lock(); 01049 01050 if (shared_parent) { 01051 return shared_parent->getNextLeaf(); 01052 } else { 01053 return NULL; 01054 } 01055 } 01056 }
void LhxPlan::close | ( | ) |
Close the plan tree.
Release any resource pointed to from this plan tree.
Definition at line 1058 of file LhxPartition.cpp.
References firstChildPlan, partitions, and siblingPlan.
Referenced by createChildren().
01059 { 01060 if (firstChildPlan) { 01061 firstChildPlan->close(); 01062 } 01063 01064 if (siblingPlan) { 01065 siblingPlan->close(); 01066 } 01067 01068 for (uint i = 0; i < partitions.size(); i ++) { 01069 if (partitions[i] && partitions[i]->segStream) { 01070 partitions[i]->segStream->close(); 01071 } 01072 } 01073 }
string LhxPlan::toString | ( | ) |
Print the content of the plan tree rooted at this plan.
Definition at line 1075 of file LhxPartition.cpp.
References childPartSize, filteredRowCount, firstChildPlan, getJoinSide(), inputSize, joinFilter, LhxChildPartCount, LhxSubPartCount, parentPlan, partitionLevel, partitions, siblingPlan, and subPartToChildMap.
Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().
01076 { 01077 ostringstream planTrace; 01078 01079 planTrace << "\n" 01080 << "[Plan : addr = " << this << "]\n" 01081 << "[ level = " << partitionLevel << "]\n" 01082 << "[ parent = " << parentPlan.lock().get() << "]\n" 01083 << "[ firstChild = " << firstChildPlan.get() << "]\n" 01084 << "[ sibling = " << siblingPlan.get() << "]\n"; 01085 01086 /* 01087 * Print out the partitions. 01088 */ 01089 planTrace << "[joinFilter = "; 01090 if (joinFilter) { 01091 planTrace << joinFilter.get(); 01092 } 01093 planTrace << "]\n"; 01094 01095 for (uint i = 0; i < partitions.size(); i ++) { 01096 planTrace << "[Partition(" << i << ")]\n" 01097 << "[ inputIndex = " << partitions[i]->inputIndex << "]\n" 01098 << "[ join side = " << getJoinSide(partitions[i]->inputIndex) << "]\n" 01099 << "[ filteredRows = " << filteredRowCount[i] << "]\n" 01100 << "[ inputSize = " << inputSize[i] << "]\n"; 01101 planTrace << "[ childPartSize = "; 01102 if (childPartSize.size() > i) { 01103 shared_array<uint> oneChildPartSize = childPartSize[i]; 01104 if (oneChildPartSize) { 01105 for (uint j = 0; j < LhxChildPartCount; j ++) { 01106 planTrace << oneChildPartSize[j] << " "; 01107 } 01108 } 01109 } 01110 planTrace << "]\n"; 01111 } 01112 01113 planTrace << "[subPartToChildMap = "; 01114 if (subPartToChildMap) { 01115 for (uint i = 0; i < LhxSubPartCount; i ++) { 01116 planTrace << subPartToChildMap[i] << " "; 01117 } 01118 } 01119 planTrace << "]\n"; 01120 01121 SharedLhxPlan childPlan = firstChildPlan; 01122 01123 while (childPlan) { 01124 planTrace << childPlan->toString(); 01125 childPlan = childPlan->siblingPlan; 01126 } 01127 01128 return planTrace.str(); 01129 }
uint LhxPlan::partitionLevel [private] |
Definition at line 294 of file LhxPartition.h.
Referenced by createChildren(), generatePartitions(), getPartitionLevel(), init(), and toString().
vector<SharedLhxPartition> LhxPlan::partitions [private] |
Definition at line 295 of file LhxPartition.h.
Referenced by close(), createChildren(), getBuildInput(), getBuildPartition(), getJoinSide(), getPartition(), getProbePartition(), init(), mapSubPartToChild(), and toString().
shared_array<uint> LhxPlan::joinSideToInputMap [private] |
Definition at line 296 of file LhxPartition.h.
Referenced by getBuildInput(), getJoinSide(), getProbeInput(), and init().
shared_ptr<dynamic_bitset<> > LhxPlan::joinFilter [private] |
Definition at line 298 of file LhxPartition.h.
Referenced by generatePartitions(), init(), and toString().
shared_array<uint> LhxPlan::filteredRowCount [private] |
shared_array<uint> LhxPlan::subPartToChildMap [private] |
Definition at line 306 of file LhxPartition.h.
Referenced by calculateChildIndex(), init(), mapSubPartToChild(), and toString().
vector<shared_array<uint> > LhxPlan::childPartSize [private] |
shared_array<uint> LhxPlan::inputSize [private] |
WeakLhxPlan LhxPlan::parentPlan [private] |
SharedLhxPlan LhxPlan::firstChildPlan [private] |
Definition at line 323 of file LhxPartition.h.
Referenced by close(), createChildren(), getFirstChild(), getFirstLeaf(), and toString().
SharedLhxPlan LhxPlan::siblingPlan [private] |
Definition at line 324 of file LhxPartition.h.
Referenced by addSibling(), close(), getNextLeaf(), and toString().
const uint LhxPlan::LhxSubPartCount = 16 [static] |
Definition at line 358 of file LhxPartition.h.
Referenced by calculateChildIndex(), generatePartitions(), init(), mapSubPartToChild(), LhxPartitionInfo::open(), and toString().
const uint LhxPlan::LhxChildPartCount = 3 [static] |
Definition at line 359 of file LhxPartition.h.
Referenced by calculateChildIndex(), createChildren(), getBuildChildPart(), getProbeChildPart(), LhxAggExecStream::getResourceRequirements(), LhxPartitionInfo::init(), isBuildChildPart(), isProbeChildPart(), mapSubPartToChild(), LhxPartitionInfo::open(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), LhxHashTableTest::testInsert(), and toString().