LhxPartition.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/hashexe/LhxPartition.h"
00025 #include "fennel/hashexe/LhxHashGenerator.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $");
00029 
00030 void LhxPartitionWriter::open(
00031     SharedLhxPartition destPartitionInit,
00032     LhxHashInfo const &hashInfo)
00033 {
00034     destPartition = destPartitionInit;
00035 
00036     tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00037 
00038     pSegOutputStream = SegOutputStream::newSegOutputStream(
00039         hashInfo.externalSegmentAccessor);
00040     destPartition->segStream =
00041         SegStreamAllocation::newSegStreamAllocation();
00042     destPartition->segStream->beginWrite(pSegOutputStream);
00043 
00044     isAggregate = false;
00045 }
00046 
00047 void LhxPartitionWriter::open(
00048     SharedLhxPartition destPartitionInit,
00049     LhxHashInfo &hashInfo,
00050     AggComputerList *aggList,
00051     uint numWriterCachePages)
00052 {
00053     destPartition = destPartitionInit;
00054     tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00055 
00056     pSegOutputStream = SegOutputStream::newSegOutputStream(
00057         hashInfo.externalSegmentAccessor);
00058     destPartition->segStream =
00059         SegStreamAllocation::newSegStreamAllocation();
00060     destPartition->segStream->beginWrite(pSegOutputStream);
00061 
00062     isAggregate = true;
00063     /*
00064      * Any partition level is fine since the data is hash partitioned already.
00065      */
00066     uint partitionLevel = 0;
00067     uint savedNumCachePages = hashInfo.numCachePages;
00068     hashInfo.numCachePages = numWriterCachePages;
00069 
00070     hashTable.init(
00071         partitionLevel,
00072         hashInfo,
00073         aggList,
00074         destPartition->inputIndex);
00075     hashTableReader.init(&hashTable, hashInfo, destPartition->inputIndex);
00076 
00077     hashInfo.numCachePages = savedNumCachePages;
00078 
00079     uint cndKeys = hashInfo.cndKeys.back();
00080     uint usablePageSize =
00081         (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize();
00082 
00083     hashTable.calculateNumSlots(cndKeys, usablePageSize, numWriterCachePages);
00084 
00085     partialAggTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]);
00086 }
00087 
00088 void LhxPartitionWriter::close()
00089 {
00090     if (isAggregate) {
00091         /*
00092          * Write out the remaining partial aggregates in the local hash table.
00093          */
00094         while (hashTableReader.getNext(partialAggTuple)) {
00095             uint tupleStorageLength =
00096                 tupleAccessor.getByteCount(partialAggTuple);
00097             PBuffer pDestBuf =
00098                 pSegOutputStream->getWritePointer(tupleStorageLength);
00099             tupleAccessor.marshal(partialAggTuple, pDestBuf);
00100             pSegOutputStream->consumeWritePointer(tupleStorageLength);
00101         }
00102     }
00103     destPartition->segStream->endWrite();
00104     pSegOutputStream->close();
00105 }
00106 
00107 void LhxPartitionWriter::marshalTuple(TupleData const &inputTuple)
00108 {
00109     uint tupleStorageLength = tupleAccessor.getByteCount(inputTuple);
00110     PBuffer pDestBuf = pSegOutputStream->getWritePointer(tupleStorageLength);
00111     tupleAccessor.marshal(inputTuple, pDestBuf);
00112     pSegOutputStream->consumeWritePointer(tupleStorageLength);
00113 }
00114 
00115 void LhxPartitionWriter::aggAndMarshalTuple(TupleData const &inputTuple)
00116 {
00117     while (!hashTable.addTuple(inputTuple)) {
00118         /*
00119          * Write everything out to partition.
00120          */
00121         while (hashTableReader.getNext(partialAggTuple)) {
00122             uint tupleStorageLength =
00123                 tupleAccessor.getByteCount(partialAggTuple);
00124             PBuffer pDestBuf =
00125                 pSegOutputStream->getWritePointer(tupleStorageLength);
00126             tupleAccessor.marshal(partialAggTuple, pDestBuf);
00127             pSegOutputStream->consumeWritePointer(tupleStorageLength);
00128         }
00129         bool reuse = true;
00130         /*
00131          * hash table size remain unchanged
00132          */
00133         bool status = hashTable.allocateResources(reuse);
00134         assert(status);
00135         /*
00136          * Reset the reader and bind it to no particular key (will read the
00137          * whole hash table).
00138          */
00139         hashTableReader.bindKey(NULL);
00140     }
00141 }
00142 
00143 void LhxPartitionReader::open(
00144     SharedLhxPartition srcPartitionInit,
00145     LhxHashInfo const &hashInfo)
00146 {
00147     bufState = EXECBUF_NONEMPTY;
00148     srcPartition = srcPartitionInit;
00149 
00150     if (!srcPartition->segStream) {
00151         /*
00152          * source has never been written to, which means the source
00153          * is not from the disk but from input stream.
00154          */
00155         srcIsInputStream = true;
00156     } else {
00157         srcIsInputStream = false;
00158     }
00159 
00160     if (srcIsInputStream) {
00161         streamBufAccessor =
00162             hashInfo.streamBufAccessor[srcPartition->inputIndex];
00163         outputTupleDesc = streamBufAccessor->getTupleDesc();
00164     } else {
00165         outputTupleDesc = hashInfo.inputDesc[srcPartition->inputIndex];
00166         tupleAccessor.compute(outputTupleDesc);
00167         tupleAccessor.resetCurrentTupleBuf();
00168 
00169         /*
00170          * Since reader now gets input stream from the partition,
00171          * this inputStream will delete content that is read.
00172          * This also means each partition can only be read once.
00173          */
00174         pSegInputStream = srcPartition->segStream->getInputStream();
00175         pSegInputStream->startPrefetch();
00176     }
00177 }
00178 
00179 void LhxPartitionReader::close()
00180 {
00181     if (srcIsInputStream) {
00182         /*
00183          * Do nothing if reading from stream.
00184          */
00185     } else {
00186         pSegInputStream->close();
00187     }
00188 }
00189 
00190 void LhxPartitionReader::unmarshalTuple(TupleData &outputTuple)
00191 {
00192     if (srcIsInputStream) {
00193         /*
00194          * Read from stream.
00195          */
00196         streamBufAccessor->unmarshalTuple(outputTuple);
00197     } else {
00198         tupleAccessor.unmarshal(outputTuple);
00199     }
00200 }
00201 
00202 void LhxPartitionReader::consumeTuple()
00203 {
00204     if (srcIsInputStream) {
00205         streamBufAccessor->consumeTuple();
00206     } else {
00207         tupleAccessor.resetCurrentTupleBuf();
00208         pSegInputStream->consumeReadPointer(tupleStorageLength);
00209     }
00210 }
00211 
00212 bool LhxPartitionReader::isTupleConsumptionPending()
00213 {
00214     if (srcIsInputStream) {
00215         return streamBufAccessor->isTupleConsumptionPending();
00216     } else {
00217         if (tupleAccessor.getCurrentTupleBuf()) {
00218             return true;
00219         } else {
00220             return false;
00221         }
00222     }
00223 }
00224 
00225 bool LhxPartitionReader::demandData()
00226 {
00227     if (srcIsInputStream) {
00228         return streamBufAccessor->demandData();
00229     } else {
00230         /*
00231          * Read from disk.
00232          */
00233         uint bytesReadable = 0;
00234         PConstBuffer pSrcBuf =
00235             pSegInputStream->getReadPointer(1, &bytesReadable);
00236 
00237         /*
00238          * If readable data does not fill a tuple, it means the segment stream
00239          * has reached EOD.
00240          */
00241         if (!pSrcBuf) {
00242             bufState = EXECBUF_EOS;
00243             return false;
00244         } else {
00245             tupleStorageLength = tupleAccessor.getBufferByteCount(pSrcBuf);
00246             assert(bytesReadable >= tupleStorageLength);
00247             if (bytesReadable == tupleStorageLength) {
00248                 // We're processing the last tuple in a buffer,
00249                 // so now is a good time to check for abort.
00250                 if (srcPartition->pExecStream) {
00251                     srcPartition->pExecStream->checkAbort();
00252                 }
00253             }
00254             tupleAccessor.setCurrentTupleBuf(pSrcBuf);
00255             return true;
00256         }
00257     }
00258 }
00259 
00260 void LhxPartitionInfo::init(LhxHashInfo *hashInfoInit)
00261 {
00262     hashInfo = hashInfoInit;
00263     numInputs = (hashInfo->inputDesc).size();
00264 
00265     writerList.clear();
00266     /*
00267      * writerList is shared across iterations of partitioning. At each
00268      * iteration, writerList is initialized with new destination partitions.
00269      */
00270     for (uint i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00271         writerList.push_back(
00272             SharedLhxPartitionWriter(new LhxPartitionWriter()));
00273     }
00274 
00275     filteredRowCountList.reset(
00276         new uint[numInputs * LhxPlan::LhxChildPartCount]);
00277 }
00278 
00279 void LhxPartitionInfo::open(
00280     LhxHashTableReader *hashTableReaderInit,
00281     LhxPartitionReader *buildReader,
00282     TupleData &buildTupleInit,
00283     SharedLhxPartition probePartition,
00284     uint buildInputIndex)
00285 {
00286     uint i, j;
00287 
00288     probeReader.open(probePartition, *hashInfo);
00289 
00290     /*
00291      * Start partitioning from the build side.
00292      */
00293     curInputIndex = buildInputIndex;
00294 
00295     hashTableReader = hashTableReaderInit;
00296     /*
00297      * Reset the reader and bind it to no particular key(will read the
00298      * whole hash table).
00299      */
00300     hashTableReader->bindKey(NULL);
00301 
00302     /*
00303      * The build reader is from the LhxJoinExecStream and is already open.
00304      */
00305     reader = buildReader;
00306 
00307     /*
00308      * The inflight (between disk partition and hash table) build tuple.
00309      */
00310     buildTuple = buildTupleInit;
00311 
00312     destPartitionList.clear();
00313     subPartStatList.clear();
00314     joinFilterList.clear();
00315     shared_array<uint> curSubPartStat;
00316 
00317     for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00318         destPartitionList.push_back(
00319             SharedLhxPartition(new LhxPartition(probePartition->pExecStream)));
00320         destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount);
00321         subPartStatList.push_back(
00322             shared_array<uint>(new uint[LhxPlan::LhxSubPartCount]));
00323 
00324         curSubPartStat = subPartStatList[i];
00325 
00326         for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) {
00327             curSubPartStat[j] = 0;
00328         }
00329 
00330         /*
00331          * One filter for each partition
00332          * filter bitmap is only allocated when a partition is written to
00333          */
00334         joinFilterList.push_back(shared_ptr<dynamic_bitset<> >());
00335 
00336         writerList[i]->open(destPartitionList[i], *hashInfo);
00337         filteredRowCountList[i] = 0;
00338     }
00339 
00340     /*
00341      * Tuples will come from memory (hash table) first.
00342      */
00343     partitionMemory = true;
00344 }
00345 
00346 void LhxPartitionInfo::open(
00347     LhxHashTableReader *hashTableReaderInit,
00348     LhxPartitionReader *buildReader,
00349     TupleData &buildTupleInit,
00350     AggComputerList *aggList)
00351 {
00352     uint i, j;
00353     assert (numInputs == 1);
00354     uint buildIndex = numInputs - 1;
00355 
00356     curInputIndex = buildIndex;
00357 
00358     hashTableReader = hashTableReaderInit;
00359     /*
00360      * Reset the reader and bind it to no particular key (will read the
00361      * whole hash table).
00362      */
00363     hashTableReader->bindKey(NULL);
00364 
00365     // REVIEW jvs 26-Aug-2006:  no join if doing agg...
00366     /*
00367      * The build reader is from the LhxJoinExecStream and is already open.
00368      */
00369     reader = buildReader;
00370 
00371     /*
00372      * The inflight(between disk partition and hash table) build tuple.
00373      */
00374     buildTuple = buildTupleInit;
00375 
00376     /*
00377      * The hash table contained in the writer should only use up to the child's
00378      * share of scratch buffer.
00379      */
00380     uint numWriterCachePages =
00381         hashInfo->numCachePages / LhxPlan::LhxChildPartCount;
00382 
00383     destPartitionList.clear();
00384     subPartStatList.clear();
00385     joinFilterList.clear();
00386     shared_array<uint> curSubPartStat;
00387 
00388     for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) {
00389         destPartitionList.push_back(
00390             SharedLhxPartition(
00391                 new LhxPartition(reader->getSourcePartition()->pExecStream)));
00392         destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount);
00393         subPartStatList.push_back(
00394             shared_array<uint>(new uint[LhxPlan::LhxSubPartCount]));
00395 
00396         curSubPartStat = subPartStatList[i];
00397 
00398         for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) {
00399             curSubPartStat[j] = 0;
00400         }
00401 
00402         // REVIEW jvs 26-Aug-2006:  no join if doing agg...
00403         /*
00404          * One filter for each partition
00405          * filter bitmap is only allocated when a partition is written to
00406          */
00407         joinFilterList.push_back(shared_ptr<dynamic_bitset<> >());
00408 
00409         writerList[i]->open(
00410             destPartitionList[i],
00411             *hashInfo,
00412             aggList,
00413             numWriterCachePages);
00414         filteredRowCountList[i] = 0;
00415     }
00416 
00417     /*
00418      * Tuples will come from memory(hash table) first.
00419      */
00420     partitionMemory = true;
00421 }
00422 
00423 void LhxPartitionInfo::close()
00424 {
00425     reader->close();
00426 
00427     uint numWriters = writerList.size();
00428 
00429     for (uint i = 0; i < numWriters; i ++) {
00430         writerList[i]->close();
00431     }
00432 
00433     /*
00434      * Partial aggregate hash tables used inside the writers(one HT
00435      * for each writer) share the same scratch buffer space.
00436      * Release the buffer pages used by these hash tables at the end,
00437      * after all writers have been closed(so that there will be no more
00438      * scratch page alloc calls).
00439      */
00440     for (uint i = 0; i < numWriters; i ++) {
00441         writerList[i]->releaseResources();
00442     }
00443 }
00444 
00445 void LhxPlan::init(
00446     WeakLhxPlan parentPlanInit,
00447     uint partitionLevelInit,
00448     vector<SharedLhxPartition> &partitionsInit,
00449     bool enableSubPartStat)
00450 {
00451     /*
00452      * No filter for this plan.
00453      */
00454     shared_ptr<dynamic_bitset<> > joinFilterInit =
00455         shared_ptr<dynamic_bitset<> >();
00456     vector<shared_array<uint> > subPartStatsInit;
00457     VectorOfUint filteredRows;
00458 
00459     for (uint i = 0; i < partitionsInit.size(); i ++) {
00460         subPartStatsInit.push_back(shared_array<uint>());
00461         filteredRows.push_back(0);
00462     }
00463 
00464     init(
00465         parentPlanInit, partitionLevelInit, partitionsInit,
00466         subPartStatsInit, joinFilterInit, filteredRows,
00467         enableSubPartStat, false);
00468 }
00469 
00470 void LhxPlan::init(
00471     WeakLhxPlan parentPlanInit,
00472     uint partitionLevelInit,
00473     vector<SharedLhxPartition> &partitionsInit,
00474     vector<shared_array<uint> > &subPartStats,
00475     shared_ptr<dynamic_bitset<> > joinFilterInit,
00476     VectorOfUint &filteredRowsInit,
00477     bool enableSubPartStat,
00478     bool enableSwing)
00479 {
00480     uint numInputs = partitionsInit.size();
00481 
00482     partitionLevel = partitionLevelInit;
00483     parentPlan   = parentPlanInit;
00484 
00485     /*
00486      * REVIEW(rchen 2006-08-08): if input sides swing, then the new build could
00487      * be the same as the probe input of the previous partition. This filter
00488      * will not filter any tuple as it tries to filter the very input the
00489      * filter is built on.
00490      */
00491     joinFilter = joinFilterInit;
00492 
00493     filteredRowCount.reset(new uint[numInputs]);
00494     inputSize.reset(new uint[numInputs]);
00495     joinSideToInputMap.reset(new uint[numInputs]);
00496     subPartToChildMap.reset();
00497 
00498     // REVIEW jvs 26-Aug-2006:  here "will be used" means once someone
00499     // gets around to true hybrid, right?
00500     /*
00501      * After support is added for repartitioning using stats (gathered during
00502      * previous round of partitioning), a bin-packing algorithm will be used to
00503      * keep in memory some subpartitions and output the rest to child
00504      * partitions of similar size.
00505      */
00506     for (int i = 0; i < numInputs; i ++) {
00507         partitions.push_back(partitionsInit[i]);
00508         filteredRowCount[i] = filteredRowsInit[i];
00509         joinSideToInputMap[i] = i;
00510 
00511         inputSize[i] = 0;
00512         shared_array<uint> inputSubPartStat = subPartStats[i];
00513 
00514         if (inputSubPartStat) {
00515             for (int k = 0; k < LhxSubPartCount; k ++) {
00516                 inputSize[i] += inputSubPartStat[k];
00517             }
00518         }
00519     }
00520 
00521     /*
00522      * Map join side to input, using partitions stats associated with each
00523      * input. The input with the smaller side witll be the build side for the
00524      * join: joinSide for the build will map to the index of this input.
00525      */
00526     if (enableSwing &&
00527         (numInputs == 2) && (inputSize[0] < inputSize[1])) {
00528         joinSideToInputMap[0] = 1;
00529         joinSideToInputMap[1] = 0;
00530     }
00531 
00532     subPartToChildMap.reset();
00533 
00534     if (enableSubPartStat) {
00535         /*
00536          * Use build side sub part stat to divide both inputs into child
00537          * partitions. Needs to be called after the join sides have been
00538          * assigned.
00539          */
00540         mapSubPartToChild(subPartStats);
00541     }
00542 }
00543 
00544 void LhxPlan::mapSubPartToChild(
00545     vector<shared_array<uint> > &subPartStats)
00546 {
00547     uint numInputs = partitions.size();
00548     uint buildIndex =  getBuildInput();
00549     shared_array<uint> buildSubPartStat = subPartStats[buildIndex];
00550 
00551     if (!buildSubPartStat) {
00552         return;
00553     }
00554 
00555     uint i, j, k;
00556 
00557     subPartToChildMap.reset(new uint[LhxSubPartCount]);
00558 
00559     for (i = 0; i < numInputs; i ++) {
00560         childPartSize.push_back(
00561             shared_array<uint>(new uint[LhxChildPartCount]));
00562     }
00563 
00564     shared_array<uint> buildChildPartSize = childPartSize[buildIndex];
00565 
00566     for (i = 0; i < LhxChildPartCount; i ++) {
00567         buildChildPartSize[i] = 0;
00568     }
00569 
00570     j = 0;
00571     for (i = 0; i < LhxSubPartCount; i ++) {
00572         buildChildPartSize[j] += buildSubPartStat[i];
00573         subPartToChildMap[i] = j;
00574 
00575         k = 1;
00576         while (
00577             (buildChildPartSize[j]
00578                 > buildChildPartSize[(j + k) % LhxChildPartCount])
00579             && k < LhxChildPartCount)
00580         {
00581             k ++;
00582         }
00583 
00584         if (k == LhxChildPartCount) {
00585             // If current child partition is bigger than all other child
00586             // partitions, move to the next child
00587             j = (j + 1) % LhxChildPartCount;
00588         }
00589     }
00590 
00591     /*
00592      * This is simply stats keeping for the probe side child partitions.
00593      */
00594     if (numInputs == 2) {
00595         uint probeIndex = getProbeInput();
00596         shared_array<uint> probeChildPartSize = childPartSize[probeIndex];
00597         shared_array<uint> probeSubPartStat = subPartStats[probeIndex];
00598 
00599         for (i = 0; i < LhxChildPartCount; i ++) {
00600             probeChildPartSize[i] = 0;
00601         }
00602 
00603         for (i = 0; i < LhxSubPartCount; i ++) {
00604             probeChildPartSize[subPartToChildMap[i]] += probeSubPartStat[i];
00605         }
00606     }
00607 }
00608 
00609 uint LhxPlan::calculateChildIndex(uint hashKey, uint curInputIndex)
00610 {
00611     if (subPartToChildMap) {
00612         return (subPartToChildMap[hashKey % LhxSubPartCount] +
00613             curInputIndex * LhxChildPartCount);
00614     } else {
00615         return (hashKey % LhxChildPartCount +
00616             curInputIndex * LhxChildPartCount);
00617     }
00618 }
00619 
00620 LhxPartitionState LhxPlan::generatePartitions(
00621     LhxHashInfo const &hashInfo,
00622     LhxPartitionInfo  &partInfo)
00623 {
00624     // REVIEW jvs 26-Aug-2006:  modulo on this is computed below; make
00625     // sure compiler is optimizing to bitmask, or do it by hand
00626     uint filterSize = 4096;
00627     bool isAggregate = (partInfo.numInputs == 1);
00628 
00629     LhxHashGenerator hashGenPrev;
00630     LhxHashGenerator hashGen;
00631     LhxHashGenerator hashGenNext;
00632 
00633     hashGenPrev.init(partitionLevel);
00634     hashGen.init(partitionLevel + 1);
00635     hashGenNext.init(partitionLevel + 2);
00636 
00637     LhxPartitionReader *&reader = partInfo.reader;
00638     vector<SharedLhxPartitionWriter> &writerList = partInfo.writerList;
00639     vector<shared_ptr<dynamic_bitset<> > > &joinFilterList =
00640         partInfo.joinFilterList;
00641     vector<shared_array<uint> > &subPartStatList = partInfo.subPartStatList;
00642     shared_array<uint> &filteredRowCountList = partInfo.filteredRowCountList;
00643 
00644     uint &curInputIndex = partInfo.curInputIndex;
00645     uint otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00646 
00647     TupleData inputTuple;
00648     TupleDescriptor inputTupleDesc = reader->getTupleDesc();
00649     inputTuple.compute(inputTupleDesc);
00650 
00651     uint prevHashKey;
00652     uint hashKey;
00653     uint nextHashKey;
00654 
00655     uint childPartIndex;
00656     bool writeToPartition;
00657 
00658     uint statIndex;
00659     shared_array<uint> curSubPartStat;
00660 
00661     /*
00662      * If partition source is from memory,
00663      * i.e. there's hash table to read from.
00664      */
00665     if (partInfo.partitionMemory) {
00666         TupleData hashTableTuple;
00667         TupleDescriptor hashTableTupleDesc = hashInfo.inputDesc[curInputIndex];
00668 
00669         hashTableTuple.compute(hashTableTupleDesc);
00670 
00671         while ((partInfo.hashTableReader)->getNext(hashTableTuple)) {
00672             writeToPartition = false;
00673 
00674             hashKey = hashGen.hash(
00675                 hashTableTuple,
00676                 hashInfo.keyProj[curInputIndex],
00677                 hashInfo.isKeyColVarChar[curInputIndex]);
00678 
00679             childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00680 
00681             if (hashInfo.useJoinFilter[curInputIndex]) {
00682                 /*
00683                  * Use input filter if there is one. Note top level build input
00684                  * does not have input filter.
00685                  */
00686                 if (partitionLevel == 0) {
00687                     writeToPartition = true;
00688                 } else {
00689                     prevHashKey =
00690                         hashGenPrev.hash(
00691                             hashTableTuple,
00692                             hashInfo.keyProj[curInputIndex],
00693                             hashInfo.isKeyColVarChar[curInputIndex]);
00694                     if (joinFilter &&
00695                         joinFilter->test(prevHashKey % filterSize)) {
00696                         writeToPartition = true;
00697                     } else {
00698                         filteredRowCountList[childPartIndex]++;
00699                     }
00700                 }
00701             } else {
00702                 /*
00703                  * Not using filter.
00704                  */
00705                 writeToPartition = true;
00706             }
00707 
00708             if (writeToPartition) {
00709                 writerList[childPartIndex]->marshalTuple(hashTableTuple);
00710 
00711                 nextHashKey = hashGenNext.hash(
00712                     hashTableTuple,
00713                     hashInfo.keyProj[curInputIndex],
00714                     hashInfo.isKeyColVarChar[curInputIndex]);
00715 
00716                 statIndex = nextHashKey % LhxSubPartCount;
00717                 curSubPartStat = subPartStatList[childPartIndex];
00718                 curSubPartStat[statIndex]++;
00719 
00720                 /*
00721                  * Set output filter for the other input.
00722                  * Note: if the filter is used on the next level, "the other
00723                  * input" could be the same if join sides are switched.
00724                  */
00725                 if (!joinFilterList[childPartIndex]) {
00726                     /*
00727                      * Filter not allocated yet.
00728                      */
00729                     joinFilterList[childPartIndex].reset(
00730                         new dynamic_bitset<>(filterSize));
00731                 }
00732                 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00733             }
00734         }
00735 
00736         if (isAggregate) {
00737             /*
00738              * release the agg exec stream hash table scratch pages
00739              * and initialize writer scratch pages.
00740              */
00741             ((partInfo.hashTableReader)->getHashTable())->releaseResources();
00742             for (int i = 0; i < writerList.size();i ++) {
00743                 writerList[i]->allocateResources();
00744             }
00745         }
00746 
00747         /*
00748          * Done with tuples in the hash table. Next tuples will come from a
00749          * partition (stream or disk).
00750          */
00751         partInfo.partitionMemory = false;
00752 
00753         /*
00754          * The tuple for which hash table full is detected is not in the hash
00755          * table yet.
00756          */
00757         inputTuple = partInfo.buildTuple;
00758     }
00759 
00760     for (;;) {
00761         /*
00762          * Note that partInfo.buildTuple is an unconsumed tuple from the
00763          * reader. So when first time in this loop, isTupleConsumptionPending
00764          * returns true.
00765          */
00766         if (!reader->isTupleConsumptionPending()) {
00767             if (reader->getState() == EXECBUF_EOS) {
00768                 if (curInputIndex == getProbeInput()) {
00769                     /*
00770                      * Done with partitioning the 0th input.
00771                      * The current plan is completely partitioned.
00772                      */
00773                     return PartitionEndOfData;
00774                 } else {
00775                     curInputIndex = getProbeInput();
00776                     otherInputIndex = partInfo.numInputs - curInputIndex - 1;
00777                     reader->close();
00778                     reader = &partInfo.probeReader;
00779                     inputTupleDesc = reader->getTupleDesc();
00780                     inputTuple.compute(inputTupleDesc);
00781                     continue;
00782                 }
00783             }
00784 
00785             if (!reader->demandData()) {
00786                 if (partitionLevel == 0) {
00787                     /*
00788                      * Reading from a stream: indicate underflow to producer.
00789                      */
00790                     return PartitionUnderflow;
00791                 } else {
00792                     if (curInputIndex == getProbeInput()) {
00793                         /*
00794                          * Done with partitioning the 0th input.
00795                          * The current plan is completely partitioned.
00796                          */
00797                         return PartitionEndOfData;
00798                     } else {
00799                         curInputIndex = getProbeInput();
00800                         reader->close();
00801                         reader = &partInfo.probeReader;
00802                         inputTupleDesc = reader->getTupleDesc();
00803                         inputTuple.compute(inputTupleDesc);
00804                         continue;
00805                     }
00806                 }
00807             }
00808 
00809             reader->unmarshalTuple(inputTuple);
00810         }
00811 
00812         writeToPartition = false;
00813 
00814         hashKey = hashGen.hash(
00815             inputTuple,
00816             hashInfo.keyProj[curInputIndex],
00817             hashInfo.isKeyColVarChar[curInputIndex]);
00818 
00819         childPartIndex = calculateChildIndex(hashKey, curInputIndex);
00820 
00821         nextHashKey = hashGenNext.hash(
00822             inputTuple,
00823             hashInfo.keyProj[curInputIndex],
00824             hashInfo.isKeyColVarChar[curInputIndex]);
00825 
00826         statIndex = nextHashKey % LhxSubPartCount;
00827 
00828         if (!isAggregate) {
00829             if (hashInfo.useJoinFilter[curInputIndex]) {
00830                 /*
00831                  * Use input filter if there exists one
00832                  */
00833                 if (isBuildChildPart(childPartIndex)) {
00834                     /*
00835                      * Build input.  Note top level build input does not have
00836                      * input filter.  Use joinfilter from the probe input of
00837                      * the previous level.
00838                      */
00839                     if (partitionLevel == 0) {
00840                         writeToPartition = true;
00841                     } else {
00842                         prevHashKey =
00843                             hashGenPrev.hash(
00844                                 inputTuple,
00845                                 hashInfo.keyProj[curInputIndex],
00846                                 hashInfo.isKeyColVarChar[curInputIndex]);
00847                         if (joinFilter &&
00848                             joinFilter->test(prevHashKey % filterSize)) {
00849                             writeToPartition = true;
00850                         } else {
00851                             filteredRowCountList[childPartIndex]++;
00852                         }
00853                     }
00854                 } else {
00855                     /*
00856                      * Probe input.
00857                      * Use join filter from build input of the same
00858                      * partitioning level.
00859                      */
00860                     if (joinFilterList[getBuildChildPart(childPartIndex)] &&
00861                         joinFilterList[getBuildChildPart(childPartIndex)]->
00862                         test(hashKey % filterSize)) {
00863                         writeToPartition = true;
00864                     } else {
00865                         filteredRowCountList[childPartIndex]++;
00866                     }
00867                 }
00868             } else {
00869                 /*
00870                  * Not using filter.
00871                  */
00872                 writeToPartition = true;
00873             }
00874 
00875             if (writeToPartition) {
00876                 writerList[childPartIndex]->marshalTuple(inputTuple);
00877                 curSubPartStat = subPartStatList[childPartIndex];
00878                 curSubPartStat[statIndex]++;
00879 
00880                 /*
00881                  * Set output filter to be used by the other input.
00882                  * Note: if the filter is used on the next level, "the other
00883                  * input" could be the same if join sides are switched.
00884                  */
00885                 if (!joinFilterList[childPartIndex]) {
00886                     /*
00887                      * Filter not allocated yet.
00888                      */
00889                     joinFilterList[childPartIndex].reset(
00890                         new dynamic_bitset<>(filterSize));
00891                 }
00892                 joinFilterList[childPartIndex]->set(hashKey % filterSize);
00893             }
00894         } else {
00895             writerList[childPartIndex]->aggAndMarshalTuple(inputTuple);
00896             (subPartStatList[childPartIndex])[statIndex]++;
00897         }
00898 
00899         reader->consumeTuple();
00900     }
00901 }
00902 
00903 void LhxPlan::createChildren(
00904     LhxHashInfo const &hashInfo,
00905     bool enableSubPartStat)
00906 {
00907     LhxHashGenerator hashGen;
00908     hashGen.init(partitionLevel + 1);
00909 
00910     uint numInputs = hashInfo.inputDesc.size();
00911 
00912     vector<SharedLhxPartition> destPartitionList(LhxChildPartCount * numInputs);
00913 
00914     LhxPartitionReader reader;
00915     LhxPartitionWriter writerList[LhxChildPartCount];
00916     uint childNum, i, j;
00917     TupleData outputTuple;
00918 
00919     /*
00920      * Generate partitions for each input.
00921      */
00922     for (j = 0; j < numInputs; j ++) {
00923         reader.open(partitions[j], hashInfo);
00924         outputTuple.compute(hashInfo.inputDesc[j]);
00925 
00926         for (i = 0; i < LhxChildPartCount; i ++) {
00927             uint index = j * LhxChildPartCount + i;
00928             destPartitionList[index].reset(
00929                 new LhxPartition(partitions[j]->pExecStream));
00930             destPartitionList[index]->inputIndex = j;
00931             writerList[i].open(destPartitionList[index], hashInfo);
00932         }
00933 
00934         for (;;) {
00935             if (!reader.isTupleConsumptionPending()) {
00936                 if (reader.getState() == EXECBUF_EOS) {
00937                     /*
00938                      * The current plan is completely partitioned.
00939                      */
00940                     break;
00941                 }
00942                 if (!reader.demandData()) {
00943                     break;
00944                 }
00945                 reader.unmarshalTuple(outputTuple);
00946             }
00947 
00948             childNum =
00949                 hashGen.hash(
00950                     outputTuple,
00951                     hashInfo.keyProj[j],
00952                     hashInfo.isKeyColVarChar[j]) % LhxChildPartCount;
00953 
00954             writerList[childNum].marshalTuple(outputTuple);
00955             reader.consumeTuple();
00956         }
00957 
00958         for (i = 0; i < LhxChildPartCount; i ++) {
00959             writerList[i].close();
00960         }
00961 
00962         /*
00963          * Partial aggregate hash tables used inside the writers(one HT
00964          * for each writer) share the same scratch buffer space.
00965          * Release the buffer pages used by these hash tables at the end,
00966          * after all writers have been closed(so that there will be no more
00967          * scratch page alloc calls).
00968          */
00969         for (i = 0; i < LhxChildPartCount; i ++) {
00970             writerList[i].releaseResources();
00971         }
00972 
00973         reader.close();
00974     }
00975 
00976     /*
00977      * Create child plans consisting of one partition from each input.
00978      */
00979     for (i = 0; i < LhxChildPartCount; i ++) {
00980         SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
00981         vector<SharedLhxPartition> partitionList;
00982         partitionList.push_back(destPartitionList[i]);
00983         partitionList.push_back(destPartitionList[i + LhxChildPartCount]);
00984 
00985         newChildPlan->init(
00986             WeakLhxPlan(shared_from_this()),
00987             partitionLevel + 1,
00988             partitionList,
00989             enableSubPartStat);
00990 
00991         newChildPlan->addSibling(firstChildPlan);
00992         firstChildPlan = newChildPlan;
00993     }
00994 }
00995 
00996 void LhxPlan::createChildren(
00997     LhxPartitionInfo &partInfo,
00998     bool enableSubPartStat,
00999     bool enableSwing)
01000 {
01001     uint i, j;
01002 
01003     for (i = 0; i < LhxChildPartCount; i ++) {
01004         SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan());
01005         vector<SharedLhxPartition> partitionList;
01006         vector<shared_array<uint> > subPartStats;
01007         VectorOfUint filteredRows;
01008         for (j = 0; j < partInfo.numInputs; j ++) {
01009             partitionList.push_back(
01010                 partInfo.destPartitionList[i + LhxChildPartCount * j]);
01011             subPartStats.push_back(
01012                 partInfo.subPartStatList[i + LhxChildPartCount * j]);
01013             filteredRows.push_back(
01014                 partInfo.filteredRowCountList[i + LhxChildPartCount * j]);
01015         }
01016         newChildPlan->init(
01017             WeakLhxPlan(shared_from_this()),
01018             partitionLevel + 1,
01019             partitionList,
01020             subPartStats,
01021             partInfo.joinFilterList[getProbeChildPart(i)],
01022             filteredRows,
01023             enableSubPartStat,
01024             enableSwing);
01025 
01026         newChildPlan->addSibling(firstChildPlan);
01027         firstChildPlan = newChildPlan;
01028     }
01029     partInfo.destPartitionList.clear();
01030     partInfo.joinFilterList.clear();
01031 }
01032 
01033 LhxPlan *LhxPlan::getFirstLeaf()
01034 {
01035     if (!firstChildPlan) {
01036         return this;
01037     } else {
01038         return firstChildPlan->getFirstLeaf();
01039     }
01040 }
01041 
01042 LhxPlan *LhxPlan::getNextLeaf()
01043 {
01044     if (siblingPlan) {
01045         return siblingPlan->getFirstLeaf();
01046     } else {
01047         WeakLhxPlan parent = this->parentPlan;
01048         SharedLhxPlan shared_parent = parent.lock();
01049 
01050         if (shared_parent) {
01051             return shared_parent->getNextLeaf();
01052         } else {
01053             return NULL;
01054         }
01055     }
01056 }
01057 
01058 void LhxPlan::close()
01059 {
01060     if (firstChildPlan) {
01061         firstChildPlan->close();
01062     }
01063 
01064     if (siblingPlan) {
01065         siblingPlan->close();
01066     }
01067 
01068     for (uint i = 0; i < partitions.size(); i ++) {
01069         if (partitions[i] && partitions[i]->segStream) {
01070             partitions[i]->segStream->close();
01071         }
01072     }
01073 }
01074 
01075 string LhxPlan::toString()
01076 {
01077     ostringstream planTrace;
01078 
01079     planTrace << "\n"
01080               << "[Plan : addr       = " << this                 << "]\n"
01081               << "[       level      = " << partitionLevel       << "]\n"
01082               << "[       parent     = " << parentPlan.lock().get()  << "]\n"
01083               << "[       firstChild = " << firstChildPlan.get() << "]\n"
01084               << "[       sibling    = " << siblingPlan.get()    << "]\n";
01085 
01086     /*
01087      * Print out the partitions.
01088      */
01089     planTrace << "[joinFilter = ";
01090     if (joinFilter) {
01091         planTrace << joinFilter.get();
01092     }
01093     planTrace << "]\n";
01094 
01095     for (uint i = 0; i < partitions.size(); i ++) {
01096         planTrace << "[Partition(" << i << ")]\n"
01097                   << "[       inputIndex     = " << partitions[i]->inputIndex << "]\n"
01098                   << "[       join side      = " << getJoinSide(partitions[i]->inputIndex) << "]\n"
01099                   << "[       filteredRows   = " << filteredRowCount[i] << "]\n"
01100                   << "[       inputSize      = " << inputSize[i] << "]\n";
01101         planTrace << "[       childPartSize  = ";
01102         if (childPartSize.size() > i) {
01103             shared_array<uint> oneChildPartSize = childPartSize[i];
01104             if (oneChildPartSize) {
01105                 for (uint j = 0; j < LhxChildPartCount; j ++) {
01106                     planTrace << oneChildPartSize[j] << " ";
01107                 }
01108             }
01109         }
01110         planTrace << "]\n";
01111     }
01112 
01113     planTrace << "[subPartToChildMap = ";
01114     if (subPartToChildMap) {
01115         for (uint i = 0; i < LhxSubPartCount; i ++) {
01116             planTrace << subPartToChildMap[i] << " ";
01117         }
01118     }
01119     planTrace << "]\n";
01120 
01121     SharedLhxPlan childPlan = firstChildPlan;
01122 
01123     while (childPlan) {
01124         planTrace << childPlan->toString();
01125         childPlan = childPlan->siblingPlan;
01126     }
01127 
01128     return planTrace.str();
01129 }
01130 
01131 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $");
01132 
01133 // End LhxPartition.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1