00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef Fennel_LhxJoinExecStream_Included
00024 #define Fennel_LhxJoinExecStream_Included
00025
00026 #include "fennel/exec/ConfluenceExecStream.h"
00027 #include "fennel/hashexe/LhxHashBase.h"
00028 #include "fennel/hashexe/LhxHashTable.h"
00029 #include "fennel/hashexe/LhxPartition.h"
00030
00031 using namespace boost;
00032
00033 FENNEL_BEGIN_NAMESPACE
00034
00042 struct LhxJoinExecStreamParams : public ConfluenceExecStreamParams
00043 {
00047 SharedSegment pTempSegment;
00048
00052 bool enableSubPartStat;
00053
00058 uint forcePartitionLevel;
00059
00060
00061
00062
00069 RecordNum cndKeys;
00070
00074 RecordNum numRows;
00075
00079 bool leftInner;
00080
00084 bool leftOuter;
00085
00089 bool rightInner;
00090
00094 bool rightOuter;
00095
00099 TupleProjection leftKeyProj;
00100
00104 TupleProjection rightKeyProj;
00105
00109 TupleProjection filterNullKeyProj;
00110
00115 TupleProjection outputProj;
00116
00125 bool setopDistinct;
00126 bool setopAll;
00127
00131 bool enableJoinFilter;
00132
00136 bool enableSwing;
00137 };
00138
00139 class FENNEL_HASHEXE_EXPORT LhxJoinExecStream
00140 : public ConfluenceExecStream
00141 {
00142
00143
00144
00145 enum LhxDefaultJoinInputIndex {
00146 DefaultProbeInputIndex = 0, DefaultBuildInputIndex = 1
00147 };
00148
00149 enum LhxJoinState {
00150 ForcePartitionBuild, Build, Probe,
00151 ProduceBuild, ProducePending,
00152 Partition, CreateChildPlan, GetNextPlan, Done
00153 };
00154
00158 shared_array<TupleData> inputTuple;
00159 shared_array<uint> inputTupleSize;
00160
00164 TupleData outputTuple;
00165
00169 uint numTuplesProduced;
00170
00174 LhxHashInfo hashInfo;
00175
00179 LhxHashTable hashTable;
00180 LhxHashTableReader hashTableReader;
00181
00186 BlockNum numBlocksHashTable;
00187
00191 uint numMiscCacheBlocks;
00192
00193
00194
00195
00196 bool isTopPlan;
00197 SharedLhxPlan rootPlan;
00198 LhxPlan *curPlan;
00199
00204 LhxPartitionInfo partInfo;
00205
00209 SharedLhxPartition buildPart;
00210 SharedLhxPartition probePart;
00211
00215 LhxPartitionReader buildReader;
00216 LhxPartitionReader probeReader;
00217
00221 bool enableSubPartStat;
00222
00226 bool enableSwing;
00227
00232 uint forcePartitionLevel;
00233
00234
00235
00236
00237 LhxJoinState joinState;
00238
00242 vector<LhxJoinState> nextState;
00243
00244
00245
00246
00247 shared_ptr<dynamic_bitset<> > joinType;
00248
00256 bool regularJoin;
00257 bool setopDistinct;
00258 bool setopAll;
00259
00263 virtual void closeImpl();
00264
00265
00266
00267
00268 void setJoinType(LhxJoinExecStreamParams const ¶ms);
00269
00270
00271
00272
00273 void setHashInfo(LhxJoinExecStreamParams const ¶ms);
00274
00275
00276
00277
00278
00279 inline bool returnProbeInner(LhxPlan *curPlan = NULL);
00280
00281
00282
00283
00284 inline bool returnBuildInner(LhxPlan *curPlan = NULL);
00285
00286
00287
00288
00289 inline bool returnProbeOuter(LhxPlan *curPlan = NULL);
00290
00291
00292
00293
00294 inline bool returnBuildOuter(LhxPlan *curPlan = NULL);
00295
00296
00297
00298
00299 inline bool returnInner(LhxPlan *curPlan = NULL);
00300
00301
00302
00303
00304 inline bool returnProbe(LhxPlan *curPlan = NULL);
00305
00306
00307
00308
00309 inline bool returnBuild(LhxPlan *curPlan = NULL);
00310
00311 public:
00312
00313
00314
00315 virtual void prepare(LhxJoinExecStreamParams const ¶ms);
00316
00317 virtual void open(bool restart);
00318
00319 virtual ExecStreamResult execute(ExecStreamQuantum const &quantum);
00320
00321 virtual void getResourceRequirements(
00322 ExecStreamResourceQuantity &minQuantity,
00323 ExecStreamResourceQuantity &optQuantity,
00324 ExecStreamResourceSettingType &optType);
00325
00326 virtual void setResourceAllocation(
00327 ExecStreamResourceQuantity &quantity);
00328 };
00329
00330 inline bool LhxJoinExecStream::returnProbeInner(LhxPlan *curPlan)
00331 {
00332 uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput();
00333 return joinType->test(probeInput * 2 + 0);
00334 }
00335
00336 inline bool LhxJoinExecStream::returnBuildInner(LhxPlan *curPlan)
00337 {
00338 uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput();
00339 return joinType->test(buildInput * 2 + 0);
00340 }
00341
00342 inline bool LhxJoinExecStream::returnProbeOuter(LhxPlan *curPlan)
00343 {
00344 uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput();
00345 return joinType->test(probeInput * 2 + 1);
00346 }
00347
00348 inline bool LhxJoinExecStream::returnBuildOuter(LhxPlan *curPlan)
00349 {
00350 uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput();
00351 return joinType->test(buildInput * 2 + 1);
00352 }
00353
00354 inline bool LhxJoinExecStream::returnInner(LhxPlan *curPlan)
00355 {
00356 return (returnProbeInner(curPlan) && returnBuildInner(curPlan));
00357 }
00358
00359 inline bool LhxJoinExecStream::returnProbe(LhxPlan *curPlan)
00360 {
00361 return (returnProbeInner(curPlan) || returnProbeOuter(curPlan));
00362 }
00363
00364 inline bool LhxJoinExecStream::returnBuild(LhxPlan *curPlan)
00365 {
00366 return (returnBuildInner(curPlan) || returnBuildOuter(curPlan));
00367 }
00368
00369 FENNEL_END_NAMESPACE
00370
00371 #endif
00372
00373