LhxPartition.h

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/hashexe/LhxPartition.h#4 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #ifndef Fennel_LhxPartition_Included
00024 #define Fennel_LhxPartition_Included
00025 
00026 #include "fennel/tuple/TupleData.h"
00027 #include "fennel/tuple/TupleDescriptor.h"
00028 #include "fennel/tuple/TupleAccessor.h"
00029 #include "fennel/tuple/TuplePrinter.h"
00030 #include "fennel/segment/SegInputStream.h"
00031 #include "fennel/segment/SegOutputStream.h"
00032 #include "fennel/segment/SegStreamAllocation.h"
00033 #include "fennel/hashexe/LhxHashBase.h"
00034 #include "fennel/hashexe/LhxHashTable.h"
00035 #include "fennel/exec/ExecStreamBufAccessor.h"
00036 #include "fennel/exec/ExecStream.h"
00037 #include "fennel/exec/AggComputer.h"
00038 #include <boost/dynamic_bitset.hpp>
00039 #include <boost/scoped_array.hpp>
00040 #include <boost/shared_array.hpp>
00041 #include <boost/enable_shared_from_this.hpp>
00042 
00043 using namespace std;
00044 using namespace boost;
00045 
00046 FENNEL_BEGIN_NAMESPACE
00047 
00054 struct LhxPartition
00055 {
00056     /*
00057      * The seg stream pair (input/output) associated with this partition.
00058      */
00059     SharedSegStreamAllocation segStream;
00060 
00061     /*
00062      * which input does data in this partition come from
00063      * 0 for the original probe input
00064      * 1 for the original build input
00065      */
00066     uint   inputIndex;
00067 
00072     ExecStream *pExecStream;
00073 
00074     explicit LhxPartition(ExecStream *pExecStreamInit);
00075 };
00076 
00077 class FENNEL_HASHEXE_EXPORT LhxPartitionWriter
00078 {
00082     SharedLhxPartition destPartition;
00083 
00087     SharedSegOutputStream pSegOutputStream;
00088 
00092     TupleAccessor tupleAccessor;
00093 
00094     bool isAggregate;
00095 
00101     LhxHashTable hashTable;
00102     LhxHashTableReader hashTableReader;
00103     TupleData partialAggTuple;
00104 
00105 public:
00106     void open(
00107         SharedLhxPartition destPartitionInit,
00108         LhxHashInfo const &hashInfo);
00109 
00110     void open(
00111         SharedLhxPartition destPartitionInit,
00112         LhxHashInfo &hashInfo,
00113         AggComputerList *aggList,
00114         uint numWriterCachePages);
00115 
00116     inline void allocateResources();
00117     inline void releaseResources();
00118     void marshalTuple(TupleData const &inputTuple);
00119     void aggAndMarshalTuple(TupleData const &inputTuple);
00120     void close();
00121 };
00122 
00123 class FENNEL_HASHEXE_EXPORT LhxPartitionReader
00124 {
00128     SharedLhxPartition srcPartition;
00129 
00133     SharedSegInputStream pSegInputStream;
00134 
00138     TupleAccessor tupleAccessor;
00139 
00143     uint tupleStorageLength;
00144 
00145     bool srcIsInputStream;
00146     ExecStreamBufState bufState;
00147     TupleDescriptor outputTupleDesc;
00148 
00154     SharedExecStreamBufAccessor streamBufAccessor;
00155 
00156 public:
00157     void open(
00158         SharedLhxPartition srcPartition,
00159         LhxHashInfo const &hashInfo);
00160 
00161     bool isTupleConsumptionPending();
00162     bool demandData();
00163     void unmarshalTuple(TupleData &outputTuple);
00164     void consumeTuple();
00165     void close();
00166     inline ExecStreamBufState getState() const;
00167     inline TupleDescriptor const &getTupleDesc() const;
00168     inline SharedLhxPartition getSourcePartition() const;
00169 };
00170 
// REVIEW jvs 26-Aug-2006:  Fennel convention for enum names is
// all uppercase with underscores.  Enums declared at top level
// need prefix, e.g. LHX_PARTITION_UNDERFLOW, since we don't use
// namespaces below the fennel level.

/**
 * Result type of LhxPlan::generatePartitions().
 *
 * NOTE(review): PartitionUnderflow presumably means "more input is needed
 * before partitioning can continue" and PartitionEndOfData "all input has
 * been partitioned"; confirm in LhxPartition.cpp.
 */
enum LhxPartitionState
{
    PartitionUnderflow = 0,
    PartitionEndOfData = 1
};
00179 
00180 struct LhxPartitionInfo
00181 {
00185     LhxHashTableReader *hashTableReader;
00186 
00197     LhxPartitionReader probeReader;
00198     LhxPartitionReader *reader;
00199 
00203     TupleData buildTuple;
00204 
00205     // REVIEW jvs 26-Aug-2006:  see comment in LhxHashBase regarding
00206     // one vector of structs vs many vectors of scalars; but may not
00207     // apply here.
00208 
00220     vector<SharedLhxPartitionWriter> writerList;
00221     vector<SharedLhxPartition> destPartitionList;
00222     // REVIEW jvs 26-Aug-2006:  typedef dynamic_bitset<> LhxJoinBloomFilter
00223     // would be nice, plus SharedLhxJoinBloomFilter
00224     vector<shared_ptr<dynamic_bitset<> > > joinFilterList;
00225     shared_array<uint> filteredRowCountList;
00226 
00227     /*
00228      * Stats associated with the subpartitions. One set of subpartitions for
00229      * each input partition.
00230      * subPartStatList.size() == numInputs * LhxPlan::LhxChildPartCount
00231      * and each item in the list contains LhxPlan::LhxSubPartCount elements.
00232      */
00233     vector<shared_array<uint> > subPartStatList;
00234 
00235     uint numInputs;
00236     uint curInputIndex;
00237 
00238     LhxHashInfo *hashInfo;
00239 
00240     /*
00241      * True if the tuples currently being partitioned come from
00242      * memory (i.e. from the hash table).
00243      */
00244     bool partitionMemory;
00245 
00246     LhxPartitionInfo()
00247     {
00248         reader = NULL;
00249         hashTableReader = NULL;
00250     }
00251 
00252     // REVIEW jvs 25-Aug-2006:  Unless input parameter can be NULL,
00253     // make it a reference instead of a pointer.  Same is true elsewhere.
00257     void init(LhxHashInfo *hashInfoInit);
00258 
00266     void open(
00267         LhxHashTableReader *hashTableReaderInit,
00268         LhxPartitionReader *buildReader,
00269         TupleData &buildTuple,
00270         SharedLhxPartition probePartition,
00271         uint buildInputIndex);
00272 
00279     void open(
00280         LhxHashTableReader *hashTableReaderInit,
00281         LhxPartitionReader *buildReader,
00282         TupleData &buildTuple,
00283         AggComputerList *aggList);
00284 
00288     void close();
00289 };
00290 
00291 class FENNEL_HASHEXE_EXPORT LhxPlan
00292     : public enable_shared_from_this<LhxPlan>
00293 {
00294     uint partitionLevel;
00295     vector<SharedLhxPartition> partitions;
00296     shared_array<uint> joinSideToInputMap;
00297 
00298     shared_ptr<dynamic_bitset<> > joinFilter;
00299     shared_array<uint> filteredRowCount;
00300 
00301     /*
00302      * Map sub partitions to child partitions, based on hash key.
00303      * The mapping algorithm tries to put similar amount of data to each child
00304      * partition.
00305      */
00306     shared_array<uint> subPartToChildMap;
00307     vector<shared_array<uint> > childPartSize;
00308 
00309     shared_array<uint> inputSize;
00310 
00311     /*
00312      * Plan linkage.
00313      *
00314      * Parent plan is a weak pointer to avoid cycles in shared pointer
00315      * reference counting.
00316      * http://www.boost.org/libs/smart_ptr/weak_ptr.htm
00317      *
00318      * To enable linking back via shared_ptr to this LhxPlan object,
00319      * LhxPlan uses enable_shared_from_this as base class.
00320      * http://www.boost.org/libs/smart_ptr/enable_shared_from_this.html
00321      */
00322     WeakLhxPlan parentPlan;
00323     SharedLhxPlan firstChildPlan;
00324     SharedLhxPlan siblingPlan;
00325 
00329     inline void addSibling(SharedLhxPlan siblingPlan);
00330 
00336     void mapSubPartToChild(vector<shared_array<uint> > &subPartStats);
00337 
00342     uint calculateChildIndex(uint hashKey, uint curInputIndex);
00343 
00344     inline bool isBuildChildPart(uint childPartIndex);
00345 
00346     inline bool isProbeChildPart(uint childPartIndex);
00347 
00348     inline uint getBuildChildPart(uint childPartIndex);
00349 
00350     inline uint getProbeChildPart(uint childPartIndex);
00351 
00352 public:
00353     /*
00354      * Maximum number of subpartitions a non-leaf partition has. Subpartitions
00355      * are packed into child partitions so that child partitions are of
00356      * similar size.
00357      */
00358     static const uint LhxSubPartCount = 16;
00359     static const uint LhxChildPartCount = 3;
00360 
00364     void init(
00365         WeakLhxPlan parentPlanInit,
00366         uint partitionLevelInit,
00367         vector<SharedLhxPartition> &partitionsInit,
00368         bool enableSubPartStat);
00369 
00373     void init(
00374         WeakLhxPlan parentPlanInit,
00375         uint partitionLevelInit,
00376         vector<SharedLhxPartition> &partitionsInit,
00377         vector<shared_array<uint> > &subPartStats,
00378         shared_ptr<dynamic_bitset<> > filterInit,
00379         VectorOfUint &filteredRowsInit,
00380         bool enableSubPartStat,
00381         bool enableSwing);
00382 
00383 
00387     LhxPartitionState generatePartitions(
00388         LhxHashInfo const &hashInfo,
00389         LhxPartitionInfo &partInfo);
00390 
00395     void createChildren(LhxHashInfo const &hashInfo, bool enableSubPartStat);
00396 
00400     void createChildren(
00401         LhxPartitionInfo &partInfo,
00402         bool enableSubPartStat,
00403         bool enableSwing);
00404 
00408     inline uint getPartitionLevel();
00409 
00413     inline SharedLhxPartition getBuildPartition();
00414     inline SharedLhxPartition getProbePartition();
00415     inline SharedLhxPartition getPartition(uint inputIndex);
00416 
00417     /*
00418      * Get the input index corresponding to the probe or the build side of the
00419      * join for this plan.
00420      */
00421     inline uint getBuildInput();
00422     inline uint getProbeInput();
00423 
00424     /*
00425      * Get the join side corresponding to the input index.
00426      * 0 : probe side
00427      * 1 : build side
00428      */
00429     inline uint getJoinSide(uint inputIndex);
00430 
00434     inline SharedLhxPlan getFirstChild();
00435 
00439     LhxPlan *getFirstLeaf();
00440 
00444     LhxPlan *getNextLeaf();
00445 
00450     void close();
00451 
00457     string toString();
00458 };
00459 
00460 inline LhxPartition::LhxPartition(ExecStream *pExecStreamInit)
00461 {
00462     pExecStream = pExecStreamInit;
00463 }
00464 
00465 inline ExecStreamBufState LhxPartitionReader::getState() const
00466 {
00467     if (srcIsInputStream) {
00468         return streamBufAccessor->getState();
00469     } else {
00470         return bufState;
00471     }
00472 }
00473 
00474 inline SharedLhxPartition LhxPartitionReader::getSourcePartition() const
00475 {
00476     return srcPartition;
00477 }
00478 
00479 inline TupleDescriptor const &LhxPartitionReader::getTupleDesc() const
00480 {
00481     return outputTupleDesc;
00482 }
00483 
00484 inline void LhxPartitionWriter::allocateResources()
00485 {
00486     bool status = hashTable.allocateResources();
00487     assert(status);
00488 }
00489 
00490 inline void LhxPartitionWriter::releaseResources()
00491 {
00492     hashTable.releaseResources();
00493 }
00494 
00495 inline void LhxPlan::addSibling(SharedLhxPlan siblingPlanInit)
00496 {
00497     siblingPlan = siblingPlanInit;
00498 }
00499 
00500 inline SharedLhxPlan LhxPlan::getFirstChild()
00501 {
00502     return firstChildPlan;
00503 }
00504 
00505 inline uint LhxPlan::getPartitionLevel()
00506 {
00507     return partitionLevel;
00508 }
00509 
00510 inline uint LhxPlan::getProbeInput()
00511 {
00512     return joinSideToInputMap[0];
00513 }
00514 
00515 inline uint LhxPlan::getBuildInput()
00516 {
00517     return joinSideToInputMap[partitions.size() - 1];
00518 }
00519 
00520 inline SharedLhxPartition LhxPlan::getProbePartition()
00521 {
00522     return partitions[getProbeInput()];
00523 }
00524 
00525 inline SharedLhxPartition LhxPlan::getBuildPartition()
00526 {
00527     return partitions[getBuildInput()];
00528 }
00529 
00530 inline SharedLhxPartition LhxPlan::getPartition(uint inputIndex)
00531 {
00532     return partitions[inputIndex];
00533 }
00534 
00535 inline uint LhxPlan::getJoinSide(uint inputIndex)
00536 {
00537     uint i = 0;
00538     while ((joinSideToInputMap[i] != inputIndex)
00539         && (i < partitions.size()))
00540     {
00541         i ++;
00542     }
00543 
00544     return i;
00545 }
00546 
00547 inline bool LhxPlan::isBuildChildPart(uint childPartIndex)
00548 {
00549     return ((childPartIndex / LhxChildPartCount) == getBuildInput());
00550 }
00551 
00552 inline bool LhxPlan::isProbeChildPart(uint childPartIndex)
00553 {
00554     return ((childPartIndex / LhxChildPartCount) == getProbeInput());
00555 }
00556 
00557 inline uint LhxPlan::getBuildChildPart(uint childPartIndex)
00558 {
00559     return ((childPartIndex % LhxChildPartCount) +
00560             getBuildInput() * LhxChildPartCount);
00561 }
00562 
00563 inline uint LhxPlan::getProbeChildPart(uint childPartIndex)
00564 {
00565     return ((childPartIndex % LhxChildPartCount) +
00566             getProbeInput() * LhxChildPartCount);
00567 }
00568 
00569 FENNEL_END_NAMESPACE
00570 
00571 #endif
00572 
00573 // End LhxPartition.h

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1