LhxPlan Class Reference

#include <LhxPartition.h>

List of all members.

Public Member Functions

void init (WeakLhxPlan parentPlanInit, uint partitionLevelInit, vector< SharedLhxPartition > &partitionsInit, bool enableSubPartStat)
 Initialize a plan, with its input partitions and parent plan.
void init (WeakLhxPlan parentPlanInit, uint partitionLevelInit, vector< SharedLhxPartition > &partitionsInit, vector< shared_array< uint > > &subPartStats, shared_ptr< dynamic_bitset<> > filterInit, VectorOfUint &filteredRowsInit, bool enableSubPartStat, bool enableSwing)
 Initialize a plan.
LhxPartitionState generatePartitions (LhxHashInfo const &hashInfo, LhxPartitionInfo &partInfo)
 Generate partitions for the child plans.
void createChildren (LhxHashInfo const &hashInfo, bool enableSubPartStat)
 Partition this plan and create child plan.
void createChildren (LhxPartitionInfo &partInfo, bool enableSubPartStat, bool enableSwing)
 Create child plan from partitions provided via partInfo.
uint getPartitionLevel ()
 Get the partition level of this plan.
SharedLhxPartition getBuildPartition ()
 Get the partition corresponding to inputIndex.
SharedLhxPartition getProbePartition ()
SharedLhxPartition getPartition (uint inputIndex)
uint getBuildInput ()
uint getProbeInput ()
uint getJoinSide (uint inputIndex)
SharedLhxPlan getFirstChild ()
 Get the first child plan.
LhxPlan * getFirstLeaf ()
 Get first leaf plan in dfs order.
LhxPlan * getNextLeaf ()
 Get next leaf plan in dfs order.
void close ()
 Close the plan tree.
string toString ()
 Print the content of the plan tree rooted at this plan.

Static Public Attributes

static const uint LhxSubPartCount = 16
static const uint LhxChildPartCount = 3

Private Member Functions

void addSibling (SharedLhxPlan siblingPlan)
 Add sibling plan.
void mapSubPartToChild (vector< shared_array< uint > > &subPartStats)
 Using sub partition stats gathered at the previous partition level, map sub partitions to child partitions.
uint calculateChildIndex (uint hashKey, uint curInputIndex)
 Calculate the target child partition index based on the hashkey of a tuple.
bool isBuildChildPart (uint childPartIndex)
bool isProbeChildPart (uint childPartIndex)
uint getBuildChildPart (uint childPartIndex)
uint getProbeChildPart (uint childPartIndex)

Private Attributes

uint partitionLevel
vector< SharedLhxPartition > partitions
shared_array< uint > joinSideToInputMap
shared_ptr< dynamic_bitset<> > joinFilter
shared_array< uint > filteredRowCount
shared_array< uint > subPartToChildMap
vector< shared_array< uint > > childPartSize
shared_array< uint > inputSize
WeakLhxPlan parentPlan
SharedLhxPlan firstChildPlan
SharedLhxPlan siblingPlan


Detailed Description

Definition at line 291 of file LhxPartition.h.


Member Function Documentation

void LhxPlan::addSibling ( SharedLhxPlan  siblingPlan  )  [inline, private]

Add sibling plan.

Definition at line 495 of file LhxPartition.h.

References siblingPlan.

00496 {
00497     siblingPlan = siblingPlanInit;
00498 }

void LhxPlan::mapSubPartToChild ( vector< shared_array< uint > > &  subPartStats  )  [private]

Using sub partition stats gathered at the previous partition level, map sub partitions to child partitions.

The objective is to come up with child partitions of similar size.

Definition at line 544 of file LhxPartition.cpp.

References childPartSize, getBuildInput(), getProbeInput(), LhxChildPartCount, LhxSubPartCount, partitions, and subPartToChildMap.

Referenced by init().

00546 {
00547     uint numInputs = partitions.size();
00548     uint buildIndex =  getBuildInput();
00549     shared_array<uint> buildSubPartStat = subPartStats[buildIndex];
00550 
00551     if (!buildSubPartStat) {
00552         return;
00553     }
00554 
00555     uint i, j, k;
00556 
00557     subPartToChildMap.reset(new uint[LhxSubPartCount]);
00558 
00559     for (i = 0; i < numInputs; i ++) {
00560         childPartSize.push_back(
00561             shared_array<uint>(new uint[LhxChildPartCount]));
00562     }
00563 
00564     shared_array<uint> buildChildPartSize = childPartSize[buildIndex];
00565 
00566     for (i = 0; i < LhxChildPartCount; i ++) {
00567         buildChildPartSize[i] = 0;
00568     }
00569 
00570     j = 0;
00571     for (i = 0; i < LhxSubPartCount; i ++) {
00572         buildChildPartSize[j] += buildSubPartStat[i];
00573         subPartToChildMap[i] = j;
00574 
00575         k = 1;
00576         while (
00577             (buildChildPartSize[j]
00578                 > buildChildPartSize[(j + k) % LhxChildPartCount])
00579             && k < LhxChildPartCount)
00580         {
00581             k ++;
00582         }
00583 
00584         if (k == LhxChildPartCount) {
00585             // If current child partition is bigger than all other child
00586             // partitions, move to the next child
00587             j = (j + 1) % LhxChildPartCount;
00588         }
00589     }
00590 
00591     /*
00592      * This is simply stats keeping for the probe side child partitions.
00593      */
00594     if (numInputs == 2) {
00595         uint probeIndex = getProbeInput();
00596         shared_array<uint> probeChildPartSize = childPartSize[probeIndex];
00597         shared_array<uint> probeSubPartStat = subPartStats[probeIndex];
00598 
00599         for (i = 0; i < LhxChildPartCount; i ++) {
00600             probeChildPartSize[i] = 0;
00601         }
00602 
00603         for (i = 0; i < LhxSubPartCount; i ++) {
00604             probeChildPartSize[subPartToChildMap[i]] += probeSubPartStat[i];
00605         }
00606     }
00607 }

uint LhxPlan::calculateChildIndex ( uint  hashKey,
uint  curInputIndex 
) [private]

Calculate the target child partition index based on the hashkey of a tuple.

Definition at line 609 of file LhxPartition.cpp.

References LhxChildPartCount, LhxSubPartCount, and subPartToChildMap.

Referenced by generatePartitions().

00610 {
00611     if (subPartToChildMap) {
00612         return (subPartToChildMap[hashKey % LhxSubPartCount] +
00613             curInputIndex * LhxChildPartCount);
00614     } else {
00615         return (hashKey % LhxChildPartCount +
00616             curInputIndex * LhxChildPartCount);
00617     }
00618 }

bool LhxPlan::isBuildChildPart ( uint  childPartIndex  )  [inline, private]

Definition at line 547 of file LhxPartition.h.

References getBuildInput(), and LhxChildPartCount.

Referenced by generatePartitions().

00548 {
00549     return ((childPartIndex / LhxChildPartCount) == getBuildInput());
00550 }

bool LhxPlan::isProbeChildPart ( uint  childPartIndex  )  [inline, private]

Definition at line 552 of file LhxPartition.h.

References getProbeInput(), and LhxChildPartCount.

00553 {
00554     return ((childPartIndex / LhxChildPartCount) == getProbeInput());
00555 }

uint LhxPlan::getBuildChildPart ( uint  childPartIndex  )  [inline, private]

Definition at line 557 of file LhxPartition.h.

References getBuildInput(), and LhxChildPartCount.

Referenced by generatePartitions().

00558 {
00559     return ((childPartIndex % LhxChildPartCount) +
00560             getBuildInput() * LhxChildPartCount);
00561 }

uint LhxPlan::getProbeChildPart ( uint  childPartIndex  )  [inline, private]

Definition at line 563 of file LhxPartition.h.

References getProbeInput(), and LhxChildPartCount.

Referenced by createChildren().

00564 {
00565     return ((childPartIndex % LhxChildPartCount) +
00566             getProbeInput() * LhxChildPartCount);
00567 }

void LhxPlan::init ( WeakLhxPlan  parentPlanInit,
uint  partitionLevelInit,
vector< SharedLhxPartition > &  partitionsInit,
bool  enableSubPartStat 
)

Initialize a plan, with its input partitions and parent plan.

Definition at line 445 of file LhxPartition.cpp.

00450 {
00451     /*
00452      * No filter for this plan.
00453      */
00454     shared_ptr<dynamic_bitset<> > joinFilterInit =
00455         shared_ptr<dynamic_bitset<> >();
00456     vector<shared_array<uint> > subPartStatsInit;
00457     VectorOfUint filteredRows;
00458 
00459     for (uint i = 0; i < partitionsInit.size(); i ++) {
00460         subPartStatsInit.push_back(shared_array<uint>());
00461         filteredRows.push_back(0);
00462     }
00463 
00464     init(
00465         parentPlanInit, partitionLevelInit, partitionsInit,
00466         subPartStatsInit, joinFilterInit, filteredRows,
00467         enableSubPartStat, false);
00468 }

void LhxPlan::init ( WeakLhxPlan  parentPlanInit,
uint  partitionLevelInit,
vector< SharedLhxPartition > &  partitionsInit,
vector< shared_array< uint > > &  subPartStats,
shared_ptr< dynamic_bitset<> >  filterInit,
VectorOfUint &  filteredRowsInit,
bool  enableSubPartStat,
bool  enableSwing 
)

Initialize a plan.

Definition at line 470 of file LhxPartition.cpp.

References filteredRowCount, inputSize, joinFilter, joinSideToInputMap, LhxSubPartCount, mapSubPartToChild(), parentPlan, partitionLevel, partitions, and subPartToChildMap.

00479 {
00480     uint numInputs = partitionsInit.size();
00481 
00482     partitionLevel = partitionLevelInit;
00483     parentPlan   = parentPlanInit;
00484 
00485     /*
00486      * REVIEW(rchen 2006-08-08): if input sides swing, then the new build could
00487      * be the same as the probe input of the previous partition. This filter
00488      * will not filter any tuple as it tries to filter the very input the
00489      * filter is built on.
00490      */
00491     joinFilter = joinFilterInit;
00492 
00493     filteredRowCount.reset(new uint[numInputs]);
00494     inputSize.reset(new uint[numInputs]);
00495     joinSideToInputMap.reset(new uint[numInputs]);
00496     subPartToChildMap.reset();
00497 
00498     // REVIEW jvs 26-Aug-2006:  here "will be used" means once someone
00499     // gets around to true hybrid, right?
00500     /*
00501      * After support is added for repartitioning using stats (gathered during
00502      * previous round of partitioning), a bin-packing algorithm will be used to
00503      * keep in memory some subpartitions and output the rest to child
00504      * partitions of similar size.
00505      */
00506     for (int i = 0; i < numInputs; i ++) {
00507         partitions.push_back(partitionsInit[i]);
00508         filteredRowCount[i] = filteredRowsInit[i];
00509         joinSideToInputMap[i] = i;
00510 
00511         inputSize[i] = 0;
00512         shared_array<uint> inputSubPartStat = subPartStats[i];
00513 
00514         if (inputSubPartStat) {
00515             for (int k = 0; k < LhxSubPartCount; k ++) {
00516                 inputSize[i] += inputSubPartStat[k];
00517             }
00518         }
00519     }
00520 
00521     /*
00522      * Map join side to input, using partitions stats associated with each
00523      * input. The input with the smaller side will be the build side for the
00524      * join: joinSide for the build will map to the index of this input.
00525      */
00526     if (enableSwing &&
00527         (numInputs == 2) && (inputSize[0] < inputSize[1])) {
00528         joinSideToInputMap[0] = 1;
00529         joinSideToInputMap[1] = 0;
00530     }
00531 
00532     subPartToChildMap.reset();
00533 
00534     if (enableSubPartStat) {
00535         /*
00536          * Use build side sub part stat to divide both inputs into child
00537          * partitions. Needs to be called after the join sides have been
00538          * assigned.
00539          */
00540         mapSubPartToChild(subPartStats);
00541     }
00542 }

LhxPartitionState LhxPlan::generatePartitions ( LhxHashInfo const &  hashInfo,
LhxPartitionInfo &  partInfo 
)

Generate partitions for the child plans.

Definition at line 620 of file LhxPartition.cpp.

References LhxPartitionInfo::buildTuple, calculateChildIndex(), LhxPartitionReader::close(), TupleData::compute(), LhxPartitionReader::consumeTuple(), LhxPartitionInfo::curInputIndex, LhxPartitionReader::demandData(), EXECBUF_EOS, LhxPartitionInfo::filteredRowCountList, getBuildChildPart(), getProbeInput(), LhxPartitionReader::getState(), LhxPartitionReader::getTupleDesc(), LhxHashGenerator::hash(), LhxPartitionInfo::hashTableReader, LhxHashGenerator::init(), LhxHashInfo::inputDesc, isBuildChildPart(), LhxHashInfo::isKeyColVarChar, LhxPartitionReader::isTupleConsumptionPending(), joinFilter, LhxPartitionInfo::joinFilterList, LhxHashInfo::keyProj, LhxSubPartCount, LhxPartitionInfo::numInputs, PartitionEndOfData, partitionLevel, LhxPartitionInfo::partitionMemory, PartitionUnderflow, LhxPartitionInfo::probeReader, LhxPartitionInfo::reader, LhxPartitionInfo::subPartStatList, LhxPartitionReader::unmarshalTuple(), LhxHashInfo::useJoinFilter, and LhxPartitionInfo::writerList.

Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().

00623 {
00624     // REVIEW jvs 26-Aug-2006:  modulo on this is computed below; make
00625     // sure compiler is optimizing to bitmask, or do it by hand
00626     uint filterSize = 4096;
00627     bool isAggregate = (partInfo.numInputs == 1);
00628 
00629     LhxHashGenerator hashGenPrev;
00630     LhxHashGenerator hashGen;
00631     LhxHashGenerator hashGenNext;
00632 
00633     hashGenPrev.init(partitionLevel);
00634     hashGen.init(partitionLevel + 1);
00635     hashGenNext.init(partitionLevel + 2);
00636 
00637     LhxPartitionReader *&reader = partInfo.reader;
00638     vector<SharedLhxPartitionWriter> &writerList = partInfo.writerList;
00639     vector<shared_ptr<dynamic_bitset<> > > &joinFilterList =
00640         partInfo.joinFilterList;
00641     vector<shared_array<uint> > &subPartStatList = partInfo.subPartStatList;
00642     shared_array<uint> &filteredRowCountList = partInfo.filteredRowCountList;
00643 
00644     uint &curInputIndex = partInfo.curInputIndex;
00645     uint otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00646 
00647     TupleData inputTuple;
00648     TupleDescriptor inputTupleDesc = reader->getTupleDesc();
00649     inputTuple.compute(inputTupleDesc);
00650 
00651     uint prevHashKey;
00652     uint hashKey;
00653     uint nextHashKey;
00654 
00655     uint childPartIndex;
00656     bool writeToPartition;
00657 
00658     uint statIndex;
00659     shared_array<uint> curSubPartStat;
00660 
00661     /*
00662      * If partition source is from memory,
00663      * i.e. there's hash table to read from.
00664      */
00665     if (partInfo.partitionMemory) {
00666         TupleData hashTableTuple;
00667         TupleDescriptor hashTableTupleDesc = hashInfo.inputDesc[curInputIndex];
00668 
00669         hashTableTuple.compute(hashTableTupleDesc);
00670 
00671         while ((partInfo.hashTableReader)->getNext(hashTableTuple)) {
00672             writeToPartition = false;
00673 
00674             hashKey = hashGen.hash(
00675                 hashTableTuple,
00676                 hashInfo.keyProj[curInputIndex],
00677                 hashInfo.isKeyColVarChar[curInputIndex]);
00678 
00679             childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00680 
00681             if (hashInfo.useJoinFilter[curInputIndex]) {
00682                 /*
00683                  * Use input filter if there is one. Note top level build input
00684                  * does not have input filter.
00685                  */
00686                 if (partitionLevel == 0) {
00687                     writeToPartition = true;
00688                 } else {
00689                     prevHashKey =
00690                         hashGenPrev.hash(
00691                             hashTableTuple,
00692                             hashInfo.keyProj[curInputIndex],
00693                             hashInfo.isKeyColVarChar[curInputIndex]);
00694                     if (joinFilter &&
00695                         joinFilter->test(prevHashKey % filterSize)) {
00696                         writeToPartition = true;
00697                     } else {
00698                         filteredRowCountList[childPartIndex]++;
00699                     }
00700                 }
00701             } else {
00702                 /*
00703                  * Not using filter.
00704                  */
00705                 writeToPartition = true;
00706             }
00707 
00708             if (writeToPartition) {
00709                 writerList[childPartIndex]->marshalTuple(hashTableTuple);
00710 
00711                 nextHashKey = hashGenNext.hash(
00712                     hashTableTuple,
00713                     hashInfo.keyProj[curInputIndex],
00714                     hashInfo.isKeyColVarChar[curInputIndex]);
00715 
00716                 statIndex = nextHashKey % LhxSubPartCount;
00717                 curSubPartStat = subPartStatList[childPartIndex];
00718                 curSubPartStat[statIndex]++;
00719 
00720                 /*
00721                  * Set output filter for the other input.
00722                  * Note: if the filter is used on the next level, "the other
00723                  * input" could be the same if join sides are switched.
00724                  */
00725                 if (!joinFilterList[childPartIndex]) {
00726                     /*
00727                      * Filter not allocated yet.
00728                      */
00729                     joinFilterList[childPartIndex].reset(
00730                         new dynamic_bitset<>(filterSize));
00731                 }
00732                 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00733             }
00734         }
00735 
00736         if (isAggregate) {
00737             /*
00738              * release the agg exec stream hash table scratch pages
00739              * and initialize writer scratch pages.
00740              */
00741             ((partInfo.hashTableReader)->getHashTable())->releaseResources();
00742             for (int i = 0; i < writerList.size();i ++) {
00743                 writerList[i]->allocateResources();
00744             }
00745         }
00746 
00747         /*
00748          * Done with tuples in the hash table. Next tuples will come from a
00749          * partition (stream or disk).
00750          */
00751         partInfo.partitionMemory = false;
00752 
00753         /*
00754          * The tuple for which hash table full is detected is not in the hash
00755          * table yet.
00756          */
00757         inputTuple = partInfo.buildTuple;
00758     }
00759 
00760     for (;;) {
00761         /*
00762          * Note that partInfo.buildTuple is an unconsumed tuple from the
00763          * reader. So when first time in this loop, isTupleConsumptionPending
00764          * returns true.
00765          */
00766         if (!reader->isTupleConsumptionPending()) {
00767             if (reader->getState() == EXECBUF_EOS) {
00768                 if (curInputIndex == getProbeInput()) {
00769                     /*
00770                      * Done with partitioning the 0th input.
00771                      * The current plan is completely partitioned.
00772                      */
00773                     return PartitionEndOfData;
00774                 } else {
00775                     curInputIndex = getProbeInput();
00776                     otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00777                     reader->close();
00778                     reader = &partInfo.probeReader;
00779                     inputTupleDesc = reader->getTupleDesc();
00780                     inputTuple.compute(inputTupleDesc);
00781                     continue;
00782                 }
00783             }
00784 
00785             if (!reader->demandData()) {
00786                 if (partitionLevel == 0) {
00787                     /*
00788                      * Reading from a stream: indicate underflow to producer.
00789                      */
00790                     return PartitionUnderflow;
00791                 } else {
00792                     if (curInputIndex == getProbeInput()) {
00793                         /*
00794                          * Done with partitioning the 0th input.
00795                          * The current plan is completely partitioned.
00796                          */
00797                         return PartitionEndOfData;
00798                     } else {
00799                         curInputIndex = getProbeInput();
00800                         reader->close();
00801                         reader = &partInfo.probeReader;
00802                         inputTupleDesc = reader->getTupleDesc();
00803                         inputTuple.compute(inputTupleDesc);
00804                         continue;
00805                     }
00806                 }
00807             }
00808 
00809             reader->unmarshalTuple(inputTuple);
00810         }
00811 
00812         writeToPartition = false;
00813 
00814         hashKey = hashGen.hash(
00815             inputTuple,
00816             hashInfo.keyProj[curInputIndex],
00817             hashInfo.isKeyColVarChar[curInputIndex]);
00818 
00819         childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00820 
00821         nextHashKey = hashGenNext.hash(
00822             inputTuple,
00823             hashInfo.keyProj[curInputIndex],
00824             hashInfo.isKeyColVarChar[curInputIndex]);
00825 
00826         statIndex = nextHashKey % LhxSubPartCount;
00827 
00828         if (!isAggregate) {
00829             if (hashInfo.useJoinFilter[curInputIndex]) {
00830                 /*
00831                  * Use input filter if there exists one
00832                  */
00833                 if (isBuildChildPart(childPartIndex)) {
00834                     /*
00835                      * Build input.  Note top level build input does not have
00836                      * input filter.  Use joinfilter from the probe input of
00837                      * the previous level.
00838                      */
00839                     if (partitionLevel == 0) {
00840                         writeToPartition = true;
00841                     } else {
00842                         prevHashKey =
00843                             hashGenPrev.hash(
00844                                 inputTuple,
00845                                 hashInfo.keyProj[curInputIndex],
00846                                 hashInfo.isKeyColVarChar[curInputIndex]);
00847                         if (joinFilter &&
00848                             joinFilter->test(prevHashKey % filterSize)) {
00849                             writeToPartition = true;
00850                         } else {
00851                             filteredRowCountList[childPartIndex]++;
00852                         }
00853                     }
00854                 } else {
00855                     /*
00856                      * Probe input.
00857                      * Use join filter from build input of the same
00858                      * partitioning level.
00859                      */
00860                     if (joinFilterList[getBuildChildPart(childPartIndex)] &&
00861                         joinFilterList[getBuildChildPart(childPartIndex)]->
00862                         test(hashKey % filterSize)) {
00863                         writeToPartition = true;
00864                     } else {
00865                         filteredRowCountList[childPartIndex]++;
00866                     }
00867                 }
00868             } else {
00869                 /*
00870                  * Not using filter.
00871                  */
00872                 writeToPartition = true;
00873             }
00874 
00875             if (writeToPartition) {
00876                 writerList[childPartIndex]->marshalTuple(inputTuple);
00877                 curSubPartStat = subPartStatList[childPartIndex];
00878                 curSubPartStat[statIndex]++;
00879 
00880                 /*
00881                  * Set output filter to be used by the other input.
00882                  * Note: if the filter is used on the next level, "the other
00883                  * input" could be the same if join sides are switched.
00884                  */
00885                 if (!joinFilterList[childPartIndex]) {
00886                     /*
00887                      * Filter not allocated yet.
00888                      */
00889                     joinFilterList[childPartIndex].reset(
00890                         new dynamic_bitset<>(filterSize));
00891                 }
00892                 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00893             }
00894         } else {
00895             writerList[childPartIndex]->aggAndMarshalTuple(inputTuple);
00896             (subPartStatList[childPartIndex])[statIndex]++;
00897         }
00898 
00899         reader->consumeTuple();
00900     }
00901 }

void LhxPlan::createChildren ( LhxHashInfo const &  hashInfo,
bool  enableSubPartStat 
)

Partition this plan and create child plan.

This is used in testing only.

Definition at line 903 of file LhxPartition.cpp.

References LhxPartitionReader::close(), close(), TupleData::compute(), LhxPartitionReader::consumeTuple(), LhxPartitionReader::demandData(), EXECBUF_EOS, firstChildPlan, LhxPartitionReader::getState(), LhxHashGenerator::hash(), LhxHashGenerator::init(), LhxHashInfo::inputDesc, LhxHashInfo::isKeyColVarChar, LhxPartitionReader::isTupleConsumptionPending(), LhxHashInfo::keyProj, LhxChildPartCount, LhxPartitionWriter::marshalTuple(), LhxPartitionWriter::open(), LhxPartitionReader::open(), partitionLevel, partitions, and LhxPartitionReader::unmarshalTuple().

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxHashTableTest::testInsert().

00906 {
00907     LhxHashGenerator hashGen;
00908     hashGen.init(partitionLevel + 1);
00909 
00910     uint numInputs = hashInfo.inputDesc.size();
00911 
00912     vector<SharedLhxPartition> destPartitionList(LhxChildPartCount * numInputs);
00913 
00914     LhxPartitionReader reader;
00915     LhxPartitionWriter writerList[LhxChildPartCount];
00916     uint childNum, i, j;
00917     TupleData outputTuple;
00918 
00919     /*
00920      * Generate partitions for each input.
00921      */
00922     for (j = 0; j < numInputs; j ++) {
00923         reader.open(partitions[j], hashInfo);
00924         outputTuple.compute(hashInfo.inputDesc[j]);
00925 
00926         for (i = 0; i < LhxChildPartCount; i ++) {
00927             uint index = j * LhxChildPartCount + i;
00928             destPartitionList[index].reset(
00929                 new LhxPartition(partitions[j]->pExecStream));
00930             destPartitionList[index]->inputIndex = j;
00931             writerList[i].open(destPartitionList[index], hashInfo);
00932         }
00933 
00934         for (;;) {
00935             if (!reader.isTupleConsumptionPending()) {
00936                 if (reader.getState() == EXECBUF_EOS) {
00937                     /*
00938                      * The current plan is completely partitioned.
00939                      */
00940                     break;
00941                 }
00942                 if (!reader.demandData()) {
00943                     break;
00944                 }
00945                 reader.unmarshalTuple(outputTuple);
00946             }
00947 
00948             childNum =
00949                 hashGen.hash(
00950                     outputTuple,
00951                     hashInfo.keyProj[j],
00952                     hashInfo.isKeyColVarChar[j]) % LhxChildPartCount;
00953 
00954             writerList[childNum].marshalTuple(outputTuple);
00955             reader.consumeTuple();
00956         }
00957 
00958         for (i = 0; i < LhxChildPartCount; i ++) {
00959             writerList[i].close();
00960         }
00961 
00962         /*
00963          * Partial aggregate hash tables used inside the writers(one HT
00964          * for each writer) share the same scratch buffer space.
00965          * Release the buffer pages used by these hash tables at the end,
00966          * after all writers have been closed(so that there will be no more
00967          * scratch page alloc calls).
00968          */
00969         for (i = 0; i < LhxChildPartCount; i ++) {
00970             writerList[i].releaseResources();
00971         }
00972 
00973         reader.close();
00974     }
00975 
00976     /*
00977      * Create child plans consisting of one partition from each input.
00978      */
00979     for (i = 0; i < LhxChildPartCount; i ++) {
00980         SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
00981         vector<SharedLhxPartition> partitionList;
00982         partitionList.push_back(destPartitionList[i]);
00983         partitionList.push_back(destPartitionList[i + LhxChildPartCount]);
00984 
00985         newChildPlan->init(
00986             WeakLhxPlan(shared_from_this()),
00987             partitionLevel + 1,
00988             partitionList,
00989             enableSubPartStat);
00990 
00991         newChildPlan->addSibling(firstChildPlan);
00992         firstChildPlan = newChildPlan;
00993     }
00994 }

void LhxPlan::createChildren ( LhxPartitionInfo & partInfo,
bool  enableSubPartStat,
bool  enableSwing 
)

Create the child plans (LhxChildPartCount of them) from the partitions provided via partInfo.

Definition at line 996 of file LhxPartition.cpp.

References LhxPartitionInfo::destPartitionList, LhxPartitionInfo::filteredRowCountList, firstChildPlan, getProbeChildPart(), LhxPartitionInfo::joinFilterList, LhxChildPartCount, LhxPartitionInfo::numInputs, partitionLevel, and LhxPartitionInfo::subPartStatList.

01000 {
          /*
           * Builds LhxChildPartCount child plans from pre-generated
           * partitions. For child i and input j, the partition, the
           * sub-partition stats and the filtered row count are all read from
           * slot (i + LhxChildPartCount * j) of the matching partInfo list.
           */
01001     uint i, j;
01002 
01003     for (i = 0; i < LhxChildPartCount; i ++) {
01004         SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
01005         vector<SharedLhxPartition> partitionList;
01006         vector<shared_array<uint> > subPartStats;
01007         VectorOfUint filteredRows;
01008         for (j = 0; j < partInfo.numInputs; j ++) {
01009             partitionList.push_back(
01010                 partInfo.destPartitionList[i + LhxChildPartCount * j]);
01011             subPartStats.push_back(
01012                 partInfo.subPartStatList[i + LhxChildPartCount * j]);
01013             filteredRows.push_back(
01014                 partInfo.filteredRowCountList[i + LhxChildPartCount * j]);
01015         }
              /*
               * The child sits one partition level below this plan and is
               * handed the join filter stored at index getProbeChildPart(i).
               */
01016         newChildPlan->init(
01017             WeakLhxPlan(shared_from_this()),
01018             partitionLevel + 1,
01019             partitionList,
01020             subPartStats,
01021             partInfo.joinFilterList[getProbeChildPart(i)],
01022             filteredRows,
01023             enableSubPartStat,
01024             enableSwing);
01025 
              /*
               * The new child becomes the head of the child list, with the
               * previous head passed to addSibling(); the child created last
               * is therefore the one returned as firstChildPlan.
               */
01026         newChildPlan->addSibling(firstChildPlan);
01027         firstChildPlan = newChildPlan;
01028     }
          /*
           * Empty the destination partition and join filter lists held by
           * partInfo. subPartStatList and filteredRowCountList are not
           * cleared here.
           */
01029     partInfo.destPartitionList.clear();
01030     partInfo.joinFilterList.clear();
01031 }

uint LhxPlan::getPartitionLevel (  )  [inline]

Get the partition level of this plan.

Definition at line 505 of file LhxPartition.h.

References partitionLevel.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxJoinExecStream::open().

00506 {
          /* Plain accessor: returns the partitionLevel member as stored. */
00507     return partitionLevel;
00508 }

SharedLhxPartition LhxPlan::getBuildPartition (  )  [inline]

Get the build-side partition, i.e. the partition at index getBuildInput().

Definition at line 525 of file LhxPartition.h.

References getBuildInput(), and partitions.

Referenced by LhxJoinExecStream::execute(), and LhxJoinExecStream::open().

00526 {
          /*
           * The build side's input index comes from getBuildInput(); it is
           * used directly as an index into partitions, with no range check
           * in this function.
           */
00527     return partitions[getBuildInput()];
00528 }

SharedLhxPartition LhxPlan::getProbePartition (  )  [inline]

Definition at line 520 of file LhxPartition.h.

References getProbeInput(), and partitions.

Referenced by LhxJoinExecStream::execute().

00521 {
          /*
           * The probe side's input index comes from getProbeInput(); it is
           * used directly as an index into partitions, with no range check
           * in this function.
           */
00522     return partitions[getProbeInput()];
00523 }

SharedLhxPartition LhxPlan::getPartition ( uint  inputIndex  )  [inline]

Definition at line 530 of file LhxPartition.h.

References partitions.

Referenced by LhxAggExecStream::execute(), LhxAggExecStream::open(), and LhxHashTableTest::testInsert().

00531 {
          /*
           * Returns the partition stored at position inputIndex. The index
           * is not range-checked here.
           */
00532     return partitions[inputIndex];
00533 }

uint LhxPlan::getBuildInput (  )  [inline]

Definition at line 515 of file LhxPartition.h.

References joinSideToInputMap, and partitions.

Referenced by LhxJoinExecStream::execute(), getBuildChildPart(), getBuildPartition(), isBuildChildPart(), mapSubPartToChild(), LhxJoinExecStream::open(), LhxJoinExecStream::returnBuildInner(), and LhxJoinExecStream::returnBuildOuter().

00516 {
          /*
           * The build input is the one mapped from the last join side, i.e.
           * entry (partitions.size() - 1) of joinSideToInputMap.
           * NOTE(review): if partitions were empty the unsigned subtraction
           * would wrap around -- confirm callers only use this after init().
           */
00517     return joinSideToInputMap[partitions.size() - 1];
00518 }

uint LhxPlan::getProbeInput (  )  [inline]

Definition at line 510 of file LhxPartition.h.

References joinSideToInputMap.

Referenced by LhxJoinExecStream::execute(), generatePartitions(), getProbeChildPart(), getProbePartition(), isProbeChildPart(), mapSubPartToChild(), LhxJoinExecStream::returnProbeInner(), and LhxJoinExecStream::returnProbeOuter().

00511 {
          /* The probe input is the one mapped from join side 0. */
00512     return joinSideToInputMap[0];
00513 }

uint LhxPlan::getJoinSide ( uint  inputIndex  )  [inline]

Definition at line 535 of file LhxPartition.h.

References joinSideToInputMap, and partitions.

Referenced by toString().

00536 {
          /*
           * Linear search for the join side whose mapped input is inputIndex.
           * Returns that join side, or partitions.size() when no join side
           * maps to inputIndex.
           *
           * The bound on i is tested before joinSideToInputMap[i] is read, so
           * the map is never indexed at partitions.size() or beyond when the
           * search fails.
           */
00537     uint i = 0;
00538     while ((i < partitions.size())
00539         && (joinSideToInputMap[i] != inputIndex))
00540     {
00541         i ++;
00542     }
00543 
00544     return i;
00545 }

SharedLhxPlan LhxPlan::getFirstChild (  )  [inline]

Get the first child plan.

Definition at line 500 of file LhxPartition.h.

References firstChildPlan.

Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().

00501 {
          /*
           * Returns the firstChildPlan member by value; other methods of this
           * class test it for emptiness, so callers should expect it may be
           * unset.
           */
00502     return firstChildPlan;
00503 }

LhxPlan * LhxPlan::getFirstLeaf (  ) 

Get first leaf plan in dfs order.

Definition at line 1033 of file LhxPartition.cpp.

References firstChildPlan.

Referenced by LhxHashTableTest::testInsert().

01034 {
          /*
           * A plan without a first child is itself a leaf. Otherwise keep
           * descending through the first child until such a plan is reached.
           */
01035     if (!firstChildPlan) {
01036         return this;
01037     } else {
01038         return firstChildPlan->getFirstLeaf();
01039     }
01040 }

LhxPlan * LhxPlan::getNextLeaf (  ) 

Get next leaf plan in dfs order.

Definition at line 1042 of file LhxPartition.cpp.

References parentPlan, and siblingPlan.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), and LhxHashTableTest::testInsert().

01043 {
          /*
           * If this plan has a sibling, the next leaf is the first leaf of
           * the sibling's subtree.
           */
01044     if (siblingPlan) {
01045         return siblingPlan->getFirstLeaf();
01046     } else {
              /*
               * No sibling left at this level: go up to the parent, which is
               * held through a weak reference and must be locked first, and
               * continue the search from there.
               */
01047         WeakLhxPlan parent = this->parentPlan;
01048         SharedLhxPlan shared_parent = parent.lock();
01049 
01050         if (shared_parent) {
01051             return shared_parent->getNextLeaf();
01052         } else {
                  /* No parent could be locked: there is no further leaf. */
01053             return NULL;
01054         }
01055     }
01056 }

void LhxPlan::close (  ) 

Close the plan tree.

Release any resource pointed to from this plan tree.

Definition at line 1058 of file LhxPartition.cpp.

References firstChildPlan, partitions, and siblingPlan.

Referenced by createChildren().

01059 {
          /*
           * Recursive close: first the subtree under the first child, then
           * the sibling chain, and finally this plan's own partitions.
           */
01060     if (firstChildPlan) {
01061         firstChildPlan->close();
01062     }
01063 
01064     if (siblingPlan) {
01065         siblingPlan->close();
01066     }
01067 
          /*
           * Close the segment stream of every partition that has one; empty
           * partition slots and partitions without a segStream are skipped.
           */
01068     for (uint i = 0; i < partitions.size(); i ++) {
01069         if (partitions[i] && partitions[i]->segStream) {
01070             partitions[i]->segStream->close();
01071         }
01072     }
01073 }

string LhxPlan::toString (  ) 

Print the content of the plan tree rooted at this plan.

Returns:
the string representation of this plan tree.

Definition at line 1075 of file LhxPartition.cpp.

References childPartSize, filteredRowCount, firstChildPlan, getJoinSide(), inputSize, joinFilter, LhxChildPartCount, LhxSubPartCount, parentPlan, partitionLevel, partitions, siblingPlan, and subPartToChildMap.

Referenced by LhxJoinExecStream::execute(), and LhxAggExecStream::execute().

01076 {
01077     ostringstream planTrace;
01078 
          /*
           * Header: this plan's address, its level, and the addresses of its
           * parent (empty if the weak reference cannot be locked), first
           * child and sibling.
           */
01079     planTrace << "\n"
01080               << "[Plan : addr       = " << this                 << "]\n"
01081               << "[       level      = " << partitionLevel       << "]\n"
01082               << "[       parent     = " << parentPlan.lock().get()  << "]\n"
01083               << "[       firstChild = " << firstChildPlan.get() << "]\n"
01084               << "[       sibling    = " << siblingPlan.get()    << "]\n";
01085 
01086     /*
01087      * Print out the join filter address (if set), then the partitions.
01088      */
01089     planTrace << "[joinFilter = ";
01090     if (joinFilter) {
01091         planTrace << joinFilter.get();
01092     }
01093     planTrace << "]\n";
01094 
          /*
           * One entry per partition. getJoinSide() yields partitions.size()
           * when the input index is not found in joinSideToInputMap.
           * NOTE(review): partitions[i] is dereferenced without the null
           * check that close() performs, and filteredRowCount / inputSize
           * are indexed unconditionally -- confirm init() always sets them.
           */
01095     for (uint i = 0; i < partitions.size(); i ++) {
01096         planTrace << "[Partition(" << i << ")]\n"
01097                   << "[       inputIndex     = " << partitions[i]->inputIndex << "]\n"
01098                   << "[       join side      = " << getJoinSide(partitions[i]->inputIndex) << "]\n"
01099                   << "[       filteredRows   = " << filteredRowCount[i] << "]\n"
01100                   << "[       inputSize      = " << inputSize[i] << "]\n";
01101         planTrace << "[       childPartSize  = ";
              /*
               * Child partition sizes are printed only when an entry exists
               * for this partition and its array is set.
               */
01102         if (childPartSize.size() > i) {
01103             shared_array<uint> oneChildPartSize = childPartSize[i];
01104             if (oneChildPartSize) {
01105                 for (uint j = 0; j < LhxChildPartCount; j ++) {
01106                     planTrace << oneChildPartSize[j] << " ";
01107                 }
01108             }
01109         }
01110         planTrace << "]\n";
01111     }
01112 
          /*
           * Sub-partition to child mapping: LhxSubPartCount entries, printed
           * only when the map is set.
           */
01113     planTrace << "[subPartToChildMap = ";
01114     if (subPartToChildMap) {
01115         for (uint i = 0; i < LhxSubPartCount; i ++) {
01116             planTrace << subPartToChildMap[i] << " ";
01117         }
01118     }
01119     planTrace << "]\n";
01120 
          /*
           * Append the trace of every child by walking the sibling chain
           * starting at the first child; each child's toString() covers its
           * own subtree.
           */
01121     SharedLhxPlan childPlan = firstChildPlan;
01122 
01123     while (childPlan) {
01124         planTrace << childPlan->toString();
01125         childPlan = childPlan->siblingPlan;
01126     }
01127 
01128     return planTrace.str();
01129 }


Member Data Documentation

uint LhxPlan::partitionLevel [private]

Definition at line 294 of file LhxPartition.h.

Referenced by createChildren(), generatePartitions(), getPartitionLevel(), init(), and toString().

vector<SharedLhxPartition> LhxPlan::partitions [private]

Definition at line 295 of file LhxPartition.h.

Referenced by close(), createChildren(), getBuildInput(), getBuildPartition(), getJoinSide(), getPartition(), getProbePartition(), init(), mapSubPartToChild(), and toString().

shared_array<uint> LhxPlan::joinSideToInputMap [private]

Definition at line 296 of file LhxPartition.h.

Referenced by getBuildInput(), getJoinSide(), getProbeInput(), and init().

shared_ptr<dynamic_bitset<> > LhxPlan::joinFilter [private]

Definition at line 298 of file LhxPartition.h.

Referenced by generatePartitions(), init(), and toString().

shared_array<uint> LhxPlan::filteredRowCount [private]

Definition at line 299 of file LhxPartition.h.

Referenced by init(), and toString().

shared_array<uint> LhxPlan::subPartToChildMap [private]

Definition at line 306 of file LhxPartition.h.

Referenced by calculateChildIndex(), init(), mapSubPartToChild(), and toString().

vector<shared_array<uint> > LhxPlan::childPartSize [private]

Definition at line 307 of file LhxPartition.h.

Referenced by mapSubPartToChild(), and toString().

shared_array<uint> LhxPlan::inputSize [private]

Definition at line 309 of file LhxPartition.h.

Referenced by init(), and toString().

WeakLhxPlan LhxPlan::parentPlan [private]

Definition at line 322 of file LhxPartition.h.

Referenced by getNextLeaf(), init(), and toString().

SharedLhxPlan LhxPlan::firstChildPlan [private]

Definition at line 323 of file LhxPartition.h.

Referenced by close(), createChildren(), getFirstChild(), getFirstLeaf(), and toString().

SharedLhxPlan LhxPlan::siblingPlan [private]

Definition at line 324 of file LhxPartition.h.

Referenced by addSibling(), close(), getNextLeaf(), and toString().

const uint LhxPlan::LhxSubPartCount = 16 [static]

Definition at line 358 of file LhxPartition.h.

Referenced by calculateChildIndex(), generatePartitions(), init(), mapSubPartToChild(), LhxPartitionInfo::open(), and toString().

const uint LhxPlan::LhxChildPartCount = 3 [static]

Definition at line 359 of file LhxPartition.h.

Referenced by calculateChildIndex(), createChildren(), getBuildChildPart(), getProbeChildPart(), LhxAggExecStream::getResourceRequirements(), LhxPartitionInfo::init(), isBuildChildPart(), isProbeChildPart(), mapSubPartToChild(), LhxPartitionInfo::open(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), LhxHashTableTest::testInsert(), and toString().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:39 2009 for Fennel by  doxygen 1.5.1