LhxJoinExecStream Class Reference

#include <LhxJoinExecStream.h>

Inheritance diagram for LhxJoinExecStream:

ConfluenceExecStream, SingleOutputExecStream, ExecStream, ClosableObject, TraceSource, ErrorSource

Public Member Functions

virtual void prepare (LhxJoinExecStreamParams const &params)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void prepare (ConfluenceExecStreamParams const &params)
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any allocated resources.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Protected Attributes

std::vector< SharedExecStreamBufAccessor > inAccessors
SharedExecStreamBufAccessor pOutAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose

Private Types

enum  LhxDefaultJoinInputIndex { DefaultProbeInputIndex = 0, DefaultBuildInputIndex = 1 }
enum  LhxJoinState {
  ForcePartitionBuild, Build, Probe, ProduceBuild,
  ProducePending, Partition, CreateChildPlan, GetNextPlan,
  Done
}

Private Member Functions

virtual void closeImpl ()
 implement ExecStream
void setJoinType (LhxJoinExecStreamParams const &params)
void setHashInfo (LhxJoinExecStreamParams const &params)
bool returnProbeInner (LhxPlan *curPlan=NULL)
bool returnBuildInner (LhxPlan *curPlan=NULL)
bool returnProbeOuter (LhxPlan *curPlan=NULL)
bool returnBuildOuter (LhxPlan *curPlan=NULL)
bool returnInner (LhxPlan *curPlan=NULL)
bool returnProbe (LhxPlan *curPlan=NULL)
bool returnBuild (LhxPlan *curPlan=NULL)

Private Attributes

shared_array< TupleData > inputTuple
 Input tuple.
shared_array< uint > inputTupleSize
TupleData outputTuple
 TupleData to assemble the output tuple.
uint numTuplesProduced
 Number of tuples produced within the current quantum.
LhxHashInfo hashInfo
 Hash join info.
LhxHashTable hashTable
 HashTable to use.
LhxHashTableReader hashTableReader
BlockNum numBlocksHashTable
 Initial estimate of blocks required.
uint numMiscCacheBlocks
 Number of cache blocks set aside for I/O.
bool isTopPlan
SharedLhxPlan rootPlan
LhxPlan * curPlan
LhxPartitionInfo partInfo
 Partition context used in recursive partitioning.
SharedLhxPartition buildPart
 The build partition (which is also the only partition).
SharedLhxPartition probePart
LhxPartitionReader buildReader
 Partition reader.
LhxPartitionReader probeReader
bool enableSubPartStat
 Whether to use sub partition stats.
bool enableSwing
 Whether to use swing based on input sizes.
uint forcePartitionLevel
 This is set only in tests.
LhxJoinState joinState
vector< LhxJoinState > nextState
 The next state of the JoinExecStream.
shared_ptr< dynamic_bitset<> > joinType
bool regularJoin
 regularJoin: do not match NULLs, and do not remove duplicates in inputs.
bool setopDistinct
bool setopAll

Detailed Description

Definition at line 139 of file LhxJoinExecStream.h.


Member Enumeration Documentation

enum LhxJoinExecStream::LhxDefaultJoinInputIndex [private]

Enumerator:
DefaultProbeInputIndex 
DefaultBuildInputIndex 

Definition at line 145 of file LhxJoinExecStream.h.

00145                                   {
00146         DefaultProbeInputIndex = 0, DefaultBuildInputIndex = 1
00147     };

enum LhxJoinExecStream::LhxJoinState [private]

Enumerator:
ForcePartitionBuild 
Build 
Probe 
ProduceBuild 
ProducePending 
Partition 
CreateChildPlan 
GetNextPlan 
Done 

Definition at line 149 of file LhxJoinExecStream.h.


Member Function Documentation

void LhxJoinExecStream::closeImpl (  )  [private, virtual]

implement ExecStream

Reimplemented from ExecStream.

Definition at line 713 of file LhxJoinExecStream.cpp.

References ExecStream::closeImpl(), hashTable, LhxHashTable::releaseResources(), and rootPlan.

00714 {
00715     hashTable.releaseResources();
00716     if (rootPlan) {
00717         rootPlan->close();
00718         rootPlan.reset();
00719     }
00720     ConfluenceExecStream::closeImpl();
00721 }

void LhxJoinExecStream::setJoinType ( LhxJoinExecStreamParams const &  params  )  [private]

Definition at line 723 of file LhxJoinExecStream.cpp.

References joinType, LhxJoinExecStreamParams::leftInner, LhxJoinExecStreamParams::leftOuter, regularJoin, returnBuild(), returnProbeInner(), returnProbeOuter(), LhxJoinExecStreamParams::rightInner, LhxJoinExecStreamParams::rightOuter, setopAll, LhxJoinExecStreamParams::setopAll, setopDistinct, and LhxJoinExecStreamParams::setopDistinct.

Referenced by prepare().

00725 {
00726     /*
00727      * Join types currently supported:
00728      *
00729      * Inner, Left Outer, Right Outer, Full Outer
00730      * Right Anti(return non-matching rows from the build side)
00731      * Left Semi(return matching rows from the probe side)
00732      *
00733      * These join types are marked by using the above four parameters. Each
00734      * specifies whether to return matching or nonmatching tuples from a join
00735      * input.
00736      *
00737      * LeftInner LeftOuter RightInner RightOuter   Join Type
00738      *     F         T          F          F      (Left Anti)
00739      *     F         F          F          T       Right Anti
00740      *     T         F          F          F       Left Semi
00741      *     F         F          T          F      (Right Semi)
00742      *     T         F          T          F       Inner Join
00743      *     T         F          T          T       Right Outer
00744      *     T         T          T          F       Left Outer
00745      *     T         T          T          T       Full Outer
00746      * Note join types in () are not visible in optimizer plan.
00747      */
00748 
00749     joinType.reset(new dynamic_bitset<>(4));
00750 
00751     joinType->set(0, params.leftInner);
00752     joinType->set(1, params.leftOuter);
00753     joinType->set(2, params.rightInner);
00754     joinType->set(3, params.rightOuter);
00755 
00756     /*
00757      * By construction, at most one of the above six is true for a combination
00758      * of values.
00759      * Now make sure at least one of them is true.
00760      * Otherwise, the optimizer has passed in a join type not supported by this
00761      * join implementation.
00762      */
00763     assert (joinType->count() != 0);
00764 
00765     regularJoin   = !params.setopDistinct && !params.setopAll;
00766     setopDistinct =  params.setopDistinct && !params.setopAll;
00767     setopAll      = !params.setopDistinct &&  params.setopAll;
00768 
00769     assert (!setopAll && (regularJoin || setopDistinct));
00770 
00771     /*
00772      * Anti joins with duplicate removal need to use the hash table to remove
00773      * duplicated non-matching tuples. Hence the anti side needs to be the
00774      * build input (original right side).
00775      */
00776     bool leftAnti =
00777         (returnProbeOuter() && !returnProbeInner() && !returnBuild());
00778 
00779     assert (!(leftAnti && setopDistinct));
00780 }
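
The flag table in the comment above can be sketched as a small lookup. This is only an illustration: `JoinFlags`, `encodeJoinType` and `describeJoinType` are hypothetical names, and `std::bitset<4>` stands in for the `dynamic_bitset<>` used by the listing. The bit order (0 = leftInner, 1 = leftOuter, 2 = rightInner, 3 = rightOuter) follows the four `joinType->set()` calls.

```cpp
#include <bitset>
#include <cassert>
#include <string>

// Hypothetical stand-in for the four boolean join parameters. The field
// names mirror those referenced by setJoinType(); the struct is an assumption.
struct JoinFlags {
    bool leftInner;
    bool leftOuter;
    bool rightInner;
    bool rightOuter;
};

// Pack the flags in the same bit order as setJoinType():
// bit 0 = leftInner, bit 1 = leftOuter, bit 2 = rightInner, bit 3 = rightOuter.
inline std::bitset<4> encodeJoinType(JoinFlags const &f)
{
    std::bitset<4> bits;
    bits.set(0, f.leftInner);
    bits.set(1, f.leftOuter);
    bits.set(2, f.rightInner);
    bits.set(3, f.rightOuter);
    return bits;
}

// Map an encoded join type to the name used in the comment table.
// Combinations the table does not list yield "Unsupported".
inline std::string describeJoinType(std::bitset<4> const &bits)
{
    switch (bits.to_ulong()) {
    case 0x2: return "Left Anti";    // F T F F
    case 0x8: return "Right Anti";   // F F F T
    case 0x1: return "Left Semi";    // T F F F
    case 0x4: return "Right Semi";   // F F T F
    case 0x5: return "Inner Join";   // T F T F
    case 0xD: return "Right Outer";  // T F T T
    case 0x7: return "Left Outer";   // T T T F
    case 0xF: return "Full Outer";   // T T T T
    default:  return "Unsupported";
    }
}
```

For example, `describeJoinType(encodeJoinType(JoinFlags{true, false, true, true}))` yields "Right Outer", matching the table row T F T T.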

void LhxJoinExecStream::setHashInfo ( LhxJoinExecStreamParams const &  params  )  [private]

Definition at line 782 of file LhxJoinExecStream.cpp.

References LhxJoinExecStreamParams::cndKeys, LhxHashInfo::cndKeys, LhxHashInfo::dataProj, DefaultBuildInputIndex, DefaultProbeInputIndex, LhxJoinExecStreamParams::enableJoinFilter, LhxHashInfo::externalSegmentAccessor, LhxHashInfo::filterNull, LhxHashInfo::filterNullKeyProj, LhxJoinExecStreamParams::filterNullKeyProj, HASH_TRIM_NONE, HASH_TRIM_UNICODE_VARCHAR, HASH_TRIM_VARCHAR, hashInfo, ConfluenceExecStream::inAccessors, LhxHashInfo::inputDesc, LhxHashInfo::isKeyColVarChar, LhxHashInfo::keyProj, LhxJoinExecStreamParams::leftKeyProj, LhxHashInfo::memSegmentAccessor, LhxJoinExecStreamParams::numRows, LhxHashInfo::numRows, SegmentAccessor::pCacheAccessor, TupleProjection::projectFrom(), SegmentAccessor::pSegment, LhxJoinExecStreamParams::pTempSegment, regularJoin, LhxHashInfo::removeDuplicate, returnBuild(), returnBuildInner(), returnBuildOuter(), returnProbe(), returnProbeInner(), returnProbeOuter(), LhxJoinExecStreamParams::rightKeyProj, setopDistinct, STANDARD_TYPE_UNICODE_VARCHAR, STANDARD_TYPE_VARCHAR, LhxHashInfo::streamBufAccessor, and LhxHashInfo::useJoinFilter.

Referenced by prepare().

00784 {
00785     uint numInputs = inAccessors.size();
00786     for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
00787         hashInfo.streamBufAccessor.push_back(inAccessors[inputIndex]);
00788         hashInfo.inputDesc.push_back(
00789             inAccessors[inputIndex]->getTupleDesc());
00790         /*
00791          * set(distinct) matching operations eliminate duplicates.
00792          */
00793         hashInfo.removeDuplicate.push_back(setopDistinct);
00794         hashInfo.numRows.push_back(params.numRows);
00795         hashInfo.cndKeys.push_back(params.cndKeys);
00796     }
00797 
00798     bool leftSemi =
00799         (returnProbeInner() && !returnProbeOuter() && !returnBuild());
00800 
00801     bool rightSemi =
00802         (returnBuildInner() && !returnBuildOuter() && !returnProbe());
00803 
00804     /*
00805      * removeDuplicate is no longer a feature for set op only. For semi joins,
00806      * e.g. IN predicates, the lookup table needs to eliminate duplicates as
00807      * well. The way this is achieved is different for LEFTSEMI and RIGHTSEMI.
00808      * For LEFTSEMI, if the join output row for a matched tuple from the left
00809      * does not include any matched tuples from the right(the build side), then
00810      * at most one output tuple is returned per left tuple. (See comment in
00811      * execute() state Probe).
00812      * For RIGHTSEMI, however, all the matched tuples from the build side need
00813      * to be returned and only once. This is done by checking the "matched"
00814      * flag per left tuple. If the same key has not been matched before, then
00815      * return all matching tuples from the RHS. Otherwise, discard(and return
00816      * nothing from the RHS) and go to the next tuple on the left. In this
00817      * case, the call to findKey() in execute():Probe needs to pass true for
00818      * parameter removeDuplicateProbe.
00819      *
00820      */
00821     if (leftSemi) {
00822         hashInfo.removeDuplicate[DefaultBuildInputIndex] = true;
00823     }
00824 
00825     if (rightSemi) {
00826         hashInfo.removeDuplicate[DefaultProbeInputIndex] = true;
00827     }
00828 
00829     /*
00830      * Nulls do not join, unless in set operation.
00831      * Filter null values if non-matching tuples are not needed.
00832      */
00833     hashInfo.filterNull.push_back(regularJoin && !returnProbeOuter());
00834     hashInfo.filterNull.push_back(regularJoin && !returnBuildOuter());
00835 
00836     hashInfo.keyProj.push_back(params.leftKeyProj);
00837     hashInfo.keyProj.push_back(params.rightKeyProj);
00838 
00839     TupleProjection filterNullLeftKeyProj;
00840     TupleProjection filterNullRightKeyProj;
00841 
00842     // only filter null on join sides from which non-joining tuples will not
00843     // need to be returned
00844     filterNullLeftKeyProj.projectFrom(
00845         params.leftKeyProj, params.filterNullKeyProj);
00846 
00847     filterNullRightKeyProj.projectFrom(
00848         params.rightKeyProj, params.filterNullKeyProj);
00849 
00850     hashInfo.filterNullKeyProj.push_back(filterNullLeftKeyProj);
00851     hashInfo.filterNullKeyProj.push_back(filterNullRightKeyProj);
00852 
00853     hashInfo.useJoinFilter.push_back(
00854         params.enableJoinFilter && !returnProbeOuter());
00855     hashInfo.useJoinFilter.push_back(
00856         params.enableJoinFilter && !returnBuildOuter());
00857 
00858     hashInfo.memSegmentAccessor = params.scratchAccessor;
00859     hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
00860     hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment;
00861 
00862     for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
00863         TupleProjection &keyProj  = hashInfo.keyProj[inputIndex];
00864         TupleDescriptor &inputDesc  = hashInfo.inputDesc[inputIndex];
00865 
00866         vector<LhxHashTrim> isKeyVarChar;
00867         TupleProjection dataProj;
00868 
00869         /*
00870          * Hashing is special for varchar types(the trailing blanks are
00871          * insignificant).
00872          */
00873         for (int j = 0; j < keyProj.size(); j ++) {
00874             StoredTypeDescriptor::Ordinal ordinal =
00875                 inputDesc[keyProj[j]].pTypeDescriptor->getOrdinal();
00876             if (ordinal == STANDARD_TYPE_VARCHAR) {
00877                 isKeyVarChar.push_back(HASH_TRIM_VARCHAR);
00878             } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) {
00879                 isKeyVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR);
00880             } else {
00881                 isKeyVarChar.push_back(HASH_TRIM_NONE);
00882             }
00883         }
00884 
00885         hashInfo.isKeyColVarChar.push_back(isKeyVarChar);
00886 
00887         /*
00888          * Need to construct a covering set of keys; for example:
00889          * keyProj (3,4,2,3) should have a covering set of (3,4,2);
00890          */
00891         for (int i = 0; i < inputDesc.size(); i ++) {
00892             /*
00893              * Okay a dumb for loop to search for key columns.
00894              */
00895             bool colIsKey = false;
00896             for (int j = 0; j < keyProj.size(); j ++) {
00897                 if (i == keyProj[j]) {
00898                     colIsKey = true;
00899                     break;
00900                 }
00901             }
00902             if (!colIsKey) {
00903                 dataProj.push_back(i);
00904             }
00905         }
00906         hashInfo.dataProj.push_back(dataProj);
00907     }
00908 }
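
The final loop of setHashInfo() collects, for each input, the columns that are not key columns into `dataProj`. A minimal standalone sketch of that step follows. It assumes plain `std::vector<std::size_t>` in place of `TupleProjection`, and `computeDataProj` is a hypothetical helper name rather than part of the class.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Return the column ordinals in [0, numColumns) that do not appear in
// keyProj, in ascending order. Duplicate entries in keyProj are harmless
// because each column is only tested for membership.
inline std::vector<std::size_t> computeDataProj(
    std::size_t numColumns,
    std::vector<std::size_t> const &keyProj)
{
    std::vector<std::size_t> dataProj;
    for (std::size_t col = 0; col < numColumns; ++col) {
        bool colIsKey =
            std::find(keyProj.begin(), keyProj.end(), col) != keyProj.end();
        if (!colIsKey) {
            dataProj.push_back(col);
        }
    }
    return dataProj;
}
```

With the key projection (3,4,2,3) from the comment and an assumed input of six columns, the result is (0,1,5): the key columns are covered once by the key projection and every remaining column lands in the data projection.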

bool LhxJoinExecStream::returnProbeInner ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 330 of file LhxJoinExecStream.h.

References curPlan, LhxPlan::getProbeInput(), and joinType.

Referenced by execute(), prepare(), returnInner(), returnProbe(), setHashInfo(), and setJoinType().

00331 {
00332     uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput();
00333     return joinType->test(probeInput * 2 + 0);
00334 }

bool LhxJoinExecStream::returnBuildInner ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 336 of file LhxJoinExecStream.h.

References curPlan, LhxPlan::getBuildInput(), and joinType.

Referenced by execute(), prepare(), returnBuild(), returnInner(), and setHashInfo().

00337 {
00338     uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput();
00339     return joinType->test(buildInput * 2 + 0);
00340 }

bool LhxJoinExecStream::returnProbeOuter ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 342 of file LhxJoinExecStream.h.

References curPlan, LhxPlan::getProbeInput(), and joinType.

Referenced by execute(), prepare(), returnProbe(), setHashInfo(), and setJoinType().

00343 {
00344     uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput();
00345     return joinType->test(probeInput * 2 + 1);
00346 }

bool LhxJoinExecStream::returnBuildOuter ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 348 of file LhxJoinExecStream.h.

References curPlan, LhxPlan::getBuildInput(), and joinType.

Referenced by execute(), prepare(), returnBuild(), and setHashInfo().

00349 {
00350     uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput();
00351     return joinType->test(buildInput * 2 + 1);
00352 }

bool LhxJoinExecStream::returnInner ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 354 of file LhxJoinExecStream.h.

References curPlan, returnBuildInner(), and returnProbeInner().

00355 {
00356     return (returnProbeInner(curPlan) && returnBuildInner(curPlan));
00357 }

bool LhxJoinExecStream::returnProbe ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 359 of file LhxJoinExecStream.h.

References curPlan, returnProbeInner(), and returnProbeOuter().

Referenced by execute(), prepare(), and setHashInfo().

00360 {
00361     return (returnProbeInner(curPlan) || returnProbeOuter(curPlan));
00362 }

bool LhxJoinExecStream::returnBuild ( LhxPlan *  curPlan = NULL  )  [inline, private]

Definition at line 364 of file LhxJoinExecStream.h.

References curPlan, returnBuildInner(), and returnBuildOuter().

Referenced by execute(), prepare(), setHashInfo(), and setJoinType().

00365 {
00366     return (returnBuildInner(curPlan) || returnBuildOuter(curPlan));
00367 }
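
The seven predicates above all reduce to testing one of two bits per input: bit `input * 2` for matching rows and bit `input * 2 + 1` for non-matching rows. The sketch below models this outside the class. `JoinTypeView` is a hypothetical type, `std::bitset<4>` replaces `dynamic_bitset<>`, and the probe and build indexes are passed explicitly instead of being read from an `LhxPlan`.

```cpp
#include <bitset>
#include <cassert>

// Hypothetical, simplified model of the return*() predicates.
class JoinTypeView {
public:
    // probeIdx and buildIdx are 0 or 1 and must differ. The defaults in the
    // listing (when curPlan is NULL) are probe = 0 and build = 1.
    JoinTypeView(std::bitset<4> bits, unsigned probeIdx, unsigned buildIdx)
        : joinType(bits), probeInput(probeIdx), buildInput(buildIdx)
    {
        assert(probeIdx < 2 && buildIdx < 2 && probeIdx != buildIdx);
    }

    bool returnProbeInner() const { return joinType.test(probeInput * 2 + 0); }
    bool returnProbeOuter() const { return joinType.test(probeInput * 2 + 1); }
    bool returnBuildInner() const { return joinType.test(buildInput * 2 + 0); }
    bool returnBuildOuter() const { return joinType.test(buildInput * 2 + 1); }

    bool returnInner() const { return returnProbeInner() && returnBuildInner(); }
    bool returnProbe() const { return returnProbeInner() || returnProbeOuter(); }
    bool returnBuild() const { return returnBuildInner() || returnBuildOuter(); }

private:
    std::bitset<4> joinType;
    unsigned probeInput;
    unsigned buildInput;
};
```

Because the bits are addressed through the probe and build indexes, the same encoded join type gives consistent answers when a plan swaps its inputs. For a Left Outer join (bits 0, 1 and 2 set), `JoinTypeView(bits, 0, 1).returnProbeOuter()` is true, while after a swap `JoinTypeView(bits, 1, 0).returnBuildOuter()` is true instead.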

void LhxJoinExecStream::prepare ( LhxJoinExecStreamParams const &  params  )  [virtual]

Definition at line 33 of file LhxJoinExecStream.cpp.

References LhxHashTable::calculateSize(), TupleData::compute(), DefaultBuildInputIndex, LhxJoinExecStreamParams::enableSubPartStat, enableSubPartStat, LhxJoinExecStreamParams::enableSwing, enableSwing, LhxJoinExecStreamParams::forcePartitionLevel, forcePartitionLevel, hashInfo, hashTable, ConfluenceExecStream::inAccessors, inputTuple, inputTupleSize, LhxJoinExecStreamParams::leftKeyProj, LhxPlan::LhxChildPartCount, numBlocksHashTable, numMiscCacheBlocks, LhxJoinExecStreamParams::outputProj, outputTuple, SingleOutputExecStream::pOutAccessor, ConfluenceExecStream::prepare(), TupleDescriptor::projectFrom(), returnBuild(), returnBuildInner(), returnBuildOuter(), returnProbe(), returnProbeInner(), returnProbeOuter(), LhxJoinExecStreamParams::rightKeyProj, setHashInfo(), setJoinType(), and setopDistinct.

00035 {
00036     assert (params.leftKeyProj.size() == params.rightKeyProj.size());
00037 
00038     ConfluenceExecStream::prepare(params);
00039 
00040     setJoinType(params);
00041     setHashInfo(params);
00042 
00043     uint numInputs = inAccessors.size();
00044 
00045     inputTuple.reset(new TupleData[2]);
00046     inputTupleSize.reset(new uint[2]);
00047 
00048     for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
00049         inputTuple[inputIndex].compute(
00050             inAccessors[inputIndex]->getTupleDesc());
00051         inputTupleSize[inputIndex] = inputTuple[inputIndex].size();
00052     }
00053 
00054     /*
00055      * Force partitioning level. Only set in tests.
00056      */
00057     forcePartitionLevel = params.forcePartitionLevel;
00058     enableSubPartStat = params.enableSubPartStat;
00059 
00060     /*
00061      * NOTE: currently anti joins that need to remove duplicates can not
00062      * switch join sides(join then
00063      * effectively becomes LeftAnti) because the hash table is used to remove
00064      * duplicated non-matched tuples. The "Anti" side has to be the build side.
00065      * It is difficult, though not impossible,
00066      * to remove duplicates of non-matched tuples from the probe side.
00067      *
00068      * One approach to solve this is to insert the non-matched probe tuple into
00069      * the hash table, mark it as matched, and return this tuple. Subsequent
00070      * identical probe tuple will see the tuple as a "match" and will not
00071      * return the tuple(hence satisfy the anti join semantics).
00072      * This scheme also works when the hash table overflows. Tuples in the
00073      * hash table, including the probe tuples inserted, will be partitioned
00074      * as child partitions of the build input.
00075      * The remaining probe input, together with all the matched tuples from the
00076      * hash table, will be partitioned to disk as the children of the probe
00077      * input. Note matched tuples are stored in both inputs.
00078      *
00079      * This partitioning scheme makes sure that the join result is correct
00080      * using the above described LeftAnti join algorithm or the already
00081      * implemented RightAnti join algorithm, regardless of input assignment for
00082      * the next partition level.
00083      *
00084      * RightAnti join without duplicate removal can use swing.
00085      * LeftAnti join without duplicate removal can use swing.
00086      *
00087      * RightAnti join with duplicate removal cannot use swing.
00088      * LeftAnti join with duplicate removal is not supported(see setJoinType())
00089      *
00090      */
00091     bool leftAntiJoin =
00092         (returnProbeOuter() && !returnProbeInner() && !returnBuild());
00093 
00094     bool rightAntiJoin =
00095         (returnBuildOuter() && !returnBuildInner() && !returnProbe());
00096 
00097     bool antiJoin = leftAntiJoin || rightAntiJoin;
00098 
00099     enableSwing = params.enableSwing && (!(antiJoin && setopDistinct));
00100 
00101     /*
00102      * Calculate the number of blocks required to perform the join, as given by
00103      * the optimizer, completely in memory.
00104      */
00105     hashTable.calculateSize(
00106         hashInfo,
00107         DefaultBuildInputIndex,
00108         numBlocksHashTable);
00109 
00110     TupleDescriptor outputDesc;
00111 
00112     if (params.outputProj.size() != 0) {
00113         outputDesc.projectFrom(params.outputTupleDesc, params.outputProj);
00114     } else {
00115         outputDesc = params.outputTupleDesc;
00116     }
00117 
00118     outputTuple.compute(outputDesc);
00119 
00120     assert (outputTuple.size() == (inputTupleSize[0] + inputTupleSize[1]) ||
00121         outputTuple.size() == inputTupleSize[0]||
00122         outputTuple.size() == inputTupleSize[1]);
00123 
00124     pOutAccessor->setTupleShape(outputDesc);
00125 
00126     /*
00127      * Set aside one cache block per child partition writer for I/O
00128      */
00129     numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs;
00130 }
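
The swing decision in prepare() can be isolated as a pure function of the join flags. The following is a sketch under assumptions: `SideFlags`, `isAntiJoin` and `swingAllowed` are hypothetical names, and the per-side flags correspond to the inner and outer bits of the default probe (left) and build (right) inputs.

```cpp
#include <cassert>

// Hypothetical pair of flags for one join input:
// inner = return matching rows, outer = return non-matching rows.
struct SideFlags {
    bool inner;
    bool outer;
};

// An anti join returns only non-matching rows from exactly one side.
inline bool isAntiJoin(SideFlags probe, SideFlags build)
{
    bool leftAnti =
        probe.outer && !probe.inner && !(build.inner || build.outer);
    bool rightAnti =
        build.outer && !build.inner && !(probe.inner || probe.outer);
    return leftAnti || rightAnti;
}

// Mirrors: enableSwing = params.enableSwing && !(antiJoin && setopDistinct)
inline bool swingAllowed(
    bool requested, SideFlags probe, SideFlags build, bool setopDistinct)
{
    return requested && !(isAntiJoin(probe, build) && setopDistinct);
}
```

Under this model a Right Anti join with duplicate removal never swings, while the same join without duplicate removal swings whenever the caller requests it, which matches the four cases listed at the end of the comment in prepare().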

void LhxJoinExecStream::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from ConfluenceExecStream.

Definition at line 159 of file LhxJoinExecStream.cpp.

References LhxHashTable::allocateResources(), Build, buildPart, buildReader, curPlan, DefaultBuildInputIndex, DefaultProbeInputIndex, enableSubPartStat, enableSwing, ForcePartitionBuild, forcePartitionLevel, LhxPlan::getBuildInput(), LhxPlan::getBuildPartition(), LhxPlan::getPartitionLevel(), hashInfo, hashTable, hashTableReader, LhxHashTableReader::init(), LhxHashTable::init(), LhxPartitionInfo::init(), isTopPlan, joinState, nextState, LhxPartitionReader::open(), ConfluenceExecStream::open(), partInfo, probePart, LhxHashTable::releaseResources(), and rootPlan.

00160 {
00161     ConfluenceExecStream::open(restart);
00162 
00163     if (restart) {
00164         hashTable.releaseResources();
00165     };
00166 
00167     uint partitionLevel = 0;
00168 
00169     /*
00170      * Create the root plan.
00171      *
00172      * The execute state machine operates at the plan level.
00173      */
00174     probePart = SharedLhxPartition(new LhxPartition(this));
00175     buildPart = SharedLhxPartition(new LhxPartition(this));
00176 
00177     (probePart->segStream).reset();
00178     probePart->inputIndex = DefaultProbeInputIndex;
00179 
00180     (buildPart->segStream).reset();
00181     buildPart->inputIndex = DefaultBuildInputIndex;
00182 
00183     vector<SharedLhxPartition> partitionList;
00184     partitionList.push_back(probePart);
00185     partitionList.push_back(buildPart);
00186 
00187     vector<shared_array<uint> > subPartStats;
00188     subPartStats.push_back(shared_array<uint>());
00189     subPartStats.push_back(shared_array<uint>());
00190 
00191     shared_ptr<dynamic_bitset<> > joinFilterInit =
00192         shared_ptr<dynamic_bitset<> >();
00193 
00194     VectorOfUint filteredRows;
00195     filteredRows.push_back(0);
00196     filteredRows.push_back(0);
00197 
00198     /*
00199      * No input join filter for root plan.
00200      */
00201     rootPlan =  SharedLhxPlan(new LhxPlan());
00202     rootPlan->init(
00203         WeakLhxPlan(),
00204         partitionLevel,
00205         partitionList,
00206         subPartStats,
00207         joinFilterInit,
00208         filteredRows,
00209         enableSubPartStat,
00210         enableSwing);
00211 
00212     /*
00213      * Initialize recursive partitioning context.
00214      */
00215     partInfo.init(&hashInfo);
00216 
00217     curPlan = rootPlan.get();
00218     isTopPlan = true;
00219 
00220     hashTable.init(
00221         curPlan->getPartitionLevel(),
00222         hashInfo,
00223         curPlan->getBuildInput());
00224     hashTableReader.init(&hashTable, hashInfo, curPlan->getBuildInput());
00225 
00226     bool status = hashTable.allocateResources();
00227     assert (status);
00228 
00229     buildReader.open(curPlan->getBuildPartition(), hashInfo);
00230 
00231     joinState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
00232     nextState.clear();
00233 }

ExecStreamResult LhxJoinExecStream::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 235 of file LhxJoinExecStream.cpp.

References LhxHashTable::addTuple(), LhxHashTable::allocateResources(), LhxHashTableReader::bindKey(), LhxHashTableReader::bindUnMatched(), Build, buildReader, ExecStream::checkAbort(), LhxPartitionInfo::close(), LhxPartitionReader::close(), LhxPartitionReader::consumeTuple(), TupleData::containsNull(), CreateChildPlan, LhxPlan::createChildren(), curPlan, LhxPartitionReader::demandData(), Done, enableSubPartStat, enableSwing, EXECBUF_EOS, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, LhxHashInfo::filterNullKeyProj, LhxHashTable::findKey(), ForcePartitionBuild, forcePartitionLevel, LhxPlan::generatePartitions(), LhxPlan::getBuildInput(), LhxPlan::getBuildPartition(), LhxPlan::getFirstChild(), LhxHashTableReader::getNext(), LhxPlan::getNextLeaf(), GetNextPlan, LhxPlan::getPartitionLevel(), LhxPlan::getProbeInput(), LhxPlan::getProbePartition(), LhxPartitionReader::getState(), hashInfo, hashTable, hashTableReader, LhxHashTableReader::init(), LhxHashTable::init(), inputTuple, inputTupleSize, isTopPlan, LhxPartitionReader::isTupleConsumptionPending(), joinState, LhxHashInfo::keyProj, nextState, ExecStreamQuantum::nTuplesMax, numTuplesProduced, LhxPartitionInfo::open(), LhxPartitionReader::open(), outputTuple, partInfo, Partition, PartitionUnderflow, SingleOutputExecStream::pOutAccessor, Probe, probeReader, ProduceBuild, ProducePending, regularJoin, LhxHashTable::releaseResources(), LhxHashInfo::removeDuplicate, returnBuild(), returnBuildInner(), returnBuildOuter(), returnProbe(), returnProbeInner(), returnProbeOuter(), LhxPlan::toString(), TRACE_FINE, and LhxPartitionReader::unmarshalTuple().

00236 {
00237     while (true) {
00238         switch (joinState) {
00239         case ForcePartitionBuild:
00240             {
00241                 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];
00242 
00243                 /*
00244                  * Build
00245                  */
00246                 for (;;) {
00247                     if (!buildReader.isTupleConsumptionPending()) {
00248                         if (buildReader.getState() == EXECBUF_EOS) {
00249                             /*
00250                              * break out of this loop, and start probing.
00251                              */
00252                             buildReader.close();
00253                             probeReader.open(
00254                                 curPlan->getProbePartition(),
00255                                 hashInfo);
00256                             joinState = Probe;
00257                             numTuplesProduced = 0;
00258                             break;
00259                         }
00260 
00261                         if (!buildReader.demandData()) {
00262                             if (isTopPlan) {
00263                                 /*
00264                                  * Top level: request more data from producer.
00265                                  */
00266                                 return EXECRC_BUF_UNDERFLOW;
00267                             } else {
00268                                 /*
00269                                  * Recursive level: no more data in partition.
00270                                  * Come back to the top of the same state to
00271                                  * detect EOS.
00272                                  */
00273                                 break;
00274                             }
00275                         }
00276                         buildReader.unmarshalTuple(buildTuple);
00277                     }
00278 
00279                     /*
00280                      * Add tuple to hash table.
00281                      *
00282                      * NOTE: This is a testing state. Always partition up to
00283                      * forcePartitionLevel.
00284                      */
00285                     if (curPlan->getPartitionLevel() < forcePartitionLevel ||
00286                         !hashTable.addTuple(buildTuple)) {
00287                         /*
00288                          * If hash table is full, partition input data.
00289                          *
00290                          * First, partition the right(build input).
00291                          */
00292                         partInfo.open(
00293                             &hashTableReader, &buildReader, buildTuple,
00294                             curPlan->getProbePartition(),
00295                             curPlan->getBuildInput());
00296                         joinState = Partition;
00297                         break;
00298                     }
00299                     buildReader.consumeTuple();
00300                 }
00301                 break;
00302             }
00303         case Build:
00304             {
00305                 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];
00306 
00307                 /*
00308                  * Build
00309                  */
00310                 for (;;) {
00311                     if (!buildReader.isTupleConsumptionPending()) {
00312                         if (buildReader.getState() == EXECBUF_EOS) {
00313                             /*
00314                              * break out of this loop, and start probing.
00315                              */
00316                             buildReader.close();
00317                             probeReader.open(
00318                                 curPlan->getProbePartition(),
00319                                 hashInfo);
00320                             joinState = Probe;
00321                             numTuplesProduced = 0;
00322                             break;
00323                         }
00324 
00325                         if (!buildReader.demandData()) {
00326                             if (isTopPlan) {
00327                                 /*
00328                                  * Top level: request more data from producer.
00329                                  */
00330                                 return EXECRC_BUF_UNDERFLOW;
00331                             } else {
00332                                 /*
00333                                  * Recursive level: no more data in partition.
00334                                  * Come back to the top of the same state to
00335                                  * detect EOS.
00336                                  */
00337                                 break;
00338                             }
00339                         }
00340                         buildReader.unmarshalTuple(buildTuple);
00341                     }
00342 
00343                     /*
00344                      * Add tuple to hash table.
00345                      */
00346                     if (!hashTable.addTuple(buildTuple)) {
00347                         /*
00348                          * If hash table is full, partition input data.
00349                          *
00350                          * First, partition the right(build input).
00351                          */
00352                         partInfo.open(
00353                             &hashTableReader, &buildReader, buildTuple,
00354                             curPlan->getProbePartition(),
00355                             curPlan->getBuildInput());
00356                         joinState = Partition;
00357                         break;
00358                     }
00359                     buildReader.consumeTuple();
00360                 }
00361                 break;
00362             }
00363         case Partition:
00364             {
00365                 for (;;) {
00366                     if (curPlan->generatePartitions(hashInfo, partInfo)
00367                         == PartitionUnderflow) {
00368                         /*
00369                          * Request more data from producer.
00370                          */
00371                         return EXECRC_BUF_UNDERFLOW;
00372                     } else {
00373                         /*
00374                          * Finished building the partitions for both
00375                          * inputs.
00376                          */
00377                         break;
00378                     }
00379                 }
00380                 partInfo.close();
00381                 joinState = CreateChildPlan;
00382                 break;
00383             }
00384         case CreateChildPlan:
00385             {
00386                 /*
00387                  * Link the newly created partitions into the plan tree.
00388                  */
00389                 curPlan->createChildren(
00390                     partInfo,
00391                     enableSubPartStat,
00392                     enableSwing);
00393 
00394                 FENNEL_TRACE(TRACE_FINE, curPlan->toString());
00395 
00396                 /*
00397                  * Now recurse down the plan tree to get the first leaf plan.
00398                  */
00399                 curPlan = curPlan->getFirstChild().get();
00400                 isTopPlan = false;
00401 
00402                 hashTable.releaseResources();
00403 
00404                 hashTable.init(
00405                     curPlan->getPartitionLevel(),
00406                     hashInfo,
00407                     curPlan->getBuildInput());
00408                 hashTableReader.init(
00409                     &hashTable,
00410                     hashInfo,
00411                     curPlan->getBuildInput());
00412 
00413                 bool status = hashTable.allocateResources();
00414                 assert (status);
00415                 buildReader.open(curPlan->getBuildPartition(), hashInfo);
00416 
00417                 joinState =
00418                     (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
00419                 nextState.clear();
00420                 break;
00421             }
00422         case GetNextPlan:
00423             {
00424                 hashTable.releaseResources();
00425 
00426                 checkAbort();
00427 
00428                 curPlan = curPlan->getNextLeaf();
00429 
00430                 if (curPlan) {
00431                     hashTable.init(
00432                         curPlan->getPartitionLevel(),
00433                         hashInfo,
00434                         curPlan->getBuildInput());
00435                     hashTableReader.init(
00436                         &hashTable,
00437                         hashInfo,
00438                         curPlan->getBuildInput());
00439 
00440                     bool status = hashTable.allocateResources();
00441                     assert (status);
00442                     buildReader.open(curPlan->getBuildPartition(), hashInfo);
00443                     joinState =
00444                         (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
00445                     nextState.clear();
00446                 } else {
00447                     joinState = Done;
00448                 }
00449                 break;
00450             }
00451         case Probe:
00452             {
00453                 TupleData &probeTuple = inputTuple[curPlan->getProbeInput()];
00454                 uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()];
00455                 TupleProjection &probeKeyProj  =
00456                     hashInfo.keyProj[curPlan->getProbeInput()];
00457                 uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()];
00458                 bool removeDuplicateProbe =
00459                     hashInfo.removeDuplicate[curPlan->getProbeInput()];
00460                 TupleProjection &filterNullProbeKeyProj  =
00461                     hashInfo.filterNullKeyProj[curPlan->getProbeInput()];
00462                 bool filterNullProbe = regularJoin;
00463 
00464                 uint probeFieldOffset =
00465                     returnBuild(curPlan) ?
00466                     buildTupleSize * curPlan->getProbeInput() : 0;
00467                 uint buildFieldOffset =
00468                     returnProbe(curPlan) ?
00469                     probeTupleSize * curPlan->getBuildInput() : 0;
00470                 uint probeFieldLength =
00471                     returnProbe(curPlan) ? probeTupleSize : 0;
00472                 uint buildFieldLength =
00473                     returnBuild(curPlan) ? buildTupleSize : 0;
00474 
00475                 /*
00476                  * Probe
00477                  */
00478                 for (;;) {
00479                     if (!probeReader.isTupleConsumptionPending()) {
00480                         if (probeReader.getState() == EXECBUF_EOS) {
00481                             probeReader.close();
00482                             if (returnBuildOuter(curPlan)) {
00483                                 /*
00484                                  * Join types that return non-matching
00485                                  * tuples from the build input:
00486                                  *    RightOuter, FullOuter, RightAnti,
00487                                  *
00488                                  * Set the output tuple to have NULL values on
00489                                  * the left(probe side), and return all the
00490                                  * non-matching tuples in the hash table on the
00491                                  * right.
00492                                  */
00493                                 hashTableReader.bindUnMatched();
00494 
00495                                 /*
00496                                  * fill in the probe side, if required, with
00497                                  * NULLs
00498                                  */
00499                                 for (uint i = 0; i < probeFieldLength; i ++) {
00500                                     outputTuple[i + probeFieldOffset].pData =
00501                                         NULL;
00502                                 }
00503                                 joinState = ProduceBuild;
00504                                 nextState.push_back(GetNextPlan);
00505                             } else {
00506                                 /*
00507                                  * Probing for this plan is done.
00508                                  */
00509                                 joinState = GetNextPlan;
00510                             }
00511                             break;
00512                         }
00513                         if (!probeReader.demandData()) {
00514                             if (isTopPlan) {
00515                                 /*
00516                                  * Top level: request more data from producer.
00517                                  */
00518                                 return EXECRC_BUF_UNDERFLOW;
00519                             } else {
00520                                 /*
00521                                  * Recursive level: no more data in partition.
00522                                  * Come back to the top of the same state to
00523                                  * detect EOS.
00524                                  */
00525                                 break;
00526                             }
00527                         }
00528                         probeReader.unmarshalTuple(probeTuple);
00529                     }
00530 
00531                     PBuffer keyBuf = NULL;
00532 
00533                     /*
00534                      * Try to locate matching key in the hash table.
00535                      * If this tuple does contain null in its key columns, it
00536                      * will not join so hash table lookup is not needed.
00537                      */
00538                     if (!filterNullProbe ||
00539                         !probeTuple.containsNull(filterNullProbeKeyProj)) {
00540                         keyBuf =
00541                             hashTable.findKey(
00542                                 probeTuple,
00543                                 probeKeyProj,
00544                                 removeDuplicateProbe);
00545                     }
00546 
00547                     if (keyBuf) {
00548                         if (returnBuildInner(curPlan)) {
00549                             /*
00550                              * Join types that return matching tuples from both
00551                              * inputs: InnerJoin, LeftOuter, RightOuter,
00552                              * FullOuter
00553                              *
00554                              * Join types that return matching tuples from the
00555                              * build input: RightSemi(when matched for the
00556                              * first time)
00557                              *
00558                              * Set the output tuple to include only the probe
00559                              * input and get all the matching tuples from the
00560                              * build side. For RightSemi, the probeFieldLength
00561                              * to be included in the output tuple is 0.
00562                              */
00563                             for (uint i = 0; i < probeFieldLength; i ++) {
00564                                 outputTuple[i + probeFieldOffset].copyFrom(
00565                                     probeTuple[i]);
00566                             }
00567 
00571                             hashTableReader.bindKey(keyBuf);
00572                             joinState = ProduceBuild;
00573                             nextState.push_back(Probe);
00574                             break;
00575                         } else if (returnProbeInner(curPlan) &&
00576                             !returnProbeOuter() && !returnBuild(curPlan)) {
00577                             /*
00578                              * Join types that return (distinct) matching
00579                              * tuples from the probe input: LeftSemiJoin
00580                              *
00581                              * Produce one output tuple per matched tuple from
00582                              * the left side.
00583                              *
00584                              * Set the output tuple to include only
00585                              * the probe input.
00586                              */
00587                             for (uint i = 0; i < probeFieldLength; i ++) {
00588                                 outputTuple[i + probeFieldOffset].copyFrom(
00589                                     probeTuple[i]);
00590                             }
00591                             joinState = ProducePending;
00592                             nextState.push_back(Probe);
00593                             break;
00594                         } else {
00595                             /*
00596                              * RightAnti and RightSemi(when not matched for the
00597                              * first time) fall through here.
00598                              * Go back to match other probing rows.
00599                              * Return non-matched(for RightAnti)tuples from the
00600                              * hash table.
00601                              */
00602                             probeReader.consumeTuple();
00603                         }
00604                     } else {
00605                         /*
00606                          * No match. Need to return the leftTuple if leftOuter
00607                          * join.
00608                          */
00609                         if (returnProbeOuter(curPlan)) {
00610                             /*
00611                              * Join types that return non-matching
00612                              * tuples from the probe input: LeftOuter, FullOuter
00613                              *
00614                              * Set the output tuple to include only the left
00615                              * input, and set NULL values on the right.
00616                              */
00617                             for (uint i = 0; i < probeFieldLength; i ++) {
00618                                 outputTuple[i + probeFieldOffset].copyFrom(
00619                                     probeTuple[i]);
00620                             }
00621 
00622                             for (uint i = 0; i < buildFieldLength; i ++) {
00623                                 outputTuple[i + buildFieldOffset].pData = NULL;
00624                             }
00625                             joinState = ProducePending;
00626                             nextState.push_back(Probe);
00627                             break;
00628                         } else {
00629                             probeReader.consumeTuple();
00630                         }
00631                     }
00632                 }
00633                 break;
00634             }
00635         case ProduceBuild:
00636             {
00637                 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];
00638                 uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()];
00639                 uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()];
00640                 uint buildFieldOffset =
00641                     returnProbe(curPlan) ?
00642                     probeTupleSize * curPlan->getBuildInput() : 0;
00643                 uint buildFieldLength =
00644                     returnBuild(curPlan) ? buildTupleSize : 0;
00645 
00646                 /*
00647                  * Producing the results.
00648                  * Handle output overflow and quantum expiration in
00649                  * ProducePending state.
00650                  */
00651                 if (hashTableReader.getNext(buildTuple)) {
00652                     for (uint i = 0; i < buildFieldLength; i ++) {
00653                         outputTuple[i + buildFieldOffset].copyFrom(
00654                             buildTuple[i]);
00655                     }
00656 
00657                     joinState = ProducePending;
00658                     /*
00659                      * Come back to this state after producing the output tuple
00660                      * successfully.
00661                      */
00662                     nextState.push_back(ProduceBuild);
00663                 } else {
00664                     joinState = nextState.back();
00665                     nextState.pop_back();
00666                     if (joinState == Probe) {
00667                         probeReader.consumeTuple();
00668                     }
00669                 }
00670                 break;
00671             }
00672         case ProducePending:
00673             {
00674                 if (pOutAccessor->produceTuple(outputTuple)) {
00675                     numTuplesProduced++;
00676                     joinState = nextState.back();
00677                     nextState.pop_back();
00678                     if (joinState == Probe) {
00679                         probeReader.consumeTuple();
00680                     }
00681                 } else {
00682                     numTuplesProduced = 0;
00683                     return EXECRC_BUF_OVERFLOW;
00684                 }
00685 
00686                 /*
00687                  * Successfully produced an output row. Now check if quantum
00688                  * has expired.
00689                  */
00690                 if (numTuplesProduced >= quantum.nTuplesMax) {
00691                     /*
00692                      * Reset count.
00693                      */
00694                     numTuplesProduced = 0;
00695                     return EXECRC_QUANTUM_EXPIRED;
00696                 }
00697                 break;
00698             }
00699         case Done:
00700             {
00701                 pOutAccessor->markEOS();
00702                 return EXECRC_EOS;
00703             }
00704         }
00705     }
00706 
00707     /*
00708      * The state machine should never come here.
00709      */
00710     assert (false);
00711 }
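
The listing above is a resumable state machine: joinState holds the current state, nextState is used as a stack of states to return to, and execute() returns to the scheduler whenever the output buffer overflows or the quantum expires. The following is a minimal sketch of that control pattern only, not the Fennel implementation. All names in it (MiniJoin, State, Result) are hypothetical, and a plain vector with a fixed capacity stands in for the output buffer accessor.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the stream's states and return codes.
enum class State { Probe, ProducePending, Done };
enum class Result { Overflow, QuantumExpired, Eos };

struct MiniJoin {
    std::vector<int> input;        // rows still to be "probed"
    std::vector<int> output;       // stands in for the output buffer
    std::size_t outputCapacity;    // output "overflows" at this size
    std::size_t pos = 0;           // index of the current input row
    int pending = 0;               // row waiting to be produced
    std::size_t produced = 0;      // rows produced in the current quantum
    State state = State::Probe;
    std::vector<State> nextState;  // stack of states to resume

    MiniJoin(std::vector<int> in, std::size_t cap)
        : input(std::move(in)), outputCapacity(cap) {}

    // Runs until the output is full, the quantum is used up, or input ends.
    Result execute(std::size_t nTuplesMax) {
        for (;;) {
            switch (state) {
            case State::Probe:
                if (pos == input.size()) {
                    state = State::Done;
                    break;
                }
                pending = input[pos];
                state = State::ProducePending;
                nextState.push_back(State::Probe);
                break;
            case State::ProducePending:
                if (output.size() >= outputCapacity) {
                    // Stay in this state so the same row is retried later.
                    produced = 0;
                    return Result::Overflow;
                }
                output.push_back(pending);
                ++produced;
                state = nextState.back();
                nextState.pop_back();
                if (state == State::Probe) {
                    ++pos;  // consume the input row only after producing it
                }
                if (produced >= nTuplesMax) {
                    produced = 0;
                    return Result::QuantumExpired;
                }
                break;
            case State::Done:
                return Result::Eos;
            }
        }
    }
};
```

Because the pending row and the return stack live in member variables, a caller can drain the output and call execute() again, and the machine resumes exactly where it stopped.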

void LhxJoinExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity,
ExecStreamResourceSettingType &  optType 
) [virtual]

Determines resource requirements for this stream.

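The default ExecStream implementation declares zero resource requirements; this override adds the cache pages needed by the hash table (at minimum LhxHashTable::LhxHashTableMinPages plus numMiscCacheBlocks).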

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting.

Reimplemented from ExecStream.

Definition at line 132 of file LhxJoinExecStream.cpp.

References EXEC_RESOURCE_ESTIMATE, EXEC_RESOURCE_UNBOUNDED, ExecStream::getResourceRequirements(), isMAXU(), LhxHashTable::LhxHashTableMinPages, max(), ExecStreamResourceQuantity::nCachePages, numBlocksHashTable, and numMiscCacheBlocks.

00136 {
00137     ConfluenceExecStream::getResourceRequirements(minQuantity,optQuantity);
00138 
00139     uint minPages = LhxHashTable::LhxHashTableMinPages + numMiscCacheBlocks;
00140     minQuantity.nCachePages += minPages;
00141     // if no stats were available, make an unbounded resource request
00142     if (isMAXU(numBlocksHashTable)) {
00143         optType = EXEC_RESOURCE_UNBOUNDED;
00144     } else {
00145         // make sure the opt is bigger than the min; otherwise, the
00146         // resource governor won't try to give it extra
00147         optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable);
00148         optType = EXEC_RESOURCE_ESTIMATE;
00149     }
00150 }
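
The arithmetic in this method can be tried in isolation. The sketch below is a hedged, standalone restatement of it: Quantity, Accuracy, kUnknown, and computeRequirements are hypothetical names, the value of LhxHashTableMinPages is passed in as a parameter because it does not appear on this page, and modelling isMAXU() as a comparison against the largest unsigned value is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>

// Hypothetical stand-ins for the Fennel types used by the method.
enum class Accuracy { Estimate, Unbounded };

struct Quantity {
    unsigned nCachePages = 0;
};

// Sentinel meaning "no statistics available" (assumption for this sketch).
const unsigned kUnknown = std::numeric_limits<unsigned>::max();

Accuracy computeRequirements(
    unsigned hashTableMinPages,
    unsigned numMiscCacheBlocks,
    unsigned numBlocksHashTable,
    Quantity &minQ,
    Quantity &optQ)
{
    unsigned minPages = hashTableMinPages + numMiscCacheBlocks;
    minQ.nCachePages += minPages;
    if (numBlocksHashTable == kUnknown) {
        // Without statistics no optimum is computed; the request is unbounded.
        return Accuracy::Unbounded;
    }
    // Keep the optimum strictly above the minimum so that extra pages
    // can still be granted.
    optQ.nCachePages += std::max(minPages + 1, numBlocksHashTable);
    return Accuracy::Estimate;
}
```

For example, with 2 minimum hash table pages, 3 miscellaneous blocks, and an estimated table size of 4 blocks, the minimum is 5 pages and the optimum is raised to 6.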

void LhxJoinExecStream::setResourceAllocation ( ExecStreamResourceQuantity &  quantity  )  [virtual]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented from ExecStream.

Definition at line 152 of file LhxJoinExecStream.cpp.

References hashInfo, ExecStreamResourceQuantity::nCachePages, LhxHashInfo::numCachePages, numMiscCacheBlocks, and ExecStream::setResourceAllocation().
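
The body of this method is not listed on this page. Judging only from the references above, one plausible reading is that the pages reserved for miscellaneous use are subtracted from the allocation and the remainder is recorded as the hash table's budget. The sketch below illustrates that reading under that assumption; it is not confirmed by the source, and Quantity, HashInfo, and JoinSketch are hypothetical stand-ins.

```cpp
#include <cassert>

// Hypothetical stand-ins for ExecStreamResourceQuantity and LhxHashInfo.
struct Quantity {
    unsigned nCachePages = 0;
};

struct HashInfo {
    unsigned numCachePages = 0;
};

struct JoinSketch {
    HashInfo hashInfo;
    unsigned numMiscCacheBlocks;
    Quantity allocated;  // what a base class might record

    explicit JoinSketch(unsigned misc) : numMiscCacheBlocks(misc) {}

    void setResourceAllocation(Quantity &quantity) {
        allocated = quantity;
        // Assumption: the miscellaneous blocks are set aside first and
        // everything left over is available to the hash table.
        assert(quantity.nCachePages >= numMiscCacheBlocks);
        hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks;
    }
};
```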

void ConfluenceExecStream::prepare ( ConfluenceExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 37 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::getInputBufProvision(), ConfluenceExecStream::inAccessors, and SingleOutputExecStream::prepare().

Referenced by LcsRowScanBaseExecStream::prepare(), LbmUnionExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), CartesianJoinExecStream::prepare(), and BarrierExecStream::prepare().

00038 {
00039     SingleOutputExecStream::prepare(params);
00040 
00041     for (uint i = 0; i < inAccessors.size(); ++i) {
00042         assert(inAccessors[i]->getProvision() == getInputBufProvision());
00043     }
00044 }

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

00049 {
00050     ExecStream::prepare(params);
00051     assert(pOutAccessor);
00052     assert(pOutAccessor->getProvision() == getOutputBufProvision());
00053     if (pOutAccessor->getTupleDesc().empty()) {
00054         assert(!params.outputTupleDesc.empty());
00055         pOutAccessor->setTupleShape(
00056             params.outputTupleDesc,
00057             params.outputTupleFormat);
00058     }
00059 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {
00086     if (pGraph) {
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }
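
The three prepare() overloads listed above form a chain: each level takes a more specialized parameter class and forwards it to its base class, relying on the derived-to-base conversion. A minimal sketch of that pattern follows. The class and member names are hypothetical and much simpler than the Fennel ones; the sketch only illustrates overloading prepare() on a covariant parameter type.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical parameter classes, each extending the previous one.
struct BaseParams { int cachePages = 0; };
struct OutputParams : BaseParams { std::string outputShape; };
struct JoinParams : OutputParams { bool leftOuter = false; };

struct BaseStream {
    std::vector<std::string> log;  // records the order of prepare() calls
    int cachePages = 0;
    virtual ~BaseStream() = default;
    virtual void prepare(BaseParams const &p) {
        cachePages = p.cachePages;
        log.push_back("base");
    }
};

struct OutputStream : BaseStream {
    std::string shape;
    using BaseStream::prepare;  // keep the base overload visible
    virtual void prepare(OutputParams const &p) {
        BaseStream::prepare(p);  // binds to BaseParams const &
        shape = p.outputShape;
        log.push_back("output");
    }
};

struct JoinStream : OutputStream {
    bool leftOuter = false;
    using OutputStream::prepare;
    virtual void prepare(JoinParams const &p) {
        // Overload resolution prefers the nearest base, OutputParams.
        OutputStream::prepare(p);
        leftOuter = p.leftOuter;
        log.push_back("join");
    }
};
```

Calling prepare() on a JoinStream with a JoinParams object runs the base, output, and join steps in that order, which mirrors how the listings above call ExecStream::prepare() before doing their own checks.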

void ConfluenceExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Reimplemented from SingleOutputExecStream.

Definition at line 31 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::inAccessors.

00033 {
00034     inAccessors = inAccessorsInit;
00035 }

ExecStreamBufProvision ConfluenceExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE, but this implementation returns BUFPROV_PRODUCER

Reimplemented from ExecStream.

Definition at line 58 of file ConfluenceExecStream.cpp.

References BUFPROV_PRODUCER.

Referenced by ConfluenceExecStream::prepare().

00059 {
00060     return BUFPROV_PRODUCER;
00061 }

void SingleOutputExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 41 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::pOutAccessor.

Referenced by ConduitExecStream::setOutputBufAccessors().

00043 {
00044     assert(outAccessors.size() == 1);
00045     pOutAccessor = outAccessors[0];
00046 }

ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; default is BUFPROV_NONE, but this implementation returns BUFPROV_CONSUMER

Reimplemented from ExecStream.

Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 69 of file SingleOutputExecStream.cpp.

References BUFPROV_CONSUMER.

Referenced by SingleOutputExecStream::prepare().

00070 {
00071     return BUFPROV_CONSUMER;
00072 }

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity 
) [virtual, inherited]

Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.

Definition at line 102 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.

00105 {
00106     minQuantity.nThreads = 0;
00107     minQuantity.nCachePages = 0;
00108     optQuantity = minQuantity;
00109 }
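
The default above asks for nothing: zero threads, zero cache pages, and an optimum equal to the minimum. As a rough sketch of how a subclass might layer its own needs on top, the following uses hypothetical stand-ins (ResourceQuantity, BaseStream, PageHungryStream) and made-up page counts; none of these names or numbers come from Fennel.

```cpp
#include <cassert>

// Hypothetical stand-in for ExecStreamResourceQuantity; only the two fields
// named in the References line above are modeled.
struct ResourceQuantity {
    unsigned nThreads = 0;
    unsigned nCachePages = 0;
};

// Mirrors the default shown above: no threads, no cache pages, and the
// optimum equal to the minimum.
struct BaseStream {
    virtual ~BaseStream() = default;
    virtual void getResourceRequirements(
        ResourceQuantity &minQuantity, ResourceQuantity &optQuantity)
    {
        minQuantity.nThreads = 0;
        minQuantity.nCachePages = 0;
        optQuantity = minQuantity;
    }
};

// Hypothetical subclass: starts from the base values, then adds its own
// page needs (the numbers are invented for illustration).
struct PageHungryStream : BaseStream {
    void getResourceRequirements(
        ResourceQuantity &minQuantity, ResourceQuantity &optQuantity) override
    {
        BaseStream::getResourceRequirements(minQuantity, optQuantity);
        minQuantity.nCachePages += 2;
        optQuantity.nCachePages = minQuantity.nCachePages + 8;
    }
};
```

Calling the base implementation first keeps the subclass correct if the base defaults ever change; whether the real reimplementations follow this pattern is not stated here.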

void ExecStream::setName ( std::string const &  nameInit  )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }
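
To illustrate why a scheduler would want to know about blocking, here is a minimal sketch in which one extra thread is budgeted per stream that reports mayBlock() as true. The Stream and BlockingStream types and the one-thread-per-blocker policy are assumptions made for the example, not the behavior of any Fennel scheduler.

```cpp
#include <cassert>
#include <vector>

// Hypothetical base type with the same default as the listing above.
struct Stream {
    virtual ~Stream() = default;
    virtual bool mayBlock() const { return false; }
};

// Hypothetical stream whose execute() might block.
struct BlockingStream : Stream {
    bool mayBlock() const override { return true; }
};

// Assumed policy: budget one extra thread for each stream that may block.
inline unsigned extraThreadsNeeded(std::vector<Stream const *> const &streams)
{
    unsigned n = 0;
    for (Stream const *s : streams) {
        if (s->mayBlock()) {
            ++n;
        }
    }
    return n;
}
```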

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }
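
The description above says that streams with long-running loops should poll for aborts themselves. A minimal sketch of that pattern follows, assuming hypothetical Scheduler, Stream, and AbortRequested types; the graph indirection of the real code is collapsed into a direct scheduler pointer, and the polling interval is an invented parameter.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for the exception thrown on abort.
struct AbortRequested : std::runtime_error {
    AbortRequested() : std::runtime_error("abort requested") {}
};

// Hypothetical scheduler that throws once an abort has been flagged.
struct Scheduler {
    bool abortFlag = false;
    void checkAbort() const {
        if (abortFlag) {
            throw AbortRequested();
        }
    }
};

struct Stream {
    Scheduler *pScheduler = nullptr;

    // Same shape as the listing above: with no scheduler there is nothing
    // to check.
    void checkAbort() const {
        if (!pScheduler) {
            return;
        }
        pScheduler->checkAbort();
    }

    // A long-running loop that polls every `interval` rows (interval must
    // be nonzero) instead of waiting for the end of the quantum.
    unsigned processRows(unsigned nRows, unsigned interval) {
        unsigned done = 0;
        for (unsigned i = 0; i < nRows; ++i) {
            if (i % interval == 0) {
                checkAbort();
            }
            ++done; // placeholder for real per-row work
        }
        return done;
    }
};
```

Polling every few rows rather than on every row keeps the check cheap relative to the work; the right interval depends on the cost of the loop body.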

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
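
The listing above makes close() idempotent: the needsClose flag is cleared before closeImpl() runs, so a second call returns immediately. The sketch below reproduces that split with hypothetical Closable and CountingResource classes to show that closeImpl() runs only once.

```cpp
#include <cassert>

// Hypothetical miniature of the close()/closeImpl() split shown above.
class Closable {
public:
    virtual ~Closable() = default;

    bool isClosed() const { return !needsClose; }

    // Safe to call repeatedly: only the first call after open() reaches
    // closeImpl().
    void close() {
        if (!needsClose) {
            return;
        }
        needsClose = false;
        closeImpl();
    }

protected:
    bool needsClose = false;
    virtual void closeImpl() = 0;
};

// Counts how often closeImpl() actually runs.
class CountingResource : public Closable {
public:
    int closeCount = 0;
    void open() { needsClose = true; }

protected:
    void closeImpl() override { ++closeCount; }
};
```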

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
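
Because isTracingLevel() is a single comparison, callers can use it as a guard so that an expensive message is built only when it will actually be recorded. The following sketch assumes a hypothetical Level enum (the numeric values are invented so that a higher value means a more severe event) and a hypothetical Source type.

```cpp
#include <cassert>
#include <string>

// Hypothetical level values, chosen only so that a higher number means a
// more severe event and LEVEL_OFF exceeds every real level.
enum Level { LEVEL_FINE = 1, LEVEL_INFO = 2, LEVEL_SEVERE = 3, LEVEL_OFF = 4 };

struct Source {
    Level minimumLevel = LEVEL_OFF;
    int messagesBuilt = 0;
    std::string lastMessage;

    // Same comparison as the listing above.
    bool isTracingLevel(Level level) const { return level >= minimumLevel; }

    // Builds the message text only when the level passes the filter.
    void traceRowCount(Level level, unsigned nRows) {
        if (!isTracingLevel(level)) {
            return;
        }
        ++messagesBuilt;
        lastMessage = "rows=" + std::to_string(nRows);
    }
};
```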

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor errorDesc,
const TupleData errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }
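
The tuple overload above allocates its marshalling buffer on first use and reuses it on later calls before delegating to the raw-pointer overload. A simplified sketch of that lazy-buffer pattern follows; ErrorReporter is hypothetical, marshalling is reduced to a byte copy, and the truncation to the buffer size is an addition of this example rather than something the listing does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>

// Hypothetical reporter; marshalling is reduced to a plain byte copy.
class ErrorReporter {
public:
    explicit ErrorReporter(std::size_t maxBytes) : cbMax(maxBytes) {}

    int allocations = 0;
    std::size_t lastSize = 0;

    // Copies the payload into a buffer that is allocated on first use and
    // reused afterwards, then forwards it to the raw-pointer overload.
    void post(std::string const &payload) {
        if (!pBuf) {
            pBuf.reset(new unsigned char[cbMax]);
            ++allocations;
        }
        // Truncation is an assumption of this sketch, not of the original.
        std::size_t cb = payload.size() < cbMax ? payload.size() : cbMax;
        std::memcpy(pBuf.get(), payload.data(), cb);
        post(pBuf.get(), cb);
    }

    // Stand-in for the overload that notifies the error target.
    void post(unsigned char const *, std::size_t cb) { lastSize = cb; }

private:
    std::size_t cbMax;
    std::unique_ptr<unsigned char[]> pBuf;
};
```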

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Member Data Documentation

shared_array<TupleData> LhxJoinExecStream::inputTuple [private]

Input tuple.

Definition at line 158 of file LhxJoinExecStream.h.

Referenced by execute(), and prepare().

shared_array<uint> LhxJoinExecStream::inputTupleSize [private]

Definition at line 159 of file LhxJoinExecStream.h.

Referenced by execute(), and prepare().

TupleData LhxJoinExecStream::outputTuple [private]

TupleData to assemble the output tuple.

Definition at line 164 of file LhxJoinExecStream.h.

Referenced by execute(), and prepare().

uint LhxJoinExecStream::numTuplesProduced [private]

Number of tuples produced within the current quantum.

Definition at line 169 of file LhxJoinExecStream.h.

Referenced by execute().

LhxHashInfo LhxJoinExecStream::hashInfo [private]

Hash join info.

Definition at line 174 of file LhxJoinExecStream.h.

Referenced by execute(), open(), prepare(), setHashInfo(), and setResourceAllocation().

LhxHashTable LhxJoinExecStream::hashTable [private]

HashTable to use.

Definition at line 179 of file LhxJoinExecStream.h.

Referenced by closeImpl(), execute(), open(), and prepare().

LhxHashTableReader LhxJoinExecStream::hashTableReader [private]

Definition at line 180 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

BlockNum LhxJoinExecStream::numBlocksHashTable [private]

Initial estimate of blocks required.

If MAXU, no stats are available to compute this value.

Definition at line 186 of file LhxJoinExecStream.h.

Referenced by getResourceRequirements(), and prepare().

uint LhxJoinExecStream::numMiscCacheBlocks [private]

Number of cache blocks set aside for I/O.

Definition at line 191 of file LhxJoinExecStream.h.

Referenced by getResourceRequirements(), prepare(), and setResourceAllocation().

bool LhxJoinExecStream::isTopPlan [private]

Definition at line 196 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

SharedLhxPlan LhxJoinExecStream::rootPlan [private]

Definition at line 197 of file LhxJoinExecStream.h.

Referenced by closeImpl(), and open().

LhxPlan* LhxJoinExecStream::curPlan [private]

Definition at line 198 of file LhxJoinExecStream.h.

Referenced by execute(), open(), returnBuild(), returnBuildInner(), returnBuildOuter(), returnInner(), returnProbe(), returnProbeInner(), and returnProbeOuter().

LhxPartitionInfo LhxJoinExecStream::partInfo [private]

Partition context used in recursive partitioning.

Definition at line 204 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

SharedLhxPartition LhxJoinExecStream::buildPart [private]

The build partition (which is also the only partition).

Definition at line 209 of file LhxJoinExecStream.h.

Referenced by open().

SharedLhxPartition LhxJoinExecStream::probePart [private]

Definition at line 210 of file LhxJoinExecStream.h.

Referenced by open().

LhxPartitionReader LhxJoinExecStream::buildReader [private]

Partition reader.

Definition at line 215 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

LhxPartitionReader LhxJoinExecStream::probeReader [private]

Definition at line 216 of file LhxJoinExecStream.h.

Referenced by execute().

bool LhxJoinExecStream::enableSubPartStat [private]

Whether to use sub-partition stats.

Definition at line 221 of file LhxJoinExecStream.h.

Referenced by execute(), open(), and prepare().

bool LhxJoinExecStream::enableSwing [private]

Whether to use swing based on input sizes.

Definition at line 226 of file LhxJoinExecStream.h.

Referenced by execute(), open(), and prepare().

uint LhxJoinExecStream::forcePartitionLevel [private]

Forced partitioning level.

This is set only in tests.

Definition at line 232 of file LhxJoinExecStream.h.

Referenced by execute(), open(), and prepare().

LhxJoinState LhxJoinExecStream::joinState [private]

Definition at line 237 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

vector<LhxJoinState> LhxJoinExecStream::nextState [private]

The next state of the JoinExecStream.

Definition at line 242 of file LhxJoinExecStream.h.

Referenced by execute(), and open().

shared_ptr<dynamic_bitset<> > LhxJoinExecStream::joinType [private]

Definition at line 247 of file LhxJoinExecStream.h.

Referenced by returnBuildInner(), returnBuildOuter(), returnProbeInner(), returnProbeOuter(), and setJoinType().

bool LhxJoinExecStream::regularJoin [private]

regularJoin: do not match NULLs, and do not remove duplicates in inputs.

setopDistinct: match NULLs, and remove duplicates in inputs.

setopAll: match NULLs, and do not remove duplicates in inputs.

Note: setopAll is not implemented yet.

Definition at line 256 of file LhxJoinExecStream.h.

Referenced by execute(), setHashInfo(), and setJoinType().
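
The difference in NULL handling between the join and set-operation modes described above can be sketched as a key comparison with a switch. The Key struct and keysMatch function are hypothetical and model a single integer column; they are not the comparison code used by the hash table.

```cpp
#include <cassert>

// Hypothetical single-column key; isNull stands in for SQL NULL.
struct Key {
    bool isNull;
    int value; // ignored when isNull is true
};

// matchNulls = false models the regular-join rule (a NULL key never matches
// anything); matchNulls = true models the set-operation rule (two NULL keys
// match each other).
inline bool keysMatch(Key const &a, Key const &b, bool matchNulls)
{
    if (a.isNull || b.isNull) {
        return matchNulls && a.isNull && b.isNull;
    }
    return a.value == b.value;
}
```

Duplicate removal, the other half of the distinction, is a property of how inputs are loaded and is not modeled here.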

bool LhxJoinExecStream::setopDistinct [private]

Definition at line 257 of file LhxJoinExecStream.h.

Referenced by prepare(), setHashInfo(), and setJoinType().

bool LhxJoinExecStream::setopAll [private]

Definition at line 258 of file LhxJoinExecStream.h.

Referenced by setJoinType().

std::vector<SharedExecStreamBufAccessor> ConfluenceExecStream::inAccessors [protected, inherited]

Definition at line 50 of file ConfluenceExecStream.h.

Referenced by NestedLoopJoinExecStream::checkNumInputs(), CartesianJoinExecStream::checkNumInputs(), LbmMinusExecStream::comparePrefixes(), LbmGeneratorExecStream::execute(), MergeExecStream::execute(), BarrierExecStream::execute(), LbmMinusExecStream::findMinInput(), LcsRowScanExecStream::initializeFiltersIfNeeded(), LcsRowScanExecStream::open(), LbmUnionExecStream::open(), LbmMinusExecStream::open(), LbmGeneratorExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), ConfluenceExecStream::open(), LcsRowScanExecStream::prepare(), LbmUnionExecStream::prepare(), LbmMinusExecStream::prepare(), LbmGeneratorExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConfluenceExecStream::prepare(), CartesianJoinExecStream::prepare(), BarrierExecStream::prepare(), NestedLoopJoinExecStream::preProcessRightInput(), BarrierExecStream::processInputTuple(), LbmBitOpExecStream::producePendingOutput(), LbmMinusExecStream::restartSubtrahends(), setHashInfo(), and ConfluenceExecStream::setInputBufAccessors().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), 
SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction embracing the stream.

Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:38 2009 for Fennel by  doxygen 1.5.1