00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef Fennel_LhxPartition_Included
00024 #define Fennel_LhxPartition_Included
00025
00026 #include "fennel/tuple/TupleData.h"
00027 #include "fennel/tuple/TupleDescriptor.h"
00028 #include "fennel/tuple/TupleAccessor.h"
00029 #include "fennel/tuple/TuplePrinter.h"
00030 #include "fennel/segment/SegInputStream.h"
00031 #include "fennel/segment/SegOutputStream.h"
00032 #include "fennel/segment/SegStreamAllocation.h"
00033 #include "fennel/hashexe/LhxHashBase.h"
00034 #include "fennel/hashexe/LhxHashTable.h"
00035 #include "fennel/exec/ExecStreamBufAccessor.h"
00036 #include "fennel/exec/ExecStream.h"
00037 #include "fennel/exec/AggComputer.h"
00038 #include <boost/dynamic_bitset.hpp>
00039 #include <boost/scoped_array.hpp>
00040 #include <boost/shared_array.hpp>
00041 #include <boost/enable_shared_from_this.hpp>
00042
00043 using namespace std;
00044 using namespace boost;
00045
00046 FENNEL_BEGIN_NAMESPACE
00047
// One partition of an input to the Lhx hash operators. LhxPartitionWriter
// writes into a partition (destPartition), LhxPartitionReader reads from one
// (srcPartition), and LhxPlan holds one partition per input index.
00054 struct LhxPartition
00055 {
00056 // Segment stream allocation backing this partition's storage.
00057 // NOTE(review): presumably where the partition's tuples are spilled — confirm.
00058
00059 SharedSegStreamAllocation segStream;
00060
00061 // Index of the input this partition belongs to. The constructor does not
00062 // set it, so the creator must assign it before it is read.
00063
00064
00065
00066 uint inputIndex;
00067 // ExecStream this partition is associated with; set by the constructor.
00072 ExecStream *pExecStream;
00073
00074 explicit LhxPartition(ExecStream *pExecStreamInit); // binds pExecStream only
00075 };
00076
// Writes tuples into an LhxPartition. A second open() overload accepts an
// AggComputerList, and the class carries an LhxHashTable, so tuples can
// presumably be partially aggregated in memory before being written
// (aggAndMarshalTuple) — NOTE(review): confirm against the .cpp.
00077 class FENNEL_HASHEXE_EXPORT LhxPartitionWriter
00078 {
// Partition receiving the written tuples (presumably from open()'s
// destPartitionInit).
00082 SharedLhxPartition destPartition;
00083
// Segment output stream the marshalled tuples are presumably written to.
00087 SharedSegOutputStream pSegOutputStream;
00088
// Accessor used to marshal a TupleData into stream storage (presumably).
00092 TupleAccessor tupleAccessor;
00093
// NOTE(review): presumably true when opened via the aggregating overload
// of open() — confirm.
00094 bool isAggregate;
00095
// In-memory hash table, a reader over it, and a scratch tuple for partial
// aggregates. allocateResources()/releaseResources() delegate to hashTable.
00101 LhxHashTable hashTable;
00102 LhxHashTableReader hashTableReader;
00103 TupleData partialAggTuple;
00104
00105 public:
// Opens the writer on destPartitionInit using hashInfo (non-aggregating).
00106 void open(
00107 SharedLhxPartition destPartitionInit,
00108 LhxHashInfo const &hashInfo);
00109
// Opens the writer with an aggregate list and a cache-page budget for the
// writer's hash table.
00110 void open(
00111 SharedLhxPartition destPartitionInit,
00112 LhxHashInfo &hashInfo,
00113 AggComputerList *aggList,
00114 uint numWriterCachePages);
00115
// Allocate / release the hash table's resources (allocation is asserted).
00116 inline void allocateResources();
00117 inline void releaseResources();
// Writes one input tuple to the partition.
00118 void marshalTuple(TupleData const &inputTuple);
// Aggregates one input tuple and writes the result (presumably when the
// hash table fills) — NOTE(review): confirm.
00119 void aggAndMarshalTuple(TupleData const &inputTuple);
00120 void close();
00121 };
00122
// Reads tuples back out of an LhxPartition. The source can be an input
// stream (srcIsInputStream), in which case buffer state comes from
// streamBufAccessor; otherwise the reader tracks bufState itself.
00123 class FENNEL_HASHEXE_EXPORT LhxPartitionReader
00124 {
// Partition being read; returned by getSourcePartition().
00128 SharedLhxPartition srcPartition;
00129
// Segment input stream the partition is presumably read from.
00133 SharedSegInputStream pSegInputStream;
00134
// Accessor used to unmarshal stored tuples (presumably).
00138 TupleAccessor tupleAccessor;
00139
// NOTE(review): presumably the storage length of the tuple currently being
// consumed — confirm.
00143 uint tupleStorageLength;
00144
// When true, getState() reports streamBufAccessor's state; otherwise it
// reports bufState.
00145 bool srcIsInputStream;
// Locally tracked buffer state, used when srcIsInputStream is false.
00146 ExecStreamBufState bufState;
// Descriptor of the tuples produced; returned by getTupleDesc().
00147 TupleDescriptor outputTupleDesc;
00148
// Buffer accessor consulted by getState() when srcIsInputStream is true.
00154 SharedExecStreamBufAccessor streamBufAccessor;
00155
00156 public:
00157 void open(
00158 SharedLhxPartition srcPartition,
00159 LhxHashInfo const &hashInfo);
00160
00161 bool isTupleConsumptionPending();
00162 bool demandData();
00163 void unmarshalTuple(TupleData &outputTuple);
00164 void consumeTuple();
00165 void close();
00166 inline ExecStreamBufState getState() const;
00167 inline TupleDescriptor const &getTupleDesc() const;
00168 inline SharedLhxPartition getSourcePartition() const;
00169 };
00170
00171
00172
00173
00174
00175
/**
 * Result returned by LhxPlan::generatePartitions().
 *
 * NOTE(review): by name, PartitionUnderflow presumably means more input must
 * arrive before partitioning can continue, and PartitionEndOfData that all
 * input has been partitioned — confirm against the implementation.
 */
enum LhxPartitionState {
    PartitionUnderflow = 0,
    PartitionEndOfData = 1
};
00179
00180 struct LhxPartitionInfo
00181 {
00185 LhxHashTableReader *hashTableReader;
00186
00197 LhxPartitionReader probeReader;
00198 LhxPartitionReader *reader;
00199
00203 TupleData buildTuple;
00204
00205
00206
00207
00208
00220 vector<SharedLhxPartitionWriter> writerList;
00221 vector<SharedLhxPartition> destPartitionList;
00222
00223
00224 vector<shared_ptr<dynamic_bitset<> > > joinFilterList;
00225 shared_array<uint> filteredRowCountList;
00226
00227
00228
00229
00230
00231
00232
00233 vector<shared_array<uint> > subPartStatList;
00234
00235 uint numInputs;
00236 uint curInputIndex;
00237
00238 LhxHashInfo *hashInfo;
00239
00240
00241
00242
00243
00244 bool partitionMemory;
00245
00246 LhxPartitionInfo()
00247 {
00248 reader = NULL;
00249 hashTableReader = NULL;
00250 }
00251
00252
00253
00257 void init(LhxHashInfo *hashInfoInit);
00258
00266 void open(
00267 LhxHashTableReader *hashTableReaderInit,
00268 LhxPartitionReader *buildReader,
00269 TupleData &buildTuple,
00270 SharedLhxPartition probePartition,
00271 uint buildInputIndex);
00272
00279 void open(
00280 LhxHashTableReader *hashTableReaderInit,
00281 LhxPartitionReader *buildReader,
00282 TupleData &buildTuple,
00283 AggComputerList *aggList);
00284
00288 void close();
00289 };
00290
// A node in a tree of partitioning plans for the Lhx hash operators. Each
// plan holds one LhxPartition per input (indexed by input index, see
// getPartition()), an owning link to its first child and next sibling, and a
// non-owning link to its parent. The class derives from
// enable_shared_from_this, so instances are expected to be owned by
// shared_ptrs.
00291 class FENNEL_HASHEXE_EXPORT LhxPlan
00292 : public enable_shared_from_this<LhxPlan>
00293 {
// Partition level of this plan; set via init()'s partitionLevelInit and
// returned by getPartitionLevel().
00294 uint partitionLevel;
// One partition per input, indexed by input index.
00295 vector<SharedLhxPartition> partitions;
// Maps join side -> input index. Side 0 is the probe input and side
// partitions.size() - 1 the build input (see getProbeInput() and
// getBuildInput()); getJoinSide() performs the reverse lookup.
00296 shared_array<uint> joinSideToInputMap;
00297
// Join filter bitmap and per-input filtered-row counts.
// NOTE(review): presumably from init()'s filterInit / filteredRowsInit.
00298 shared_ptr<dynamic_bitset<> > joinFilter;
00299 shared_array<uint> filteredRowCount;
00300 // Sub-partition -> child-partition mapping and per-child partition sizes.
00301 // NOTE(review): presumably filled by mapSubPartToChild() from the
00302 // sub-partition statistics — confirm in the .cpp.
00303
00304
00305
00306 shared_array<uint> subPartToChildMap;
00307 vector<shared_array<uint> > childPartSize;
00308
// NOTE(review): presumably the size of each input at this level — confirm.
00309 shared_array<uint> inputSize;
00310 // Tree links. parentPlan (WeakLhxPlan, presumably a weak_ptr) does not own
00311 // the parent, which avoids an ownership cycle with the presumably owning
00312 // firstChildPlan / siblingPlan links. getFirstChild() returns
00313 // firstChildPlan; addSibling() sets siblingPlan.
00314
00315
00316
00317
00318
00319
00320
00321
00322 WeakLhxPlan parentPlan;
00323 SharedLhxPlan firstChildPlan;
00324 SharedLhxPlan siblingPlan;
00325
// Sets siblingPlan to the given plan.
00329 inline void addSibling(SharedLhxPlan siblingPlan);
00330
// Builds the sub-partition -> child mapping from per-input sub-partition
// statistics.
00336 void mapSubPartToChild(vector<shared_array<uint> > &subPartStats);
00337
// Child partition index for a row with hashKey from input curInputIndex.
// NOTE(review): inferred from the signature — confirm.
00342 uint calculateChildIndex(uint hashKey, uint curInputIndex);
00343
// Child partition indices are grouped LhxChildPartCount per input:
// childPartIndex / LhxChildPartCount is the owning input and
// childPartIndex % LhxChildPartCount the slot within that input. The
// is*ChildPart helpers test ownership by the build/probe input; the
// get*ChildPart helpers map a slot onto the build/probe input's group.
00344 inline bool isBuildChildPart(uint childPartIndex);
00345
00346 inline bool isProbeChildPart(uint childPartIndex);
00347
00348 inline uint getBuildChildPart(uint childPartIndex);
00349
00350 inline uint getProbeChildPart(uint childPartIndex);
00351
00352 public:
00353 // LhxSubPartCount: number of sub-partitions tracked for statistics.
00354 // NOTE(review): meaning inferred from the name — confirm.
00355 // LhxChildPartCount: number of child partitions per input in the
00356 // child-part index space used by the *ChildPart helpers.
00357
00358 static const uint LhxSubPartCount = 16;
00359 static const uint LhxChildPartCount = 3;
00360
// Initializes this plan with its parent, partition level and partitions.
00364 void init(
00365 WeakLhxPlan parentPlanInit,
00366 uint partitionLevelInit,
00367 vector<SharedLhxPartition> &partitionsInit,
00368 bool enableSubPartStat);
00369
// As the overload above, additionally taking sub-partition statistics, a
// join filter, filtered-row counts and the enableSwing flag.
00373 void init(
00374 WeakLhxPlan parentPlanInit,
00375 uint partitionLevelInit,
00376 vector<SharedLhxPartition> &partitionsInit,
00377 vector<shared_array<uint> > &subPartStats,
00378 shared_ptr<dynamic_bitset<> > filterInit,
00379 VectorOfUint &filteredRowsInit,
00380 bool enableSubPartStat,
00381 bool enableSwing);
00382
00383 // Partitions the inputs described by partInfo and reports the outcome.
00387 LhxPartitionState generatePartitions(
00388 LhxHashInfo const &hashInfo,
00389 LhxPartitionInfo &partInfo);
00390
// Creates the child plans of this plan.
00395 void createChildren(LhxHashInfo const &hashInfo, bool enableSubPartStat);
00396
// Creates the child plans of this plan using the state in partInfo.
00400 void createChildren(
00401 LhxPartitionInfo &partInfo,
00402 bool enableSubPartStat,
00403 bool enableSwing);
00404
// Returns partitionLevel.
00408 inline uint getPartitionLevel();
00409
// Partitions of the build input, the probe input, or a given input index.
00413 inline SharedLhxPartition getBuildPartition();
00414 inline SharedLhxPartition getProbePartition();
00415 inline SharedLhxPartition getPartition(uint inputIndex);
00416 // Input indices of the build side (last join side) and the probe side
00417 // (join side 0), read from joinSideToInputMap.
00418
00419
00420
00421 inline uint getBuildInput();
00422 inline uint getProbeInput();
00423 // Returns the join side whose entry in joinSideToInputMap equals
00424 // inputIndex (reverse of joinSideToInputMap).
00425
00426
00427
00428
00429 inline uint getJoinSide(uint inputIndex);
00430
// Returns firstChildPlan.
00434 inline SharedLhxPlan getFirstChild();
00435
// First leaf of the plan tree rooted here (presumably) — confirm.
00439 LhxPlan *getFirstLeaf();
00440
// Leaf following this one in the traversal (presumably) — confirm.
00444 LhxPlan *getNextLeaf();
00445
00450 void close();
00451
// Human-readable description of the plan.
00457 string toString();
00458 };
00459
00460 inline LhxPartition::LhxPartition(ExecStream *pExecStreamInit)
00461 {
00462 pExecStream = pExecStreamInit;
00463 }
00464
00465 inline ExecStreamBufState LhxPartitionReader::getState() const
00466 {
00467 if (srcIsInputStream) {
00468 return streamBufAccessor->getState();
00469 } else {
00470 return bufState;
00471 }
00472 }
00473
00474 inline SharedLhxPartition LhxPartitionReader::getSourcePartition() const
00475 {
00476 return srcPartition;
00477 }
00478
00479 inline TupleDescriptor const &LhxPartitionReader::getTupleDesc() const
00480 {
00481 return outputTupleDesc;
00482 }
00483
00484 inline void LhxPartitionWriter::allocateResources()
00485 {
00486 bool status = hashTable.allocateResources();
00487 assert(status);
00488 }
00489
00490 inline void LhxPartitionWriter::releaseResources()
00491 {
00492 hashTable.releaseResources();
00493 }
00494
00495 inline void LhxPlan::addSibling(SharedLhxPlan siblingPlanInit)
00496 {
00497 siblingPlan = siblingPlanInit;
00498 }
00499
00500 inline SharedLhxPlan LhxPlan::getFirstChild()
00501 {
00502 return firstChildPlan;
00503 }
00504
00505 inline uint LhxPlan::getPartitionLevel()
00506 {
00507 return partitionLevel;
00508 }
00509
00510 inline uint LhxPlan::getProbeInput()
00511 {
00512 return joinSideToInputMap[0];
00513 }
00514
00515 inline uint LhxPlan::getBuildInput()
00516 {
00517 return joinSideToInputMap[partitions.size() - 1];
00518 }
00519
00520 inline SharedLhxPartition LhxPlan::getProbePartition()
00521 {
00522 return partitions[getProbeInput()];
00523 }
00524
00525 inline SharedLhxPartition LhxPlan::getBuildPartition()
00526 {
00527 return partitions[getBuildInput()];
00528 }
00529
00530 inline SharedLhxPartition LhxPlan::getPartition(uint inputIndex)
00531 {
00532 return partitions[inputIndex];
00533 }
00534
00535 inline uint LhxPlan::getJoinSide(uint inputIndex)
00536 {
00537 uint i = 0;
00538 while ((joinSideToInputMap[i] != inputIndex)
00539 && (i < partitions.size()))
00540 {
00541 i ++;
00542 }
00543
00544 return i;
00545 }
00546
00547 inline bool LhxPlan::isBuildChildPart(uint childPartIndex)
00548 {
00549 return ((childPartIndex / LhxChildPartCount) == getBuildInput());
00550 }
00551
00552 inline bool LhxPlan::isProbeChildPart(uint childPartIndex)
00553 {
00554 return ((childPartIndex / LhxChildPartCount) == getProbeInput());
00555 }
00556
00557 inline uint LhxPlan::getBuildChildPart(uint childPartIndex)
00558 {
00559 return ((childPartIndex % LhxChildPartCount) +
00560 getBuildInput() * LhxChildPartCount);
00561 }
00562
00563 inline uint LhxPlan::getProbeChildPart(uint childPartIndex)
00564 {
00565 return ((childPartIndex % LhxChildPartCount) +
00566 getProbeInput() * LhxChildPartCount);
00567 }
00568
00569 FENNEL_END_NAMESPACE
00570
00571 #endif
00572
00573