LbmMinusExecStream Class Reference

LbmMinusExecStream is the execution stream that subtracts from the first bitmap input stream, the bitmap streams from the remaining inputs. More...

#include <LbmMinusExecStream.h>

Inheritance diagram for LbmMinusExecStream:

LbmBitOpExecStream ConfluenceExecStream SingleOutputExecStream ExecStream ClosableObject TraceSource ErrorSource List of all members.

Public Member Functions

virtual void prepare (LbmMinusExecStreamParams const &params)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void closeImpl ()
 Implements ClosableObject.
virtual void prepare (LbmBitOpExecStreamParams const &params)
virtual void prepare (ConfluenceExecStreamParams const &params)
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraphgetGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any unallocated resources.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTargetgetTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTargetgetErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Protected Member Functions

bool produceTuple (TupleData bitmapTuple)
 Produces a tuple to the output stream, based on a bitmap.
ExecStreamResult producePendingOutput (uint iInput)
 Produces output tuple that previously failed due to buffer overflow.
ExecStreamResult readInput (uint iInput, LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
 Reads a byte segment from a specific input stream.
bool flush ()
 Flushes the segment writer if it has any data.
bool addSegments ()
 Adds the processed segments to the segment under construction.
void writeStartRidParamValue ()
 Writes the startRid value to the startRid dynamic parameter, if one exists.

Protected Attributes

DynamicParamId rowLimitParamId
 Parameter id representing the dynamic parameter used to limit the number of rows producers for this stream should produce on a single execute.
DynamicParamId startRidParamId
 Parameter id representing the dynamic parameter used to set the starting rid value for bitmap entries to be produced by this stream's producers.
TupleDatum rowLimitDatum
 Tuple datum used to store dynamic paramter for rowLimit.
TupleDatum startRidDatum
 Tuple datum used to store dynamic parameter for startRid.
RecordNum rowLimit
 Number of rows that can be produced.
LcsRid startRid
 Desired starting rid value for bitmap entries.
boost::scoped_array< LbmSegmentReadersegmentReaders
 One segment reader for each input stream.
uint nInputs
 Number of input streams.
uint iInput
 Current input stream being processed.
boost::scoped_array< TupleDatabitmapSegTuples
 Tuple data for each input stream.
LbmSegmentWriter segmentWriter
 Segment writer.
boost::scoped_array< FixedBufferoutputBuf
 Buffer for writing output bitmap segment.
boost::scoped_array< FixedBufferbyteSegBuf
 Temporary buffer for bit operation.
PBuffer pByteSegBuf
 Pointer to byteSegBuf.
uint bitmapBufSize
 Amount of space available in buffer for bitmaps.
TupleData outputTuple
 Output tuple data containing AND'd bitmap segments.
bool producePending
 True if a tuple needs to be written to the output stream.
LcsRid addRid
 Current rid value to be added to the bitmap segment.
PBuffer addByteSeg
 Current byte segment to be added.
uint addLen
 Current length of byte segment to be added.
int nFields
 Number of non-bitmap fields preceding the bitmap fields.
std::vector< SharedExecStreamBufAccessorinAccessors
SharedExecStreamBufAccessor pOutAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraphpGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose

Private Types

enum  MinusInputType { UNKNOWN_INPUT = 0, EMPTY_INPUT, NONEMPTY_INPUT }

Private Member Functions

ExecStreamResult readMinuendInputAndFlush (LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
 Read a byte segment from the minuend input stream.
ExecStreamResult readMinuendInput (LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
 Reads the minuend input as a random sequence of segments.
int comparePrefixes ()
void restartSubtrahends ()
void copyPrefix ()
ExecStreamResult advanceSingleSubtrahend (int inputNo, LcsRid rid)
 Advance a single subtrahend to the specified rid.
ExecStreamResult advanceSubtrahends (LcsRid baseRid)
 Advances all subtrahends to the desired start rid.
ExecStreamResult minusSegments (LcsRid baseRid, PBuffer baseByteSeg, uint baseLen)
 Performs the minus operation between the minuend and the subtrahends.
ExecStreamResult findMinInput (int &minInput)
 Determines which subtrahend contains the minimum rid value in its current input stream.
bool checkNeedForRestart ()
 Restarts the subtrahends, as needed, when the input has keys.
bool canSkipMinus ()
 Determines if it's possible to avoid the minus operation for the current segment read from the minuend by using a bitmap that keeps track of subtrahend rids that have been read.

Private Attributes

bool subtrahendsDone
 True if all subtrahends have reached EOS at some point.
bool needToRead
 True if a new segment needs to be read from the minuend.
LcsRid baseRid
 Current startrid for the minuend.
PBuffer baseByteSeg
 Current byte segment for the minuend.
uint baseLen
 Length of minuend's current byte segment.
LcsRid minSubtrahendRid
 Minimum rid from amongst the subtrahends.
LcsRid maxSubtrahendRid
 The maximum rid that have been read by a subtrahend.
bool advancePending
 True if a subtrahend needs to be advanced even though all subtrahends are already positioned past the minuend's startrid.
LcsRid advanceSubtrahendRid
 The rid that the subtrahend needs to be advanced to when advancePending is true.
int advanceSubtrahendInputNo
 The input containing the subtrahend that needs to be advanced.
MinusInputType inputType
 Field used to detect the special case of empty inputs.
LbmSeqSegmentReader minuendReader
 A sequential reader used when the minuend input has keys, which may lead to RIDs being out of order.
bool prevTupleValid
 Whether the previous set of prefix fields is valid.
TupleDataWithBuffer prevTuple
 Previous set of prefix fields.
bool copyPrefixPending
 Whether the prefix should be copied before reading more tuples.
TupleData prefixedBitmapTuple
 Tuple data used to build prefixed output.
boost::dynamic_bitset subtrahendBitmap
 Bitmap containing rid values read from the subtrahends.
bool needSubtrahendRestart
 True if the subtrahends need to be restarted.

Static Private Attributes

static const uint SUBTRAHEND_BITMAP_SIZE = 32768
 Number of bits in the bitmap that keeps track of rid values read from the subtrahends.

Detailed Description

LbmMinusExecStream is the execution stream that subtracts from the first bitmap input stream, the bitmap streams from the remaining inputs.

A minus stream is generally used to subtract an ordered bitmap input streams. In a special case, however, the first input (the "minuend") may contain fields in addition to bitmap fields. The additional fields are considered to be "key fields" and are required to be the first fields. Key fields are propagated to the output stream and allow index only scans to return a result set. A side effect is that the minuend input is only partially ordered when using key fields.

To support a partially ordered minuend input, the streams to be subtracted (the "subtrahends") are restarted when the minuend is out of order.

Author:
Zelaine Fong
Version:
Id
//open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.h#13

Definition at line 58 of file LbmMinusExecStream.h.


Member Enumeration Documentation

enum LbmMinusExecStream::MinusInputType [private]

Enumerator:
UNKNOWN_INPUT 
EMPTY_INPUT 
NONEMPTY_INPUT 

Definition at line 114 of file LbmMinusExecStream.h.

00114                         {
00115         UNKNOWN_INPUT = 0,
00116         EMPTY_INPUT,
00117         NONEMPTY_INPUT
00118     };


Member Function Documentation

ExecStreamResult LbmMinusExecStream::readMinuendInputAndFlush ( LcsRid &  currRid,
PBuffer currByteSeg,
uint currLen 
) [private]

Read a byte segment from the minuend input stream.

In the case where the input has keys, tuples are flushed whenever a new key is read.

Parameters:
currRid the starting rid value of the segment read
currByteSeg the first byte of the segment read
currLen the length of the segment read
Returns:
EXECRC_YIELD if a segment was successfully read

Definition at line 167 of file LbmMinusExecStream.cpp.

References baseByteSeg, baseLen, baseRid, comparePrefixes(), copyPrefix(), copyPrefixPending, EXECRC_BUF_OVERFLOW, EXECRC_YIELD, LbmBitOpExecStream::flush(), LbmSegmentReaderBase::getTupleChange(), LbmBitOpExecStream::iInput, minuendReader, needSubtrahendRestart, needToRead, LbmBitOpExecStream::nFields, LbmBitOpExecStream::pByteSegBuf, prevTupleValid, LbmBitOpExecStream::readInput(), readMinuendInput(), LbmSegmentReaderBase::resetChangeListener(), LbmBitOpExecStream::startRid, and LbmBitOpExecStream::writeStartRidParamValue().

Referenced by execute().

00169 {
00170     ExecStreamResult rc;
00171     bool unordered = false;
00172 
00173     // If there are no keys, read as the minuend as an ordered input.
00174     // Otherwise, read the minuend as a random sequence of segments.
00175     if (nFields == 0) {
00176         rc = readInput(0, currRid, currByteSeg, currLen);
00177     } else {
00178         rc = readMinuendInput(currRid, currByteSeg, currLen);
00179         if (currRid < startRid) {
00180             unordered = true;
00181         }
00182     }
00183     if (rc != EXECRC_YIELD) {
00184         return rc;
00185     }
00186 
00187     // Store the segment just read
00188     memcpy(pByteSegBuf, baseByteSeg - baseLen + 1, baseLen);
00189     needToRead = false;
00190     // reset the startrid to the rid just read in and write the
00191     // dynamic parameter so the subtrahends can skip forward to that
00192     // rid
00193     startRid = baseRid;
00194     writeStartRidParamValue();
00195     iInput = 1;
00196 
00197     // If there are no keys (the usual case), we never need to restart inputs
00198     if (nFields == 0) {
00199         return rc;
00200     }
00201 
00202     // If there are keys, then data is expected to come from an index. RIDs
00203     // may be ordered for each key, but are not ordered for the entire stream.
00204     // In fact, when minus keys are only a subset of an index's keys, then
00205     // RIDs may restart at any time.
00206     // (Ex: RIDs in index [K1, K2] are ordered for each pair [k1, k2].
00207     // However, a minus based on [K1] will be completely out of order.)
00208     //
00209     // Due to the lack of ordering, we may need to restart subtrahends
00210     // whenever the minuend is out of order so all of the subtrahend data
00211     // can be minused from the next minuend input.  That is handled outside
00212     // of this method because restarts don't always need to be done
00213     // immediately after a new key is read.
00214     //
00215     // We also flush the segment writer's current tuple. If it cannot be
00216     // written, then we can't copy the next prefix yet, because the old
00217     // values will be used to construct the pending output tuple.
00218     if (prevTupleValid) {
00219         if (minuendReader.getTupleChange()) {
00220             minuendReader.resetChangeListener();
00221             int keyComp = comparePrefixes();
00222             if (keyComp != 0 || unordered) {
00223                 needSubtrahendRestart = true;
00224                 if (!flush()) {
00225                     copyPrefixPending = true;
00226                     return EXECRC_BUF_OVERFLOW;
00227                 }
00228                 copyPrefix();
00229             }
00230         }
00231     } else {
00232         prevTupleValid = true;
00233         copyPrefix();
00234         minuendReader.resetChangeListener();
00235     }
00236     return rc;
00237 }

ExecStreamResult LbmMinusExecStream::readMinuendInput ( LcsRid &  currRid,
PBuffer currByteSeg,
uint currLen 
) [private]

Reads the minuend input as a random sequence of segments.

Definition at line 239 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::bitmapBufSize, byteNumberToRid(), EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::flush(), minuendReader, SingleOutputExecStream::pOutAccessor, and LbmSeqSegmentReader::readSegmentAndAdvance().

Referenced by readMinuendInputAndFlush().

00241 {
00242     LbmByteNumber byteNumber;
00243     ExecStreamResult rc = minuendReader.readSegmentAndAdvance(
00244         byteNumber, currByteSeg, currLen);
00245     currRid = byteNumberToRid(byteNumber);
00246     if (rc == EXECRC_EOS) {
00247         // write out the last pending segment
00248         if (! flush()) {
00249             return EXECRC_BUF_OVERFLOW;
00250         }
00251         pOutAccessor->markEOS();
00252         return EXECRC_EOS;
00253     } else if (rc != EXECRC_YIELD) {
00254         return rc;
00255     }
00256 
00257     // segment read should never be larger than space available
00258     // for segments
00259     assert(currLen <= bitmapBufSize);
00260 
00261     return EXECRC_YIELD;
00262 }

int LbmMinusExecStream::comparePrefixes (  )  [private]

Definition at line 264 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::bitmapSegTuples, ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, and prevTuple.

Referenced by readMinuendInputAndFlush().

00265 {
00266     int ret =
00267         (inAccessors[0]->getTupleDesc()).compareTuplesKey(
00268             prevTuple,
00269             bitmapSegTuples[0],
00270             nFields);
00271     return ret;
00272 }

void LbmMinusExecStream::restartSubtrahends (  )  [private]

Definition at line 274 of file LbmMinusExecStream.cpp.

References advancePending, LbmBitOpExecStream::bitmapSegTuples, ExecStream::getStreamId(), ExecStreamGraph::getStreamInput(), LbmBitOpExecStream::iInput, ConfluenceExecStream::inAccessors, minSubtrahendRid, needSubtrahendRestart, LbmBitOpExecStream::nFields, LbmBitOpExecStream::nInputs, ExecStream::pGraph, LbmBitOpExecStream::segmentReaders, subtrahendBitmap, and subtrahendsDone.

Referenced by checkNeedForRestart(), and open().

00275 {
00276     minSubtrahendRid = LcsRid(0);
00277     advancePending = false;
00278     for (uint i = 1; i < nInputs; i++) {
00279         pGraph->getStreamInput(getStreamId(), i)->open(true);
00280         segmentReaders[i].init(
00281             inAccessors[i],
00282             bitmapSegTuples[i],
00283             (!subtrahendsDone && nFields > 0),
00284             &subtrahendBitmap);
00285     }
00286     iInput = 1;
00287     needSubtrahendRestart = false;
00288 }

void LbmMinusExecStream::copyPrefix (  )  [private]

Definition at line 290 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::bitmapSegTuples, LbmBitOpExecStream::nFields, prevTuple, and TupleDataWithBuffer::resetBuffer().

Referenced by execute(), and readMinuendInputAndFlush().

00291 {
00292     /*
00293       Need to make sure pointers are allocated before memcpy.
00294       resetBuffer restores the pointers to the associated buffer.
00295     */
00296     prevTuple.resetBuffer();
00297 
00298     for (int i = 0; i < nFields; i ++) {
00299         prevTuple[i].memCopyFrom(bitmapSegTuples[0][i]);
00300     }
00301 }

ExecStreamResult LbmMinusExecStream::advanceSingleSubtrahend ( int  inputNo,
LcsRid  rid 
) [private]

Advance a single subtrahend to the specified rid.

Parameters:
inputNo input number of the subtrahend
rid rid to be advanced to
Returns:
EXECRC_YIELD if advance was successful

Definition at line 303 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::segmentReaders.

Referenced by execute().

00306 {
00307     ExecStreamResult rc = segmentReaders[inputNo].advanceToRid(rid);
00308     return rc;
00309 }

ExecStreamResult LbmMinusExecStream::advanceSubtrahends ( LcsRid  baseRid  )  [private]

Advances all subtrahends to the desired start rid.

Parameters:
baseRid desired startrid
Returns:
EXECRC_YIELD if able to successfully advance all subtrahends

Definition at line 311 of file LbmMinusExecStream.cpp.

References EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::iInput, minSubtrahendRid, LbmBitOpExecStream::nInputs, and LbmBitOpExecStream::segmentReaders.

Referenced by execute().

00312 {
00313     // no need to advance subtrahends if they're all positioned past the
00314     // minuend
00315     if (minSubtrahendRid > baseRid) {
00316         return EXECRC_YIELD;
00317     }
00318 
00319     // advance the subtrahends, resuming at the one where we last left off
00320     for (; iInput < nInputs; iInput++) {
00321         ExecStreamResult rc = segmentReaders[iInput].advanceToRid(baseRid);
00322         if (rc == EXECRC_EOS) {
00323             continue;
00324         }
00325         if (rc != EXECRC_YIELD) {
00326             return rc;
00327         }
00328     }
00329 
00330     return EXECRC_YIELD;
00331 }

ExecStreamResult LbmMinusExecStream::minusSegments ( LcsRid  baseRid,
PBuffer  baseByteSeg,
uint  baseLen 
) [private]

Performs the minus operation between the minuend and the subtrahends.

Parameters:
baseRid start rid of the minuend
baseByteSeg pointer to the first byte of the minuend segment; note that the segment is stored backwards so it needs to be read from right to left
baseLen length of the minuend segment
Returns:
EXECRC_YIELD if able to read data from subtrahends

Definition at line 400 of file LbmMinusExecStream.cpp.

References advancePending, advanceSubtrahendInputNo, advanceSubtrahendRid, EXECRC_EOS, EXECRC_YIELD, findMinInput(), LbmSegment::LbmOneByteSize, min(), opaqueToInt(), LbmBitOpExecStream::pByteSegBuf, and LbmBitOpExecStream::segmentReaders.

Referenced by execute().

00402 {
00403     while (true) {
00404         // find the subtrahend with the minimum startrid and read its current
00405         // segment
00406         int minInput;
00407         ExecStreamResult rc = findMinInput(minInput);
00408         if (rc == EXECRC_EOS) {
00409             return rc;
00410         }
00411 
00412         LcsRid currRid;
00413         PBuffer currByteSeg;
00414         uint currLen;
00415         segmentReaders[minInput].readCurrentByteSegment(
00416             currRid, currByteSeg, currLen);
00417 
00418         // if the subtrahends are not within the range of the minuend's
00419         // current rid range, ignore the current segment and get a new one
00420         uint offset =
00421             opaqueToInt(currRid - baseRid) / LbmSegment::LbmOneByteSize;
00422         if (offset >= baseLen) {
00423             break;
00424         }
00425 
00426         // only read from the subtrahends the amount that will match the
00427         // minuend's segment
00428         currLen = std::min(currLen, baseLen - offset);
00429 
00430         // minus from the minuend -- note that segments are stored
00431         // backwards
00432         PBuffer out = pByteSegBuf + baseLen - 1 - offset;
00433         uint len = currLen;
00434         while (len--) {
00435             *out-- &= ~(*currByteSeg--);
00436         }
00437 
00438         // advance the subtrahend by the amount read in; note that we don't
00439         // return if this subtrahend has reached EOS, as there may still be
00440         // other subtrahends that aren't in the EOS state
00441         rc = segmentReaders[minInput].advanceToRid(
00442                 currRid + currLen * LbmSegment::LbmOneByteSize);
00443         if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00444             advancePending = true;
00445             advanceSubtrahendRid =
00446                 currRid + currLen * LbmSegment::LbmOneByteSize;
00447             advanceSubtrahendInputNo = minInput;
00448             return rc;
00449         }
00450     }
00451 
00452     return EXECRC_YIELD;
00453 }

ExecStreamResult LbmMinusExecStream::findMinInput ( int &  minInput  )  [private]

Determines which subtrahend contains the minimum rid value in its current input stream.

Parameters:
minInput returns input number corresponding to the subtrahend with the minimum rid input
Returns:
EXECRC_YIELD if able to successfully find a subtrahend; else EXECRC_EOS if all subtrahends have reached EOS

Definition at line 455 of file LbmMinusExecStream.cpp.

References EXECBUF_EOS, EXECRC_EOS, EXECRC_YIELD, ConfluenceExecStream::inAccessors, maxSubtrahendRid, minSubtrahendRid, LbmBitOpExecStream::nInputs, LbmBitOpExecStream::segmentReaders, and subtrahendsDone.

Referenced by execute(), and minusSegments().

00456 {
00457     minInput = -1;
00458 
00459     for (uint i = 1; i < nInputs; i++) {
00460         if (inAccessors[i]->getState() == EXECBUF_EOS) {
00461             continue;
00462         }
00463 
00464         LcsRid currRid;
00465         PBuffer currByteSeg;
00466         uint currLen;
00467         segmentReaders[i].readCurrentByteSegment(
00468             currRid, currByteSeg, currLen);
00469 
00470         if (minInput == -1 || currRid < minSubtrahendRid) {
00471             minInput = i;
00472             minSubtrahendRid = currRid;
00473         }
00474     }
00475 
00476     if (minInput == -1) {
00477         // Note that once we've made one pass over the subtrahends, by setting
00478         // subtrahendsDone, we'll avoid resetting the bits on subsequent passes.
00479         // Position minSubtrahendRid past the max subtrahend rid since the
00480         // subtrahends are no longer positioned at that minimum rid.
00481         subtrahendsDone = true;
00482         for (uint i = 1; i < nInputs; i++) {
00483             LcsRid rid = segmentReaders[i].getMaxRidSet();
00484             if (rid > maxSubtrahendRid) {
00485                 maxSubtrahendRid = rid;
00486             }
00487         }
00488         minSubtrahendRid = maxSubtrahendRid + 1;
00489         return EXECRC_EOS;
00490     } else {
00491         return EXECRC_YIELD;
00492     }
00493 }

bool LbmMinusExecStream::checkNeedForRestart (  )  [private]

Restarts the subtrahends, as needed, when the input has keys.

The pre-condition for a restart is either a change in key value in the input or unordered rid values in the minuend. Once this pre-condition is met, then two additional criteria are also required -- the current minuend must contain rids that overlap with the subtrahends, and the subtrahend is positioned past the current minuend.

For a minuend that meets the pre-condition just described, even if it doesn't meet the additional criteria, then the very first segment that follows that does meet the criteria will need a restart. After that restart is done, then no further restart checks are needed until the restart pre-condition is met again.

As part of determining whether or not the restart is necessary, we check for overlapping rids. If there is no overlap, then the minus operation can be skipped. This skip minus check is always done, provided the input is non-empty.

Returns:
true if the minus operation can be bypassed when the input has keys

Definition at line 333 of file LbmMinusExecStream.cpp.

References canSkipMinus(), EMPTY_INPUT, inputType, minSubtrahendRid, needSubtrahendRestart, LbmBitOpExecStream::nFields, restartSubtrahends(), and LbmBitOpExecStream::startRid.

Referenced by execute().

00334 {
00335     // Return value indicates whether the minus can be skipped, not whether
00336     // a restart was done.
00337 
00338     if (nFields == 0) {
00339         return false;
00340     } else if (inputType == EMPTY_INPUT) {
00341         // If the input is empty, we can always bypass the minus
00342         return true;
00343     } else if (needSubtrahendRestart) {
00344         bool skipMinus = canSkipMinus();
00345         // If there are potentially overlapping rids and the subtrahend is
00346         // positioned past the current minuend, we need to restart
00347         if (!skipMinus && minSubtrahendRid > startRid) {
00348             restartSubtrahends();
00349         }
00350         return skipMinus;
00351     } else {
00352         return canSkipMinus();
00353     }
00354 }

bool LbmMinusExecStream::canSkipMinus (  )  [private]

Determines if it's possible to avoid the minus operation for the current segment read from the minuend by using a bitmap that keeps track of subtrahend rids that have been read.

Returns:
true if the minus can be skipped

Definition at line 356 of file LbmMinusExecStream.cpp.

References baseByteSeg, baseLen, baseRid, LbmSegment::LbmOneByteSize, maxSubtrahendRid, LbmBitOpExecStream::nInputs, opaqueToInt(), LbmBitOpExecStream::segmentReaders, SUBTRAHEND_BITMAP_SIZE, subtrahendBitmap, and subtrahendsDone.

Referenced by checkNeedForRestart().

00357 {
00358     LcsRid rid = baseRid;
00359     LcsRid endRid = baseRid + baseLen * LbmSegment::LbmOneByteSize - 1;
00360 
00361     // Determine whether the rids in the current minuend segment are
00362     // "covered" by the bitmap in its current state
00363     if (subtrahendsDone) {
00364         // If the first rid we're interested in extends past the max rid read
00365         // from all subtrahends, then there are no rids to subtract off.
00366         if (rid > maxSubtrahendRid) {
00367             return true;
00368         }
00369     } else {
00370         // If the last rid we're interested in extends past the max rid read
00371         // from any subtrahend, then we can't use the bitmap to determine if
00372         // we can skip the minus.
00373         for (uint i = 1; i < nInputs; i++) {
00374             if (endRid > segmentReaders[i].getMaxRidSet()) {
00375                 return false;
00376             }
00377         }
00378     }
00379 
00380     PBuffer seg = baseByteSeg;
00381     for (uint i = 0; i < baseLen; i++) {
00382         uint8_t byte = *((uint8_t *) seg);
00383         for (uint j = 0; j < LbmSegment::LbmOneByteSize; j++) {
00384             if (byte & 1) {
00385                 // once we find a match, no need to look any further
00386                 if (subtrahendBitmap.test(
00387                     opaqueToInt(rid % SUBTRAHEND_BITMAP_SIZE)))
00388                 {
00389                     return false;
00390                 }
00391             }
00392             byte = byte >> 1;
00393             rid++;
00394         }
00395         seg--;
00396     }
00397     return true;
00398 }

bool LbmMinusExecStream::produceTuple ( TupleData  bitmapTuple  )  [protected, virtual]

Produces a tuple to the output stream, based on a bitmap.

Returns:
false if buffer overflow occured while producing the tuple

Reimplemented from LbmBitOpExecStream.

Definition at line 495 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::nFields, SingleOutputExecStream::pOutAccessor, prefixedBitmapTuple, and prevTuple.

00496 {
00497     // If the minuend contained prefix fields, they are prepended to
00498     // the output: ([optional prefix fields], bitmap)
00499     if (nFields) {
00500         for (uint i = 0; i < nFields; i++) {
00501             prefixedBitmapTuple[i].copyFrom(prevTuple[i]);
00502         }
00503         assert (prefixedBitmapTuple.size() == nFields + bitmapTuple.size());
00504         for (uint i = 0; i < 3; i++) {
00505             prefixedBitmapTuple[nFields + i].copyFrom(bitmapTuple[i]);
00506         }
00507         return pOutAccessor->produceTuple(prefixedBitmapTuple);
00508     }
00509     return pOutAccessor->produceTuple(bitmapTuple);
00510 }

void LbmMinusExecStream::prepare ( LbmMinusExecStreamParams const &  params  )  [virtual]

Definition at line 29 of file LbmMinusExecStream.cpp.

References TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, prefixedBitmapTuple, LbmBitOpExecStream::prepare(), prevTuple, and subtrahendBitmap.

00030 {
00031     LbmBitOpExecStream::prepare(params);
00032 
00033     if (nFields) {
00034         // compute an output tuple based on the minuend
00035         prefixedBitmapTuple.compute(inAccessors[0]->getTupleDesc());
00036 
00037         // prevTuple contains only the prefix fields, it has it's own
00038         // storage to track the previous tuple when the current tuple
00039         // pointers have moved forward
00040         TupleDescriptor prevTupleDesc;
00041         TupleDescriptor const &inputDesc = inAccessors[0]->getTupleDesc();
00042         for (int i = 0; i < nFields; i ++) {
00043             prevTupleDesc.push_back(inputDesc[i]);
00044         }
00045         prevTuple.computeAndAllocate(prevTupleDesc);
00046     }
00047     subtrahendBitmap.resize(0);
00048 }

void LbmMinusExecStream::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from LbmBitOpExecStream.

Definition at line 50 of file LbmMinusExecStream.cpp.

References advancePending, baseRid, LbmBitOpExecStream::bitmapSegTuples, copyPrefixPending, ConfluenceExecStream::inAccessors, LbmSeqSegmentReader::init(), inputType, maxSubtrahendRid, minSubtrahendRid, minuendReader, needToRead, LbmBitOpExecStream::nFields, LbmBitOpExecStream::open(), prevTupleValid, restartSubtrahends(), LbmBitOpExecStream::rowLimit, SUBTRAHEND_BITMAP_SIZE, subtrahendBitmap, subtrahendsDone, and UNKNOWN_INPUT.

00051 {
00052     LbmBitOpExecStream::open(restart);
00053     subtrahendsDone = false;
00054     needToRead = true;
00055     minSubtrahendRid = LcsRid(0);
00056     maxSubtrahendRid = LcsRid(0);
00057     baseRid = LcsRid(0);
00058     advancePending = false;
00059     // since the subtrahends need to read till EOS, don't set a rowLimit
00060     rowLimit = 0;
00061     inputType = UNKNOWN_INPUT;
00062     copyPrefixPending = false;
00063     prevTupleValid = false;
00064     minuendReader.init(inAccessors[0], bitmapSegTuples[0]);
00065 
00066     if (nFields > 0) {
00067         subtrahendBitmap.resize(SUBTRAHEND_BITMAP_SIZE);
00068     }
00069     restartSubtrahends();
00070 }

ExecStreamResult LbmMinusExecStream::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 72 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::addByteSeg, LbmBitOpExecStream::addLen, LbmBitOpExecStream::addRid, LbmBitOpExecStream::addSegments(), advancePending, advanceSingleSubtrahend(), advanceSubtrahendInputNo, advanceSubtrahendRid, advanceSubtrahends(), baseByteSeg, baseLen, baseRid, checkNeedForRestart(), copyPrefix(), copyPrefixPending, dummy(), EMPTY_INPUT, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, findMinInput(), inputType, LbmSegment::LbmOneByteSize, minusSegments(), needToRead, LbmBitOpExecStream::nFields, NONEMPTY_INPUT, ExecStreamQuantum::nTuplesMax, LbmBitOpExecStream::pByteSegBuf, LbmBitOpExecStream::producePending, LbmBitOpExecStream::producePendingOutput(), readMinuendInputAndFlush(), LbmBitOpExecStream::startRid, subtrahendsDone, and UNKNOWN_INPUT.

00073 {
00074     ExecStreamResult rc;
00075 
00076     // On the first execution, check whether any subtrahend has data
00077     if (inputType == UNKNOWN_INPUT) {
00078         rc = advanceSubtrahends(LcsRid(0));
00079         if (rc != EXECRC_YIELD) {
00080             return rc;
00081         }
00082         int dummy;
00083         rc = findMinInput(dummy);
00084         if (rc == EXECRC_EOS) {
00085             inputType = EMPTY_INPUT;
00086             if (nFields == 0) {
00087                 subtrahendsDone = true;
00088             }
00089         } else {
00090             inputType = NONEMPTY_INPUT;
00091         }
00092     }
00093 
00094     if (producePending) {
00095         rc = producePendingOutput(0);
00096         if (rc != EXECRC_YIELD) {
00097             return rc;
00098         }
00099     }
00100 
00101     bool skipMinus = false;
00102     if (copyPrefixPending) {
00103         copyPrefix();
00104         copyPrefixPending = false;
00105         needToRead = false;
00106         // Since we bypassed the restart check when this current minuend
00107         // was read, we need to do the check here
00108         skipMinus = checkNeedForRestart();
00109     }
00110 
00111     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00112         // read a segment from the minuend if we've finished processing the
00113         // previous segment
00114         if (needToRead) {
00115             rc = readMinuendInputAndFlush(baseRid, baseByteSeg, baseLen);
00116             if (rc != EXECRC_YIELD) {
00117                 return rc;
00118             }
00119 
00120             // See if we need to restart the subtrahends
00121             skipMinus = checkNeedForRestart();
00122         }
00123 
00124         // Minus the subtrahends if they haven't all reached EOS in the
00125         // case where there are no keys.  In the case where there are keys,
00126         // the bitmap determines whether we can skip the minus.
00127         if ((nFields == 0 && !subtrahendsDone) || !skipMinus) {
00128             if (advancePending) {
00129                 rc =
00130                     advanceSingleSubtrahend(
00131                         advanceSubtrahendInputNo,
00132                         advanceSubtrahendRid);
00133                 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00134                     return rc;
00135                 }
00136                 advancePending = false;
00137             } else {
00138                 rc = advanceSubtrahends(baseRid);
00139                 if (rc != EXECRC_YIELD) {
00140                     return rc;
00141                 }
00142             }
00143 
00144             rc = minusSegments(baseRid, baseByteSeg, baseLen);
00145             if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00146                 return rc;
00147             }
00148         }
00149 
00150         // bump up the startrid past the segment just read and
00151         // write out the minuend segment
00152         needToRead = true;
00153         startRid = baseRid + baseLen * LbmSegment::LbmOneByteSize;
00154         addRid = baseRid;
00155         addByteSeg = pByteSegBuf;
00156         addLen = baseLen;
00157         if (!addSegments()) {
00158             return EXECRC_BUF_OVERFLOW;
00159         }
00160 
00161         // loop back to read the next minuend segment
00162     }
00163 
00164     return EXECRC_QUANTUM_EXPIRED;
00165 }

void LbmMinusExecStream::closeImpl (  )  [virtual]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from LbmBitOpExecStream.

Definition at line 512 of file LbmMinusExecStream.cpp.

References LbmBitOpExecStream::closeImpl(), and subtrahendBitmap.

00513 {
00514     subtrahendBitmap.resize(0);
00515     LbmBitOpExecStream::closeImpl();
00516 }

ExecStreamResult LbmBitOpExecStream::producePendingOutput ( uint  iInput  )  [protected, inherited]

Produces output tuple that previously failed due to buffer overflow.

Writes out the remaining segments that could not fit in the previous buffer. Determines if input has reached EOS.

Parameters:
iInput input to read to determine if EOS reached
Returns:
EXECRC_YIELD if successful

Definition at line 110 of file LbmBitOpExecStream.cpp.

References LbmBitOpExecStream::addSegments(), EXECBUF_EOS, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, ConfluenceExecStream::inAccessors, LbmSegmentWriter::isEmpty(), LbmBitOpExecStream::outputTuple, SingleOutputExecStream::pOutAccessor, LbmBitOpExecStream::producePending, LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.

Referenced by execute(), and LbmIntersectExecStream::execute().

00111 {
00112     if (!produceTuple(outputTuple)) {
00113         return EXECRC_BUF_OVERFLOW;
00114     }
00115     // in the middle of adding segments when buffer overflow occurred;
00116     // go back and add the remaining segments
00117     if (!segmentWriter.isEmpty()) {
00118         segmentWriter.reset();
00119         if (!addSegments()) {
00120             return EXECRC_BUF_OVERFLOW;
00121         }
00122     }
00123     producePending = false;
00124     if (inAccessors[iInput]->getState() == EXECBUF_EOS) {
00125         pOutAccessor->markEOS();
00126         return EXECRC_EOS;
00127     }
00128 
00129     return EXECRC_YIELD;
00130 }

ExecStreamResult LbmBitOpExecStream::readInput ( uint  iInput,
LcsRid &  currRid,
PBuffer currByteSeg,
uint currLen 
) [protected, inherited]

Reads a byte segment from a specific input stream.

Parameters:
iInput input to read
currRid startRid of the segment read
currByteSeg byte segment read; points to the beginning of segment that is stored backwards
currLen length of the byte segment read
Returns:
EXECRC_YIELD if read was successful

Definition at line 132 of file LbmBitOpExecStream.cpp.

References LbmBitOpExecStream::bitmapBufSize, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::flush(), SingleOutputExecStream::pOutAccessor, LbmBitOpExecStream::segmentReaders, and LbmBitOpExecStream::startRid.

Referenced by LbmIntersectExecStream::execute(), and readMinuendInputAndFlush().

00134 {
00135     ExecStreamResult rc = segmentReaders[iInput].advanceToRid(startRid);
00136 
00137     if (rc == EXECRC_EOS) {
00138         // write out the last pending segment
00139         if (! flush()) {
00140             return EXECRC_BUF_OVERFLOW;
00141         }
00142         pOutAccessor->markEOS();
00143         return EXECRC_EOS;
00144     } else if (rc != EXECRC_YIELD) {
00145         return rc;
00146     }
00147 
00148     segmentReaders[iInput].readCurrentByteSegment(
00149         currRid, currByteSeg, currLen);
00150     // segment read should never be larger than space available
00151     // for segments
00152     assert(currLen <= bitmapBufSize);
00153 
00154     return EXECRC_YIELD;
00155 }

bool LbmBitOpExecStream::flush (  )  [protected, inherited]

Flushes the segment writer if it has any data.

The data is transferred to a bitmap tuple and the segment writer is reset. Finally, the tuple is produced to the output stream. If the tuple cannot be written, then it becomes a pending tuple.

Returns:
false if buffer overflow occured while producing the tuple

Definition at line 157 of file LbmBitOpExecStream.cpp.

References LbmSegmentWriter::isEmpty(), LbmBitOpExecStream::outputTuple, LbmBitOpExecStream::producePending, LbmSegmentWriter::produceSegmentTuple(), LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.

Referenced by LbmBitOpExecStream::readInput(), readMinuendInput(), and readMinuendInputAndFlush().

00158 {
00159     assert (!producePending);
00160 
00161     if (!segmentWriter.isEmpty()) {
00162         outputTuple = segmentWriter.produceSegmentTuple();
00163         segmentWriter.reset();
00164         if (!produceTuple(outputTuple)) {
00165             producePending = true;
00166         }
00167     }
00168     return !producePending;
00169  }

bool LbmBitOpExecStream::addSegments (  )  [protected, inherited]

Adds the processed segments to the segment under construction.

If the segment fills up, writes it to the output buffer and continues constructing the rest of the segment. Leading, trailing, and intermediate zeros in the segment are removed.

Returns:
false if buffer overflow occurred writing out a segment

Definition at line 171 of file LbmBitOpExecStream.cpp.

References LbmBitOpExecStream::addByteSeg, LbmBitOpExecStream::addLen, LbmBitOpExecStream::addRid, LbmSegmentWriter::addSegment(), LbmBitOpExecStream::outputTuple, LbmBitOpExecStream::producePending, LbmSegmentWriter::produceSegmentTuple(), LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.

Referenced by execute(), LbmIntersectExecStream::intersectSegments(), and LbmBitOpExecStream::producePendingOutput().

00172 {
00173     while (addLen > 0) {
00174         if (segmentWriter.addSegment(addRid, addByteSeg, addLen)) {
00175             break;
00176         }
00177 
00178         outputTuple = segmentWriter.produceSegmentTuple();
00179         if (!produceTuple(outputTuple)) {
00180             producePending = true;
00181             return false;
00182         }
00183 
00184         // loop back and start creating a new segment for the remainder of
00185         // the segments that wouldn't fit
00186         segmentWriter.reset();
00187     }
00188 
00189     return true;
00190 }

void LbmBitOpExecStream::writeStartRidParamValue (  )  [protected, inherited]

Writes the startRid value to the startRid dynamic parameter, if one exists.

Definition at line 205 of file LbmBitOpExecStream.cpp.

References opaqueToInt(), ExecStream::pDynamicParamManager, LbmBitOpExecStream::startRidDatum, and LbmBitOpExecStream::startRidParamId.

Referenced by LbmIntersectExecStream::execute(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), and readMinuendInputAndFlush().

00206 {
00207     if (opaqueToInt(startRidParamId) > 0) {
00208         pDynamicParamManager->writeParam(startRidParamId, startRidDatum);
00209     }
00210 }

void LbmBitOpExecStream::prepare ( LbmBitOpExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 28 of file LbmBitOpExecStream.cpp.

References LbmBitOpExecStream::bitmapSegTuples, TupleDatum::cbData, ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, LbmBitOpExecStream::nInputs, TupleDatum::pData, ConfluenceExecStream::prepare(), LbmBitOpExecStream::rowLimit, LbmBitOpExecStream::rowLimitDatum, LbmBitOpExecStreamParams::rowLimitParamId, LbmBitOpExecStream::rowLimitParamId, LbmBitOpExecStream::segmentReaders, LbmBitOpExecStream::startRid, LbmBitOpExecStream::startRidDatum, LbmBitOpExecStreamParams::startRidParamId, and LbmBitOpExecStream::startRidParamId.

Referenced by prepare(), and LbmIntersectExecStream::prepare().

00029 {
00030     ConfluenceExecStream::prepare(params);
00031 
00032     // set dynanmic parameter ids
00033     rowLimitParamId = params.rowLimitParamId;
00034     startRidParamId = params.startRidParamId;
00035 
00036     // setup tupledatums for writing dynamic parameter values
00037     rowLimitDatum.pData = (PConstBuffer) &rowLimit;
00038     rowLimitDatum.cbData = sizeof(rowLimit);
00039     startRidDatum.pData = (PConstBuffer) &startRid;
00040     startRidDatum.cbData = sizeof(startRid);
00041 
00042     // initialize segment readers for reading bitmaps from input stream
00043     nInputs = inAccessors.size();
00044     segmentReaders.reset(new LbmSegmentReader[nInputs]);
00045     bitmapSegTuples.reset(new TupleData[nInputs]);
00046     for (uint i = 0; i < nInputs; i++) {
00047         bitmapSegTuples[i].compute(inAccessors[i]->getTupleDesc());
00048     }
00049 
00050     nFields = inAccessors[0]->getTupleDesc().size() - 3;
00051 }

void ConfluenceExecStream::prepare ( ConfluenceExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 37 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::getInputBufProvision(), ConfluenceExecStream::inAccessors, and SingleOutputExecStream::prepare().

Referenced by LcsRowScanBaseExecStream::prepare(), LbmUnionExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), CartesianJoinExecStream::prepare(), and BarrierExecStream::prepare().

00038 {
00039     SingleOutputExecStream::prepare(params);
00040 
00041     for (uint i = 0; i < inAccessors.size(); ++i) {
00042         assert(inAccessors[i]->getProvision() == getInputBufProvision());
00043     }
00044 }

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

00049 {
00050     ExecStream::prepare(params);
00051     assert(pOutAccessor);
00052     assert(pOutAccessor->getProvision() == getOutputBufProvision());
00053     if (pOutAccessor->getTupleDesc().empty()) {
00054         assert(!params.outputTupleDesc.empty());
00055         pOutAccessor->setTupleShape(
00056             params.outputTupleDesc,
00057             params.outputTupleFormat);
00058     }
00059 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {
00086     if (pGraph) {
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }

void ConfluenceExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Reimplemented from SingleOutputExecStream.

Definition at line 31 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::inAccessors.

00033 {
00034     inAccessors = inAccessorsInit;
00035 }

ExecStreamBufProvision ConfluenceExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE

Reimplemented from ExecStream.

Definition at line 58 of file ConfluenceExecStream.cpp.

References BUFPROV_PRODUCER.

Referenced by ConfluenceExecStream::prepare().

00059 {
00060     return BUFPROV_PRODUCER;
00061 }

void SingleOutputExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 41 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::pOutAccessor.

Referenced by ConduitExecStream::setOutputBufAccessors().

00043 {
00044     assert(outAccessors.size() == 1);
00045     pOutAccessor = outAccessors[0];
00046 }

ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; default is BUFPROV_NONE

Reimplemented from ExecStream.

Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 69 of file SingleOutputExecStream.cpp.

References BUFPROV_CONSUMER.

Referenced by SingleOutputExecStream::prepare().

00070 {
00071     return BUFPROV_CONSUMER;
00072 }

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity minQuantity,
ExecStreamResourceQuantity optQuantity,
ExecStreamResourceSettingType optType 
) [virtual, inherited]

Determines resource requirements for this stream.

Default implementation declares zero resource requirements.

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting.

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 93 of file ExecStream.cpp.

References EXEC_RESOURCE_ACCURATE.

Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().

00097 {
00098     getResourceRequirements(minQuantity, optQuantity);
00099     optType = EXEC_RESOURCE_ACCURATE;
00100 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity minQuantity,
ExecStreamResourceQuantity optQuantity 
) [virtual, inherited]

Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.

Definition at line 102 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.

00105 {
00106     minQuantity.nThreads = 0;
00107     minQuantity.nCachePages = 0;
00108     optQuantity = minQuantity;
00109 }

void ExecStream::setResourceAllocation ( ExecStreamResourceQuantity quantity  )  [virtual, inherited]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 111 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.

Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().

00113 {
00114     resourceAllocation = quantity;
00115     if (pQuotaAccessor) {
00116         pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00117     }
00118     if (pScratchQuotaAccessor) {
00119         pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00120     }
00121 }

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any unallocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being trace
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor errorDesc,
const TupleData errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Member Data Documentation

bool LbmMinusExecStream::subtrahendsDone [private]

True if all subtrahends have reached EOS at some point.

Definition at line 64 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), execute(), findMinInput(), open(), and restartSubtrahends().

bool LbmMinusExecStream::needToRead [private]

True if a new segment needs to be read from the minuend.

Definition at line 69 of file LbmMinusExecStream.h.

Referenced by execute(), open(), and readMinuendInputAndFlush().

LcsRid LbmMinusExecStream::baseRid [private]

Current startrid for the minuend.

Definition at line 74 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), execute(), open(), and readMinuendInputAndFlush().

PBuffer LbmMinusExecStream::baseByteSeg [private]

Current byte segment for the minuend.

Definition at line 79 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), execute(), and readMinuendInputAndFlush().

uint LbmMinusExecStream::baseLen [private]

Length of minuend's current byte segment.

Definition at line 84 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), execute(), and readMinuendInputAndFlush().

LcsRid LbmMinusExecStream::minSubtrahendRid [private]

Minimum rid from amongst the subtrahends.

Definition at line 89 of file LbmMinusExecStream.h.

Referenced by advanceSubtrahends(), checkNeedForRestart(), findMinInput(), open(), and restartSubtrahends().

LcsRid LbmMinusExecStream::maxSubtrahendRid [private]

The maximum rid that have been read by a subtrahend.

This only applies if the input has keys.

Definition at line 95 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), findMinInput(), and open().

bool LbmMinusExecStream::advancePending [private]

True if a subtrahend needs to be advanced even though all subtrahends are already positioned past the minuend's startrid.

Definition at line 101 of file LbmMinusExecStream.h.

Referenced by execute(), minusSegments(), open(), and restartSubtrahends().

LcsRid LbmMinusExecStream::advanceSubtrahendRid [private]

The rid that the subtrahend needs to be advanced to when advancePending is true.

Definition at line 107 of file LbmMinusExecStream.h.

Referenced by execute(), and minusSegments().

int LbmMinusExecStream::advanceSubtrahendInputNo [private]

The input containing the subtrahend that needs to be advanced.

Definition at line 112 of file LbmMinusExecStream.h.

Referenced by execute(), and minusSegments().

MinusInputType LbmMinusExecStream::inputType [private]

Field used to detect the special case of empty inputs.

When the subtrahends are empty, there is no need to subtract them.

Definition at line 124 of file LbmMinusExecStream.h.

Referenced by checkNeedForRestart(), execute(), and open().

LbmSeqSegmentReader LbmMinusExecStream::minuendReader [private]

A sequential reader used when the minuend input has keys, which may lead to RIDs being out of order.

Definition at line 130 of file LbmMinusExecStream.h.

Referenced by open(), readMinuendInput(), and readMinuendInputAndFlush().

bool LbmMinusExecStream::prevTupleValid [private]

Whether the previous set of prefix fields is valid.

Definition at line 135 of file LbmMinusExecStream.h.

Referenced by open(), and readMinuendInputAndFlush().

TupleDataWithBuffer LbmMinusExecStream::prevTuple [private]

Previous set of prefix fields.

Definition at line 140 of file LbmMinusExecStream.h.

Referenced by comparePrefixes(), copyPrefix(), prepare(), and produceTuple().

bool LbmMinusExecStream::copyPrefixPending [private]

Whether the prefix should be copied before reading more tuples.

Definition at line 145 of file LbmMinusExecStream.h.

Referenced by execute(), open(), and readMinuendInputAndFlush().

TupleData LbmMinusExecStream::prefixedBitmapTuple [private]

Tuple data used to build prefixed output.

Definition at line 150 of file LbmMinusExecStream.h.

Referenced by prepare(), and produceTuple().

const uint LbmMinusExecStream::SUBTRAHEND_BITMAP_SIZE = 32768 [static, private]

Number of bits in the bitmap that keeps track of rid values read from the subtrahends.

Note that because the size of the bitmap is smaller than the number of possible rid values, there can be false hits when testing the bitmap. The larger this value, the more memory required, but the lower the likelihood of false hits.

Definition at line 159 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), and open().

boost::dynamic_bitset LbmMinusExecStream::subtrahendBitmap [private]

Bitmap containing rid values read from the subtrahends.

Only used in the case when the input contains keys.

Definition at line 165 of file LbmMinusExecStream.h.

Referenced by canSkipMinus(), closeImpl(), open(), prepare(), and restartSubtrahends().

bool LbmMinusExecStream::needSubtrahendRestart [private]

True if the subtrahends need to be restarted.

Definition at line 170 of file LbmMinusExecStream.h.

Referenced by checkNeedForRestart(), readMinuendInputAndFlush(), and restartSubtrahends().

DynamicParamId LbmBitOpExecStream::rowLimitParamId [protected, inherited]

Parameter id representing the dynamic parameter used to limit the number of rows producers for this stream should produce on a single execute.

Definition at line 71 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().

DynamicParamId LbmBitOpExecStream::startRidParamId [protected, inherited]

Parameter id representing the dynamic parameter used to set the starting rid value for bitmap entries to be produced by this stream's producers.

Definition at line 78 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and LbmBitOpExecStream::writeStartRidParamValue().

TupleDatum LbmBitOpExecStream::rowLimitDatum [protected, inherited]

Tuple datum used to store dynamic paramter for rowLimit.

Definition at line 83 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().

TupleDatum LbmBitOpExecStream::startRidDatum [protected, inherited]

Tuple datum used to store dynamic parameter for startRid.

Definition at line 88 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::prepare(), and LbmBitOpExecStream::writeStartRidParamValue().

RecordNum LbmBitOpExecStream::rowLimit [protected, inherited]

Number of rows that can be produced.

Definition at line 93 of file LbmBitOpExecStream.h.

Referenced by open(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().

LcsRid LbmBitOpExecStream::startRid [protected, inherited]

Desired starting rid value for bitmap entries.

Definition at line 98 of file LbmBitOpExecStream.h.

Referenced by checkNeedForRestart(), execute(), LbmIntersectExecStream::execute(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), LbmBitOpExecStream::readInput(), and readMinuendInputAndFlush().

boost::scoped_array<LbmSegmentReader> LbmBitOpExecStream::segmentReaders [protected, inherited]

One segment reader for each input stream.

Definition at line 103 of file LbmBitOpExecStream.h.

Referenced by advanceSingleSubtrahend(), advanceSubtrahends(), canSkipMinus(), findMinInput(), LbmIntersectExecStream::intersectSegments(), minusSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), LbmBitOpExecStream::readInput(), and restartSubtrahends().

uint LbmBitOpExecStream::nInputs [protected, inherited]

Number of input streams.

Definition at line 108 of file LbmBitOpExecStream.h.

Referenced by advanceSubtrahends(), canSkipMinus(), LbmIntersectExecStream::execute(), findMinInput(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and restartSubtrahends().

uint LbmBitOpExecStream::iInput [protected, inherited]

Current input stream being processed.

Definition at line 113 of file LbmBitOpExecStream.h.

Referenced by advanceSubtrahends(), LbmIntersectExecStream::execute(), LbmIntersectExecStream::open(), readMinuendInputAndFlush(), and restartSubtrahends().

boost::scoped_array<TupleData> LbmBitOpExecStream::bitmapSegTuples [protected, inherited]

Tuple data for each input stream.

Definition at line 118 of file LbmBitOpExecStream.h.

Referenced by comparePrefixes(), copyPrefix(), open(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and restartSubtrahends().

LbmSegmentWriter LbmBitOpExecStream::segmentWriter [protected, inherited]

Segment writer.

Definition at line 123 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), LbmBitOpExecStream::flush(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::producePendingOutput().

boost::scoped_array<FixedBuffer> LbmBitOpExecStream::outputBuf [protected, inherited]

Buffer for writing output bitmap segment.

Definition at line 128 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::closeImpl(), and LbmBitOpExecStream::open().

boost::scoped_array<FixedBuffer> LbmBitOpExecStream::byteSegBuf [protected, inherited]

Temporary buffer for bit operation.

Definition at line 133 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::closeImpl(), and LbmBitOpExecStream::open().

PBuffer LbmBitOpExecStream::pByteSegBuf [protected, inherited]

Pointer to byteSegBuf.

Definition at line 138 of file LbmBitOpExecStream.h.

Referenced by execute(), LbmIntersectExecStream::intersectSegments(), minusSegments(), LbmBitOpExecStream::open(), and readMinuendInputAndFlush().

uint LbmBitOpExecStream::bitmapBufSize [protected, inherited]

Amount of space available in buffer for bitmaps.

Definition at line 143 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::open(), LbmBitOpExecStream::readInput(), and readMinuendInput().

TupleData LbmBitOpExecStream::outputTuple [protected, inherited]

Output tuple data containing AND'd bitmap segments.

Definition at line 148 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), LbmBitOpExecStream::flush(), and LbmBitOpExecStream::producePendingOutput().

bool LbmBitOpExecStream::producePending [protected, inherited]

True if a tuple needs to be written to the output stream.

Definition at line 153 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), execute(), LbmIntersectExecStream::execute(), LbmBitOpExecStream::flush(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::producePendingOutput().

LcsRid LbmBitOpExecStream::addRid [protected, inherited]

Current rid value to be added to the bitmap segment.

Definition at line 158 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().

PBuffer LbmBitOpExecStream::addByteSeg [protected, inherited]

Current byte segment to be added.

Definition at line 163 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().

uint LbmBitOpExecStream::addLen [protected, inherited]

Current length of byte segment to be added.

Definition at line 168 of file LbmBitOpExecStream.h.

Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().

int LbmBitOpExecStream::nFields [protected, inherited]

Number of non-bitmap fields preceding the bitmap fields.

Definition at line 173 of file LbmBitOpExecStream.h.

Referenced by checkNeedForRestart(), comparePrefixes(), copyPrefix(), execute(), open(), prepare(), LbmBitOpExecStream::prepare(), produceTuple(), readMinuendInputAndFlush(), and restartSubtrahends().

std::vector<SharedExecStreamBufAccessor> ConfluenceExecStream::inAccessors [protected, inherited]

Definition at line 50 of file ConfluenceExecStream.h.

Referenced by NestedLoopJoinExecStream::checkNumInputs(), CartesianJoinExecStream::checkNumInputs(), comparePrefixes(), LbmGeneratorExecStream::execute(), MergeExecStream::execute(), BarrierExecStream::execute(), findMinInput(), LcsRowScanExecStream::initializeFiltersIfNeeded(), LcsRowScanExecStream::open(), LbmUnionExecStream::open(), open(), LbmGeneratorExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), ConfluenceExecStream::open(), LcsRowScanExecStream::prepare(), LbmUnionExecStream::prepare(), prepare(), LbmGeneratorExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConfluenceExecStream::prepare(), CartesianJoinExecStream::prepare(), BarrierExecStream::prepare(), NestedLoopJoinExecStream::preProcessRightInput(), BarrierExecStream::processInputTuple(), LbmBitOpExecStream::producePendingOutput(), restartSubtrahends(), LhxJoinExecStream::setHashInfo(), and ConfluenceExecStream::setInputBufAccessors().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction embracing the stream.

Obtained at open() time; but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:34 2009 for Fennel by  doxygen 1.5.1