#include <LbmMinusExecStream.h>
Inheritance diagram for LbmMinusExecStream:
Public Member Functions | |
virtual void | prepare (LbmMinusExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | closeImpl () |
Implements ClosableObject. | |
virtual void | prepare (LbmBitOpExecStreamParams const &params) |
virtual void | prepare (ConfluenceExecStreamParams const &params) |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any allocated resources. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Protected Member Functions | |
bool | produceTuple (TupleData bitmapTuple) |
Produces a tuple to the output stream, based on a bitmap. | |
ExecStreamResult | producePendingOutput (uint iInput) |
Produces output tuple that previously failed due to buffer overflow. | |
ExecStreamResult | readInput (uint iInput, LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) |
Reads a byte segment from a specific input stream. | |
bool | flush () |
Flushes the segment writer if it has any data. | |
bool | addSegments () |
Adds the processed segments to the segment under construction. | |
void | writeStartRidParamValue () |
Writes the startRid value to the startRid dynamic parameter, if one exists. | |
Protected Attributes | |
DynamicParamId | rowLimitParamId |
Parameter id representing the dynamic parameter used to limit the number of rows producers for this stream should produce on a single execute. | |
DynamicParamId | startRidParamId |
Parameter id representing the dynamic parameter used to set the starting rid value for bitmap entries to be produced by this stream's producers. | |
TupleDatum | rowLimitDatum |
Tuple datum used to store dynamic parameter for rowLimit. | |
TupleDatum | startRidDatum |
Tuple datum used to store dynamic parameter for startRid. | |
RecordNum | rowLimit |
Number of rows that can be produced. | |
LcsRid | startRid |
Desired starting rid value for bitmap entries. | |
boost::scoped_array< LbmSegmentReader > | segmentReaders |
One segment reader for each input stream. | |
uint | nInputs |
Number of input streams. | |
uint | iInput |
Current input stream being processed. | |
boost::scoped_array< TupleData > | bitmapSegTuples |
Tuple data for each input stream. | |
LbmSegmentWriter | segmentWriter |
Segment writer. | |
boost::scoped_array< FixedBuffer > | outputBuf |
Buffer for writing output bitmap segment. | |
boost::scoped_array< FixedBuffer > | byteSegBuf |
Temporary buffer for bit operation. | |
PBuffer | pByteSegBuf |
Pointer to byteSegBuf. | |
uint | bitmapBufSize |
Amount of space available in buffer for bitmaps. | |
TupleData | outputTuple |
Output tuple data containing AND'd bitmap segments. | |
bool | producePending |
True if a tuple needs to be written to the output stream. | |
LcsRid | addRid |
Current rid value to be added to the bitmap segment. | |
PBuffer | addByteSeg |
Current byte segment to be added. | |
uint | addLen |
Current length of byte segment to be added. | |
int | nFields |
Number of non-bitmap fields preceding the bitmap fields. | |
std::vector< SharedExecStreamBufAccessor > | inAccessors |
SharedExecStreamBufAccessor | pOutAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
Private Types | |
enum | MinusInputType { UNKNOWN_INPUT = 0, EMPTY_INPUT, NONEMPTY_INPUT } |
Private Member Functions | |
ExecStreamResult | readMinuendInputAndFlush (LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) |
Read a byte segment from the minuend input stream. | |
ExecStreamResult | readMinuendInput (LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) |
Reads the minuend input as a random sequence of segments. | |
int | comparePrefixes () |
void | restartSubtrahends () |
void | copyPrefix () |
ExecStreamResult | advanceSingleSubtrahend (int inputNo, LcsRid rid) |
Advance a single subtrahend to the specified rid. | |
ExecStreamResult | advanceSubtrahends (LcsRid baseRid) |
Advances all subtrahends to the desired start rid. | |
ExecStreamResult | minusSegments (LcsRid baseRid, PBuffer baseByteSeg, uint baseLen) |
Performs the minus operation between the minuend and the subtrahends. | |
ExecStreamResult | findMinInput (int &minInput) |
Determines which subtrahend contains the minimum rid value in its current input stream. | |
bool | checkNeedForRestart () |
Restarts the subtrahends, as needed, when the input has keys. | |
bool | canSkipMinus () |
Determines if it's possible to avoid the minus operation for the current segment read from the minuend by using a bitmap that keeps track of subtrahend rids that have been read. | |
Private Attributes | |
bool | subtrahendsDone |
True if all subtrahends have reached EOS at some point. | |
bool | needToRead |
True if a new segment needs to be read from the minuend. | |
LcsRid | baseRid |
Current startrid for the minuend. | |
PBuffer | baseByteSeg |
Current byte segment for the minuend. | |
uint | baseLen |
Length of minuend's current byte segment. | |
LcsRid | minSubtrahendRid |
Minimum rid from amongst the subtrahends. | |
LcsRid | maxSubtrahendRid |
The maximum rid that has been read by a subtrahend. | |
bool | advancePending |
True if a subtrahend needs to be advanced even though all subtrahends are already positioned past the minuend's startrid. | |
LcsRid | advanceSubtrahendRid |
The rid that the subtrahend needs to be advanced to when advancePending is true. | |
int | advanceSubtrahendInputNo |
The input containing the subtrahend that needs to be advanced. | |
MinusInputType | inputType |
Field used to detect the special case of empty inputs. | |
LbmSeqSegmentReader | minuendReader |
A sequential reader used when the minuend input has keys, which may lead to RIDs being out of order. | |
bool | prevTupleValid |
Whether the previous set of prefix fields is valid. | |
TupleDataWithBuffer | prevTuple |
Previous set of prefix fields. | |
bool | copyPrefixPending |
Whether the prefix should be copied before reading more tuples. | |
TupleData | prefixedBitmapTuple |
Tuple data used to build prefixed output. | |
boost::dynamic_bitset | subtrahendBitmap |
Bitmap containing rid values read from the subtrahends. | |
bool | needSubtrahendRestart |
True if the subtrahends need to be restarted. | |
Static Private Attributes | |
static const uint | SUBTRAHEND_BITMAP_SIZE = 32768 |
Number of bits in the bitmap that keeps track of rid values read from the subtrahends. |
A minus stream is generally used to subtract ordered bitmap input streams. In a special case, however, the first input (the "minuend") may contain fields in addition to bitmap fields. The additional fields are considered to be "key fields" and are required to be the first fields. Key fields are propagated to the output stream and allow index only scans to return a result set. A side effect is that the minuend input is only partially ordered when using key fields.
To support a partially ordered minuend input, the streams to be subtracted (the "subtrahends") are restarted when the minuend is out of order.
Definition at line 58 of file LbmMinusExecStream.h.
enum LbmMinusExecStream::MinusInputType [private] |
Definition at line 114 of file LbmMinusExecStream.h.
00114 { 00115 UNKNOWN_INPUT = 0, 00116 EMPTY_INPUT, 00117 NONEMPTY_INPUT 00118 };
ExecStreamResult LbmMinusExecStream::readMinuendInputAndFlush | ( | LcsRid & | currRid, | |
PBuffer & | currByteSeg, | |||
uint & | currLen | |||
) | [private] |
Read a byte segment from the minuend input stream.
In the case where the input has keys, tuples are flushed whenever a new key is read.
currRid | the starting rid value of the segment read | |
currByteSeg | the first byte of the segment read | |
currLen | the length of the segment read |
Definition at line 167 of file LbmMinusExecStream.cpp.
References baseByteSeg, baseLen, baseRid, comparePrefixes(), copyPrefix(), copyPrefixPending, EXECRC_BUF_OVERFLOW, EXECRC_YIELD, LbmBitOpExecStream::flush(), LbmSegmentReaderBase::getTupleChange(), LbmBitOpExecStream::iInput, minuendReader, needSubtrahendRestart, needToRead, LbmBitOpExecStream::nFields, LbmBitOpExecStream::pByteSegBuf, prevTupleValid, LbmBitOpExecStream::readInput(), readMinuendInput(), LbmSegmentReaderBase::resetChangeListener(), LbmBitOpExecStream::startRid, and LbmBitOpExecStream::writeStartRidParamValue().
Referenced by execute().
00169 { 00170 ExecStreamResult rc; 00171 bool unordered = false; 00172 00173 // If there are no keys, read as the minuend as an ordered input. 00174 // Otherwise, read the minuend as a random sequence of segments. 00175 if (nFields == 0) { 00176 rc = readInput(0, currRid, currByteSeg, currLen); 00177 } else { 00178 rc = readMinuendInput(currRid, currByteSeg, currLen); 00179 if (currRid < startRid) { 00180 unordered = true; 00181 } 00182 } 00183 if (rc != EXECRC_YIELD) { 00184 return rc; 00185 } 00186 00187 // Store the segment just read 00188 memcpy(pByteSegBuf, baseByteSeg - baseLen + 1, baseLen); 00189 needToRead = false; 00190 // reset the startrid to the rid just read in and write the 00191 // dynamic parameter so the subtrahends can skip forward to that 00192 // rid 00193 startRid = baseRid; 00194 writeStartRidParamValue(); 00195 iInput = 1; 00196 00197 // If there are no keys (the usual case), we never need to restart inputs 00198 if (nFields == 0) { 00199 return rc; 00200 } 00201 00202 // If there are keys, then data is expected to come from an index. RIDs 00203 // may be ordered for each key, but are not ordered for the entire stream. 00204 // In fact, when minus keys are only a subset of an index's keys, then 00205 // RIDs may restart at any time. 00206 // (Ex: RIDs in index [K1, K2] are ordered for each pair [k1, k2]. 00207 // However, a minus based on [K1] will be completely out of order.) 00208 // 00209 // Due to the lack of ordering, we may need to restart subtrahends 00210 // whenever the minuend is out of order so all of the subtrahend data 00211 // can be minused from the next minuend input. That is handled outside 00212 // of this method because restarts don't always need to be done 00213 // immediately after a new key is read. 00214 // 00215 // We also flush the segment writer's current tuple. 
If it cannot be 00216 // written, then we can't copy the next prefix yet, because the old 00217 // values will be used to construct the pending output tuple. 00218 if (prevTupleValid) { 00219 if (minuendReader.getTupleChange()) { 00220 minuendReader.resetChangeListener(); 00221 int keyComp = comparePrefixes(); 00222 if (keyComp != 0 || unordered) { 00223 needSubtrahendRestart = true; 00224 if (!flush()) { 00225 copyPrefixPending = true; 00226 return EXECRC_BUF_OVERFLOW; 00227 } 00228 copyPrefix(); 00229 } 00230 } 00231 } else { 00232 prevTupleValid = true; 00233 copyPrefix(); 00234 minuendReader.resetChangeListener(); 00235 } 00236 return rc; 00237 }
ExecStreamResult LbmMinusExecStream::readMinuendInput | ( | LcsRid & | currRid, | |
PBuffer & | currByteSeg, | |||
uint & | currLen | |||
) | [private] |
Reads the minuend input as a random sequence of segments.
Definition at line 239 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::bitmapBufSize, byteNumberToRid(), EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::flush(), minuendReader, SingleOutputExecStream::pOutAccessor, and LbmSeqSegmentReader::readSegmentAndAdvance().
Referenced by readMinuendInputAndFlush().
00241 { 00242 LbmByteNumber byteNumber; 00243 ExecStreamResult rc = minuendReader.readSegmentAndAdvance( 00244 byteNumber, currByteSeg, currLen); 00245 currRid = byteNumberToRid(byteNumber); 00246 if (rc == EXECRC_EOS) { 00247 // write out the last pending segment 00248 if (! flush()) { 00249 return EXECRC_BUF_OVERFLOW; 00250 } 00251 pOutAccessor->markEOS(); 00252 return EXECRC_EOS; 00253 } else if (rc != EXECRC_YIELD) { 00254 return rc; 00255 } 00256 00257 // segment read should never be larger than space available 00258 // for segments 00259 assert(currLen <= bitmapBufSize); 00260 00261 return EXECRC_YIELD; 00262 }
int LbmMinusExecStream::comparePrefixes | ( | ) | [private] |
Definition at line 264 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::bitmapSegTuples, ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, and prevTuple.
Referenced by readMinuendInputAndFlush().
00265 { 00266 int ret = 00267 (inAccessors[0]->getTupleDesc()).compareTuplesKey( 00268 prevTuple, 00269 bitmapSegTuples[0], 00270 nFields); 00271 return ret; 00272 }
void LbmMinusExecStream::restartSubtrahends | ( | ) | [private] |
Definition at line 274 of file LbmMinusExecStream.cpp.
References advancePending, LbmBitOpExecStream::bitmapSegTuples, ExecStream::getStreamId(), ExecStreamGraph::getStreamInput(), LbmBitOpExecStream::iInput, ConfluenceExecStream::inAccessors, minSubtrahendRid, needSubtrahendRestart, LbmBitOpExecStream::nFields, LbmBitOpExecStream::nInputs, ExecStream::pGraph, LbmBitOpExecStream::segmentReaders, subtrahendBitmap, and subtrahendsDone.
Referenced by checkNeedForRestart(), and open().
00275 { 00276 minSubtrahendRid = LcsRid(0); 00277 advancePending = false; 00278 for (uint i = 1; i < nInputs; i++) { 00279 pGraph->getStreamInput(getStreamId(), i)->open(true); 00280 segmentReaders[i].init( 00281 inAccessors[i], 00282 bitmapSegTuples[i], 00283 (!subtrahendsDone && nFields > 0), 00284 &subtrahendBitmap); 00285 } 00286 iInput = 1; 00287 needSubtrahendRestart = false; 00288 }
void LbmMinusExecStream::copyPrefix | ( | ) | [private] |
Definition at line 290 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::bitmapSegTuples, LbmBitOpExecStream::nFields, prevTuple, and TupleDataWithBuffer::resetBuffer().
Referenced by execute(), and readMinuendInputAndFlush().
00291 { 00292 /* 00293 Need to make sure pointers are allocated before memcpy. 00294 resetBuffer restores the pointers to the associated buffer. 00295 */ 00296 prevTuple.resetBuffer(); 00297 00298 for (int i = 0; i < nFields; i ++) { 00299 prevTuple[i].memCopyFrom(bitmapSegTuples[0][i]); 00300 } 00301 }
ExecStreamResult LbmMinusExecStream::advanceSingleSubtrahend | ( | int | inputNo, | |
LcsRid | rid | |||
) | [private] |
Advance a single subtrahend to the specified rid.
inputNo | input number of the subtrahend | |
rid | rid to be advanced to |
Definition at line 303 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::segmentReaders.
Referenced by execute().
00306 { 00307 ExecStreamResult rc = segmentReaders[inputNo].advanceToRid(rid); 00308 return rc; 00309 }
ExecStreamResult LbmMinusExecStream::advanceSubtrahends | ( | LcsRid | baseRid | ) | [private] |
Advances all subtrahends to the desired start rid.
baseRid | desired startrid |
Definition at line 311 of file LbmMinusExecStream.cpp.
References EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::iInput, minSubtrahendRid, LbmBitOpExecStream::nInputs, and LbmBitOpExecStream::segmentReaders.
Referenced by execute().
00312 { 00313 // no need to advance subtrahends if they're all positioned past the 00314 // minuend 00315 if (minSubtrahendRid > baseRid) { 00316 return EXECRC_YIELD; 00317 } 00318 00319 // advance the subtrahends, resuming at the one where we last left off 00320 for (; iInput < nInputs; iInput++) { 00321 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(baseRid); 00322 if (rc == EXECRC_EOS) { 00323 continue; 00324 } 00325 if (rc != EXECRC_YIELD) { 00326 return rc; 00327 } 00328 } 00329 00330 return EXECRC_YIELD; 00331 }
ExecStreamResult LbmMinusExecStream::minusSegments | ( | LcsRid | baseRid, | |
PBuffer | baseByteSeg, | |||
uint | baseLen | |||
) | [private] |
Performs the minus operation between the minuend and the subtrahends.
baseRid | start rid of the minuend | |
baseByteSeg | pointer to the first byte of the minuend segment; note that the segment is stored backwards so it needs to be read from right to left | |
baseLen | length of the minuend segment |
Definition at line 400 of file LbmMinusExecStream.cpp.
References advancePending, advanceSubtrahendInputNo, advanceSubtrahendRid, EXECRC_EOS, EXECRC_YIELD, findMinInput(), LbmSegment::LbmOneByteSize, min(), opaqueToInt(), LbmBitOpExecStream::pByteSegBuf, and LbmBitOpExecStream::segmentReaders.
Referenced by execute().
00402 { 00403 while (true) { 00404 // find the subtrahend with the minimum startrid and read its current 00405 // segment 00406 int minInput; 00407 ExecStreamResult rc = findMinInput(minInput); 00408 if (rc == EXECRC_EOS) { 00409 return rc; 00410 } 00411 00412 LcsRid currRid; 00413 PBuffer currByteSeg; 00414 uint currLen; 00415 segmentReaders[minInput].readCurrentByteSegment( 00416 currRid, currByteSeg, currLen); 00417 00418 // if the subtrahends are not within the range of the minuend's 00419 // current rid range, ignore the current segment and get a new one 00420 uint offset = 00421 opaqueToInt(currRid - baseRid) / LbmSegment::LbmOneByteSize; 00422 if (offset >= baseLen) { 00423 break; 00424 } 00425 00426 // only read from the subtrahends the amount that will match the 00427 // minuend's segment 00428 currLen = std::min(currLen, baseLen - offset); 00429 00430 // minus from the minuend -- note that segments are stored 00431 // backwards 00432 PBuffer out = pByteSegBuf + baseLen - 1 - offset; 00433 uint len = currLen; 00434 while (len--) { 00435 *out-- &= ~(*currByteSeg--); 00436 } 00437 00438 // advance the subtrahend by the amount read in; note that we don't 00439 // return if this subtrahend has reached EOS, as there may still be 00440 // other subtrahends that aren't in the EOS state 00441 rc = segmentReaders[minInput].advanceToRid( 00442 currRid + currLen * LbmSegment::LbmOneByteSize); 00443 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00444 advancePending = true; 00445 advanceSubtrahendRid = 00446 currRid + currLen * LbmSegment::LbmOneByteSize; 00447 advanceSubtrahendInputNo = minInput; 00448 return rc; 00449 } 00450 } 00451 00452 return EXECRC_YIELD; 00453 }
ExecStreamResult LbmMinusExecStream::findMinInput | ( | int & | minInput | ) | [private] |
Determines which subtrahend contains the minimum rid value in its current input stream.
minInput | returns input number corresponding to the subtrahend with the minimum rid input |
Definition at line 455 of file LbmMinusExecStream.cpp.
References EXECBUF_EOS, EXECRC_EOS, EXECRC_YIELD, ConfluenceExecStream::inAccessors, maxSubtrahendRid, minSubtrahendRid, LbmBitOpExecStream::nInputs, LbmBitOpExecStream::segmentReaders, and subtrahendsDone.
Referenced by execute(), and minusSegments().
00456 { 00457 minInput = -1; 00458 00459 for (uint i = 1; i < nInputs; i++) { 00460 if (inAccessors[i]->getState() == EXECBUF_EOS) { 00461 continue; 00462 } 00463 00464 LcsRid currRid; 00465 PBuffer currByteSeg; 00466 uint currLen; 00467 segmentReaders[i].readCurrentByteSegment( 00468 currRid, currByteSeg, currLen); 00469 00470 if (minInput == -1 || currRid < minSubtrahendRid) { 00471 minInput = i; 00472 minSubtrahendRid = currRid; 00473 } 00474 } 00475 00476 if (minInput == -1) { 00477 // Note that once we've made one pass over the subtrahends, by setting 00478 // subtrahendsDone, we'll avoid resetting the bits on subsequent passes. 00479 // Position minSubtrahendRid past the max subtrahend rid since the 00480 // subtrahends are no longer positioned at that minimum rid. 00481 subtrahendsDone = true; 00482 for (uint i = 1; i < nInputs; i++) { 00483 LcsRid rid = segmentReaders[i].getMaxRidSet(); 00484 if (rid > maxSubtrahendRid) { 00485 maxSubtrahendRid = rid; 00486 } 00487 } 00488 minSubtrahendRid = maxSubtrahendRid + 1; 00489 return EXECRC_EOS; 00490 } else { 00491 return EXECRC_YIELD; 00492 } 00493 }
bool LbmMinusExecStream::checkNeedForRestart | ( | ) | [private] |
Restarts the subtrahends, as needed, when the input has keys.
The pre-condition for a restart is either a change in key value in the input or unordered rid values in the minuend. Once this pre-condition is met, then two additional criteria are also required -- the current minuend must contain rids that overlap with the subtrahends, and the subtrahend is positioned past the current minuend.
For a minuend that meets the pre-condition just described, even if it doesn't meet the additional criteria, then the very first segment that follows that does meet the criteria will need a restart. After that restart is done, then no further restart checks are needed until the restart pre-condition is met again.
As part of determining whether or not the restart is necessary, we check for overlapping rids. If there is no overlap, then the minus operation can be skipped. This skip minus check is always done, provided the input is non-empty.
Definition at line 333 of file LbmMinusExecStream.cpp.
References canSkipMinus(), EMPTY_INPUT, inputType, minSubtrahendRid, needSubtrahendRestart, LbmBitOpExecStream::nFields, restartSubtrahends(), and LbmBitOpExecStream::startRid.
Referenced by execute().
00334 { 00335 // Return value indicates whether the minus can be skipped, not whether 00336 // a restart was done. 00337 00338 if (nFields == 0) { 00339 return false; 00340 } else if (inputType == EMPTY_INPUT) { 00341 // If the input is empty, we can always bypass the minus 00342 return true; 00343 } else if (needSubtrahendRestart) { 00344 bool skipMinus = canSkipMinus(); 00345 // If there are potentially overlapping rids and the subtrahend is 00346 // positioned past the current minuend, we need to restart 00347 if (!skipMinus && minSubtrahendRid > startRid) { 00348 restartSubtrahends(); 00349 } 00350 return skipMinus; 00351 } else { 00352 return canSkipMinus(); 00353 } 00354 }
bool LbmMinusExecStream::canSkipMinus | ( | ) | [private] |
Determines if it's possible to avoid the minus operation for the current segment read from the minuend by using a bitmap that keeps track of subtrahend rids that have been read.
Definition at line 356 of file LbmMinusExecStream.cpp.
References baseByteSeg, baseLen, baseRid, LbmSegment::LbmOneByteSize, maxSubtrahendRid, LbmBitOpExecStream::nInputs, opaqueToInt(), LbmBitOpExecStream::segmentReaders, SUBTRAHEND_BITMAP_SIZE, subtrahendBitmap, and subtrahendsDone.
Referenced by checkNeedForRestart().
00357 { 00358 LcsRid rid = baseRid; 00359 LcsRid endRid = baseRid + baseLen * LbmSegment::LbmOneByteSize - 1; 00360 00361 // Determine whether the rids in the current minuend segment are 00362 // "covered" by the bitmap in its current state 00363 if (subtrahendsDone) { 00364 // If the first rid we're interested in extends past the max rid read 00365 // from all subtrahends, then there are no rids to subtract off. 00366 if (rid > maxSubtrahendRid) { 00367 return true; 00368 } 00369 } else { 00370 // If the last rid we're interested in extends past the max rid read 00371 // from any subtrahend, then we can't use the bitmap to determine if 00372 // we can skip the minus. 00373 for (uint i = 1; i < nInputs; i++) { 00374 if (endRid > segmentReaders[i].getMaxRidSet()) { 00375 return false; 00376 } 00377 } 00378 } 00379 00380 PBuffer seg = baseByteSeg; 00381 for (uint i = 0; i < baseLen; i++) { 00382 uint8_t byte = *((uint8_t *) seg); 00383 for (uint j = 0; j < LbmSegment::LbmOneByteSize; j++) { 00384 if (byte & 1) { 00385 // once we find a match, no need to look any further 00386 if (subtrahendBitmap.test( 00387 opaqueToInt(rid % SUBTRAHEND_BITMAP_SIZE))) 00388 { 00389 return false; 00390 } 00391 } 00392 byte = byte >> 1; 00393 rid++; 00394 } 00395 seg--; 00396 } 00397 return true; 00398 }
bool LbmMinusExecStream::produceTuple | ( | TupleData | bitmapTuple | ) | [protected, virtual] |
Produces a tuple to the output stream, based on a bitmap.
Reimplemented from LbmBitOpExecStream.
Definition at line 495 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::nFields, SingleOutputExecStream::pOutAccessor, prefixedBitmapTuple, and prevTuple.
00496 { 00497 // If the minuend contained prefix fields, they are prepended to 00498 // the output: ([optional prefix fields], bitmap) 00499 if (nFields) { 00500 for (uint i = 0; i < nFields; i++) { 00501 prefixedBitmapTuple[i].copyFrom(prevTuple[i]); 00502 } 00503 assert (prefixedBitmapTuple.size() == nFields + bitmapTuple.size()); 00504 for (uint i = 0; i < 3; i++) { 00505 prefixedBitmapTuple[nFields + i].copyFrom(bitmapTuple[i]); 00506 } 00507 return pOutAccessor->produceTuple(prefixedBitmapTuple); 00508 } 00509 return pOutAccessor->produceTuple(bitmapTuple); 00510 }
void LbmMinusExecStream::prepare | ( | LbmMinusExecStreamParams const & | params | ) | [virtual] |
Definition at line 29 of file LbmMinusExecStream.cpp.
References TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, prefixedBitmapTuple, LbmBitOpExecStream::prepare(), prevTuple, and subtrahendBitmap.
00030 { 00031 LbmBitOpExecStream::prepare(params); 00032 00033 if (nFields) { 00034 // compute an output tuple based on the minuend 00035 prefixedBitmapTuple.compute(inAccessors[0]->getTupleDesc()); 00036 00037 // prevTuple contains only the prefix fields, it has it's own 00038 // storage to track the previous tuple when the current tuple 00039 // pointers have moved forward 00040 TupleDescriptor prevTupleDesc; 00041 TupleDescriptor const &inputDesc = inAccessors[0]->getTupleDesc(); 00042 for (int i = 0; i < nFields; i ++) { 00043 prevTupleDesc.push_back(inputDesc[i]); 00044 } 00045 prevTuple.computeAndAllocate(prevTupleDesc); 00046 } 00047 subtrahendBitmap.resize(0); 00048 }
void LbmMinusExecStream::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from LbmBitOpExecStream.
Definition at line 50 of file LbmMinusExecStream.cpp.
References advancePending, baseRid, LbmBitOpExecStream::bitmapSegTuples, copyPrefixPending, ConfluenceExecStream::inAccessors, LbmSeqSegmentReader::init(), inputType, maxSubtrahendRid, minSubtrahendRid, minuendReader, needToRead, LbmBitOpExecStream::nFields, LbmBitOpExecStream::open(), prevTupleValid, restartSubtrahends(), LbmBitOpExecStream::rowLimit, SUBTRAHEND_BITMAP_SIZE, subtrahendBitmap, subtrahendsDone, and UNKNOWN_INPUT.
00051 { 00052 LbmBitOpExecStream::open(restart); 00053 subtrahendsDone = false; 00054 needToRead = true; 00055 minSubtrahendRid = LcsRid(0); 00056 maxSubtrahendRid = LcsRid(0); 00057 baseRid = LcsRid(0); 00058 advancePending = false; 00059 // since the subtrahends need to read till EOS, don't set a rowLimit 00060 rowLimit = 0; 00061 inputType = UNKNOWN_INPUT; 00062 copyPrefixPending = false; 00063 prevTupleValid = false; 00064 minuendReader.init(inAccessors[0], bitmapSegTuples[0]); 00065 00066 if (nFields > 0) { 00067 subtrahendBitmap.resize(SUBTRAHEND_BITMAP_SIZE); 00068 } 00069 restartSubtrahends(); 00070 }
ExecStreamResult LbmMinusExecStream::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 72 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::addByteSeg, LbmBitOpExecStream::addLen, LbmBitOpExecStream::addRid, LbmBitOpExecStream::addSegments(), advancePending, advanceSingleSubtrahend(), advanceSubtrahendInputNo, advanceSubtrahendRid, advanceSubtrahends(), baseByteSeg, baseLen, baseRid, checkNeedForRestart(), copyPrefix(), copyPrefixPending, dummy(), EMPTY_INPUT, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, findMinInput(), inputType, LbmSegment::LbmOneByteSize, minusSegments(), needToRead, LbmBitOpExecStream::nFields, NONEMPTY_INPUT, ExecStreamQuantum::nTuplesMax, LbmBitOpExecStream::pByteSegBuf, LbmBitOpExecStream::producePending, LbmBitOpExecStream::producePendingOutput(), readMinuendInputAndFlush(), LbmBitOpExecStream::startRid, subtrahendsDone, and UNKNOWN_INPUT.
00073 { 00074 ExecStreamResult rc; 00075 00076 // On the first execution, check whether any subtrahend has data 00077 if (inputType == UNKNOWN_INPUT) { 00078 rc = advanceSubtrahends(LcsRid(0)); 00079 if (rc != EXECRC_YIELD) { 00080 return rc; 00081 } 00082 int dummy; 00083 rc = findMinInput(dummy); 00084 if (rc == EXECRC_EOS) { 00085 inputType = EMPTY_INPUT; 00086 if (nFields == 0) { 00087 subtrahendsDone = true; 00088 } 00089 } else { 00090 inputType = NONEMPTY_INPUT; 00091 } 00092 } 00093 00094 if (producePending) { 00095 rc = producePendingOutput(0); 00096 if (rc != EXECRC_YIELD) { 00097 return rc; 00098 } 00099 } 00100 00101 bool skipMinus = false; 00102 if (copyPrefixPending) { 00103 copyPrefix(); 00104 copyPrefixPending = false; 00105 needToRead = false; 00106 // Since we bypassed the restart check when this current minuend 00107 // was read, we need to do the check here 00108 skipMinus = checkNeedForRestart(); 00109 } 00110 00111 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00112 // read a segment from the minuend if we've finished processing the 00113 // previous segment 00114 if (needToRead) { 00115 rc = readMinuendInputAndFlush(baseRid, baseByteSeg, baseLen); 00116 if (rc != EXECRC_YIELD) { 00117 return rc; 00118 } 00119 00120 // See if we need to restart the subtrahends 00121 skipMinus = checkNeedForRestart(); 00122 } 00123 00124 // Minus the subtrahends if they haven't all reached EOS in the 00125 // case where there are no keys. In the case where there are keys, 00126 // the bitmap determines whether we can skip the minus. 
00127 if ((nFields == 0 && !subtrahendsDone) || !skipMinus) { 00128 if (advancePending) { 00129 rc = 00130 advanceSingleSubtrahend( 00131 advanceSubtrahendInputNo, 00132 advanceSubtrahendRid); 00133 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00134 return rc; 00135 } 00136 advancePending = false; 00137 } else { 00138 rc = advanceSubtrahends(baseRid); 00139 if (rc != EXECRC_YIELD) { 00140 return rc; 00141 } 00142 } 00143 00144 rc = minusSegments(baseRid, baseByteSeg, baseLen); 00145 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00146 return rc; 00147 } 00148 } 00149 00150 // bump up the startrid past the segment just read and 00151 // write out the minuend segment 00152 needToRead = true; 00153 startRid = baseRid + baseLen * LbmSegment::LbmOneByteSize; 00154 addRid = baseRid; 00155 addByteSeg = pByteSegBuf; 00156 addLen = baseLen; 00157 if (!addSegments()) { 00158 return EXECRC_BUF_OVERFLOW; 00159 } 00160 00161 // loop back to read the next minuend segment 00162 } 00163 00164 return EXECRC_QUANTUM_EXPIRED; 00165 }
void LbmMinusExecStream::closeImpl | ( | ) | [virtual] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from LbmBitOpExecStream.
Definition at line 512 of file LbmMinusExecStream.cpp.
References LbmBitOpExecStream::closeImpl(), and subtrahendBitmap.
00513 { 00514 subtrahendBitmap.resize(0); 00515 LbmBitOpExecStream::closeImpl(); 00516 }
ExecStreamResult LbmBitOpExecStream::producePendingOutput | ( | uint | iInput | ) | [protected, inherited] |
Produces output tuple that previously failed due to buffer overflow.
Writes out the remaining segments that could not fit in the previous buffer. Determines if input has reached EOS.
iInput | input whose buffer state is checked to determine whether EOS has been reached |
Definition at line 110 of file LbmBitOpExecStream.cpp.
References LbmBitOpExecStream::addSegments(), EXECBUF_EOS, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, ConfluenceExecStream::inAccessors, LbmSegmentWriter::isEmpty(), LbmBitOpExecStream::outputTuple, SingleOutputExecStream::pOutAccessor, LbmBitOpExecStream::producePending, LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.
Referenced by execute(), and LbmIntersectExecStream::execute().
00111 { 00112 if (!produceTuple(outputTuple)) { 00113 return EXECRC_BUF_OVERFLOW; 00114 } 00115 // in the middle of adding segments when buffer overflow occurred; 00116 // go back and add the remaining segments 00117 if (!segmentWriter.isEmpty()) { 00118 segmentWriter.reset(); 00119 if (!addSegments()) { 00120 return EXECRC_BUF_OVERFLOW; 00121 } 00122 } 00123 producePending = false; 00124 if (inAccessors[iInput]->getState() == EXECBUF_EOS) { 00125 pOutAccessor->markEOS(); 00126 return EXECRC_EOS; 00127 } 00128 00129 return EXECRC_YIELD; 00130 }
ExecStreamResult LbmBitOpExecStream::readInput | ( | uint | iInput, | |
LcsRid & | currRid, | |||
PBuffer & | currByteSeg, | |||
uint & | currLen | |||
) | [protected, inherited] |
Reads a byte segment from a specific input stream.
iInput | input to read | |
currRid | startRid of the segment read | |
currByteSeg | byte segment read; points to the beginning of segment that is stored backwards | |
currLen | length of the byte segment read |
Definition at line 132 of file LbmBitOpExecStream.cpp.
References LbmBitOpExecStream::bitmapBufSize, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_YIELD, LbmBitOpExecStream::flush(), SingleOutputExecStream::pOutAccessor, LbmBitOpExecStream::segmentReaders, and LbmBitOpExecStream::startRid.
Referenced by LbmIntersectExecStream::execute(), and readMinuendInputAndFlush().
00134 { 00135 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(startRid); 00136 00137 if (rc == EXECRC_EOS) { 00138 // write out the last pending segment 00139 if (! flush()) { 00140 return EXECRC_BUF_OVERFLOW; 00141 } 00142 pOutAccessor->markEOS(); 00143 return EXECRC_EOS; 00144 } else if (rc != EXECRC_YIELD) { 00145 return rc; 00146 } 00147 00148 segmentReaders[iInput].readCurrentByteSegment( 00149 currRid, currByteSeg, currLen); 00150 // segment read should never be larger than space available 00151 // for segments 00152 assert(currLen <= bitmapBufSize); 00153 00154 return EXECRC_YIELD; 00155 }
bool LbmBitOpExecStream::flush | ( | ) | [protected, inherited] |
Flushes the segment writer if it has any data.
The data is transferred to a bitmap tuple and the segment writer is reset. Finally, the tuple is produced to the output stream. If the tuple cannot be written, then it becomes a pending tuple.
Definition at line 157 of file LbmBitOpExecStream.cpp.
References LbmSegmentWriter::isEmpty(), LbmBitOpExecStream::outputTuple, LbmBitOpExecStream::producePending, LbmSegmentWriter::produceSegmentTuple(), LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.
Referenced by LbmBitOpExecStream::readInput(), readMinuendInput(), and readMinuendInputAndFlush().
00158 { 00159 assert (!producePending); 00160 00161 if (!segmentWriter.isEmpty()) { 00162 outputTuple = segmentWriter.produceSegmentTuple(); 00163 segmentWriter.reset(); 00164 if (!produceTuple(outputTuple)) { 00165 producePending = true; 00166 } 00167 } 00168 return !producePending; 00169 }
bool LbmBitOpExecStream::addSegments | ( | ) | [protected, inherited] |
Adds the processed segments to the segment under construction.
If the segment fills up, writes it to the output buffer and continues constructing the rest of the segment. Leading, trailing, and intermediate zeros in the segment are removed.
Definition at line 171 of file LbmBitOpExecStream.cpp.
References LbmBitOpExecStream::addByteSeg, LbmBitOpExecStream::addLen, LbmBitOpExecStream::addRid, LbmSegmentWriter::addSegment(), LbmBitOpExecStream::outputTuple, LbmBitOpExecStream::producePending, LbmSegmentWriter::produceSegmentTuple(), LbmBitOpExecStream::produceTuple(), LbmSegmentWriter::reset(), and LbmBitOpExecStream::segmentWriter.
Referenced by execute(), LbmIntersectExecStream::intersectSegments(), and LbmBitOpExecStream::producePendingOutput().
00172 { 00173 while (addLen > 0) { 00174 if (segmentWriter.addSegment(addRid, addByteSeg, addLen)) { 00175 break; 00176 } 00177 00178 outputTuple = segmentWriter.produceSegmentTuple(); 00179 if (!produceTuple(outputTuple)) { 00180 producePending = true; 00181 return false; 00182 } 00183 00184 // loop back and start creating a new segment for the remainder of 00185 // the segments that wouldn't fit 00186 segmentWriter.reset(); 00187 } 00188 00189 return true; 00190 }
void LbmBitOpExecStream::writeStartRidParamValue | ( | ) | [protected, inherited] |
Writes the startRid value to the startRid dynamic parameter, if one exists.
Definition at line 205 of file LbmBitOpExecStream.cpp.
References opaqueToInt(), ExecStream::pDynamicParamManager, LbmBitOpExecStream::startRidDatum, and LbmBitOpExecStream::startRidParamId.
Referenced by LbmIntersectExecStream::execute(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), and readMinuendInputAndFlush().
00206 { 00207 if (opaqueToInt(startRidParamId) > 0) { 00208 pDynamicParamManager->writeParam(startRidParamId, startRidDatum); 00209 } 00210 }
void LbmBitOpExecStream::prepare | ( | LbmBitOpExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 28 of file LbmBitOpExecStream.cpp.
References LbmBitOpExecStream::bitmapSegTuples, TupleDatum::cbData, ConfluenceExecStream::inAccessors, LbmBitOpExecStream::nFields, LbmBitOpExecStream::nInputs, TupleDatum::pData, ConfluenceExecStream::prepare(), LbmBitOpExecStream::rowLimit, LbmBitOpExecStream::rowLimitDatum, LbmBitOpExecStreamParams::rowLimitParamId, LbmBitOpExecStream::rowLimitParamId, LbmBitOpExecStream::segmentReaders, LbmBitOpExecStream::startRid, LbmBitOpExecStream::startRidDatum, LbmBitOpExecStreamParams::startRidParamId, and LbmBitOpExecStream::startRidParamId.
Referenced by prepare(), and LbmIntersectExecStream::prepare().
00029 { 00030 ConfluenceExecStream::prepare(params); 00031 00032 // set dynanmic parameter ids 00033 rowLimitParamId = params.rowLimitParamId; 00034 startRidParamId = params.startRidParamId; 00035 00036 // setup tupledatums for writing dynamic parameter values 00037 rowLimitDatum.pData = (PConstBuffer) &rowLimit; 00038 rowLimitDatum.cbData = sizeof(rowLimit); 00039 startRidDatum.pData = (PConstBuffer) &startRid; 00040 startRidDatum.cbData = sizeof(startRid); 00041 00042 // initialize segment readers for reading bitmaps from input stream 00043 nInputs = inAccessors.size(); 00044 segmentReaders.reset(new LbmSegmentReader[nInputs]); 00045 bitmapSegTuples.reset(new TupleData[nInputs]); 00046 for (uint i = 0; i < nInputs; i++) { 00047 bitmapSegTuples[i].compute(inAccessors[i]->getTupleDesc()); 00048 } 00049 00050 nFields = inAccessors[0]->getTupleDesc().size() - 3; 00051 }
void ConfluenceExecStream::prepare | ( | ConfluenceExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 37 of file ConfluenceExecStream.cpp.
References ConfluenceExecStream::getInputBufProvision(), ConfluenceExecStream::inAccessors, and SingleOutputExecStream::prepare().
Referenced by LcsRowScanBaseExecStream::prepare(), LbmUnionExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), CartesianJoinExecStream::prepare(), and BarrierExecStream::prepare().
00038 { 00039 SingleOutputExecStream::prepare(params); 00040 00041 for (uint i = 0; i < inAccessors.size(); ++i) { 00042 assert(inAccessors[i]->getProvision() == getInputBufProvision()); 00043 } 00044 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void ConfluenceExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleOutputExecStream.
Definition at line 31 of file ConfluenceExecStream.cpp.
References ConfluenceExecStream::inAccessors.
00033 { 00034 inAccessors = inAccessorsInit; 00035 }
ExecStreamBufProvision ConfluenceExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Definition at line 58 of file ConfluenceExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by ConfluenceExecStream::prepare().
00059 { 00060 return BUFPROV_PRODUCER; 00061 }
void SingleOutputExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Implements ExecStream.
Reimplemented in ConduitExecStream.
Definition at line 41 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::pOutAccessor.
Referenced by ConduitExecStream::setOutputBufAccessors().
00043 { 00044 assert(outAccessors.size() == 1); 00045 pOutAccessor = outAccessors[0]; 00046 }
ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented from ExecStream.
Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 69 of file SingleOutputExecStream.cpp.
References BUFPROV_CONSUMER.
Referenced by SingleOutputExecStream::prepare().
00070 { 00071 return BUFPROV_CONSUMER; 00072 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual, inherited] |
Determines resource requirements for this stream.
Default implementation declares zero resource requirements.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 93 of file ExecStream.cpp.
References EXEC_RESOURCE_ACCURATE.
Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().
00097 { 00098 getResourceRequirements(minQuantity, optQuantity); 00099 optType = EXEC_RESOURCE_ACCURATE; 00100 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual, inherited] |
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.
Definition at line 102 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.
00105 { 00106 minQuantity.nThreads = 0; 00107 minQuantity.nCachePages = 0; 00108 optQuantity = minQuantity; 00109 }
void ExecStream::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual, inherited] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 111 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.
Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().
00113 { 00114 resourceAllocation = quantity; 00115 if (pQuotaAccessor) { 00116 pQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00117 } 00118 if (pScratchQuotaAccessor) { 00119 pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00120 } 00121 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any allocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 { 00041 if (!needsClose) { 00042 return; 00043 } 00044 needsClose = false; 00045 closeImpl(); 00046 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
bool LbmMinusExecStream::subtrahendsDone [private] |
True if all subtrahends have reached EOS at some point.
Definition at line 64 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), execute(), findMinInput(), open(), and restartSubtrahends().
bool LbmMinusExecStream::needToRead [private] |
True if a new segment needs to be read from the minuend.
Definition at line 69 of file LbmMinusExecStream.h.
Referenced by execute(), open(), and readMinuendInputAndFlush().
LcsRid LbmMinusExecStream::baseRid [private] |
Current startrid for the minuend.
Definition at line 74 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), execute(), open(), and readMinuendInputAndFlush().
PBuffer LbmMinusExecStream::baseByteSeg [private] |
Current byte segment for the minuend.
Definition at line 79 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), execute(), and readMinuendInputAndFlush().
uint LbmMinusExecStream::baseLen [private] |
Length of minuend's current byte segment.
Definition at line 84 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), execute(), and readMinuendInputAndFlush().
LcsRid LbmMinusExecStream::minSubtrahendRid [private] |
Minimum rid from amongst the subtrahends.
Definition at line 89 of file LbmMinusExecStream.h.
Referenced by advanceSubtrahends(), checkNeedForRestart(), findMinInput(), open(), and restartSubtrahends().
LcsRid LbmMinusExecStream::maxSubtrahendRid [private] |
The maximum rid that has been read by a subtrahend.
This only applies if the input has keys.
Definition at line 95 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), findMinInput(), and open().
bool LbmMinusExecStream::advancePending [private] |
True if a subtrahend needs to be advanced even though all subtrahends are already positioned past the minuend's startrid.
Definition at line 101 of file LbmMinusExecStream.h.
Referenced by execute(), minusSegments(), open(), and restartSubtrahends().
LcsRid LbmMinusExecStream::advanceSubtrahendRid [private] |
The rid that the subtrahend needs to be advanced to when advancePending is true.
Definition at line 107 of file LbmMinusExecStream.h.
Referenced by execute(), and minusSegments().
int LbmMinusExecStream::advanceSubtrahendInputNo [private] |
The input containing the subtrahend that needs to be advanced.
Definition at line 112 of file LbmMinusExecStream.h.
Referenced by execute(), and minusSegments().
MinusInputType LbmMinusExecStream::inputType [private] |
Field used to detect the special case of empty inputs.
When the subtrahends are empty, there is no need to subtract them.
Definition at line 124 of file LbmMinusExecStream.h.
Referenced by checkNeedForRestart(), execute(), and open().
A sequential reader used when the minuend input has keys, which may lead to RIDs being out of order.
Definition at line 130 of file LbmMinusExecStream.h.
Referenced by open(), readMinuendInput(), and readMinuendInputAndFlush().
bool LbmMinusExecStream::prevTupleValid [private] |
Whether the previous set of prefix fields is valid.
Definition at line 135 of file LbmMinusExecStream.h.
Referenced by open(), and readMinuendInputAndFlush().
Previous set of prefix fields.
Definition at line 140 of file LbmMinusExecStream.h.
Referenced by comparePrefixes(), copyPrefix(), prepare(), and produceTuple().
bool LbmMinusExecStream::copyPrefixPending [private] |
Whether the prefix should be copied before reading more tuples.
Definition at line 145 of file LbmMinusExecStream.h.
Referenced by execute(), open(), and readMinuendInputAndFlush().
Tuple data used to build prefixed output.
Definition at line 150 of file LbmMinusExecStream.h.
Referenced by prepare(), and produceTuple().
const uint LbmMinusExecStream::SUBTRAHEND_BITMAP_SIZE = 32768 [static, private] |
Number of bits in the bitmap that keeps track of rid values read from the subtrahends.
Note that because the size of the bitmap is smaller than the number of possible rid values, there can be false hits when testing the bitmap. The larger this value, the more memory required, but the lower the likelihood of false hits.
Definition at line 159 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), and open().
boost::dynamic_bitset LbmMinusExecStream::subtrahendBitmap [private] |
Bitmap containing rid values read from the subtrahends.
Only used in the case when the input contains keys.
Definition at line 165 of file LbmMinusExecStream.h.
Referenced by canSkipMinus(), closeImpl(), open(), prepare(), and restartSubtrahends().
bool LbmMinusExecStream::needSubtrahendRestart [private] |
True if the subtrahends need to be restarted.
Definition at line 170 of file LbmMinusExecStream.h.
Referenced by checkNeedForRestart(), readMinuendInputAndFlush(), and restartSubtrahends().
DynamicParamId LbmBitOpExecStream::rowLimitParamId [protected, inherited] |
Parameter id representing the dynamic parameter used to limit the number of rows that producers for this stream should produce on a single execute.
Definition at line 71 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().
DynamicParamId LbmBitOpExecStream::startRidParamId [protected, inherited] |
Parameter id representing the dynamic parameter used to set the starting rid value for bitmap entries to be produced by this stream's producers.
Definition at line 78 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and LbmBitOpExecStream::writeStartRidParamValue().
TupleDatum LbmBitOpExecStream::rowLimitDatum [protected, inherited] |
Tuple datum used to store dynamic parameter for rowLimit.
Definition at line 83 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().
TupleDatum LbmBitOpExecStream::startRidDatum [protected, inherited] |
Tuple datum used to store dynamic parameter for startRid.
Definition at line 88 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::prepare(), and LbmBitOpExecStream::writeStartRidParamValue().
RecordNum LbmBitOpExecStream::rowLimit [protected, inherited] |
Number of rows that can be produced.
Definition at line 93 of file LbmBitOpExecStream.h.
Referenced by open(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::prepare().
LcsRid LbmBitOpExecStream::startRid [protected, inherited] |
Desired starting rid value for bitmap entries.
Definition at line 98 of file LbmBitOpExecStream.h.
Referenced by checkNeedForRestart(), execute(), LbmIntersectExecStream::execute(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), LbmBitOpExecStream::readInput(), and readMinuendInputAndFlush().
boost::scoped_array<LbmSegmentReader> LbmBitOpExecStream::segmentReaders [protected, inherited] |
One segment reader for each input stream.
Definition at line 103 of file LbmBitOpExecStream.h.
Referenced by advanceSingleSubtrahend(), advanceSubtrahends(), canSkipMinus(), findMinInput(), LbmIntersectExecStream::intersectSegments(), minusSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), LbmBitOpExecStream::readInput(), and restartSubtrahends().
uint LbmBitOpExecStream::nInputs [protected, inherited] |
Number of input streams.
Definition at line 108 of file LbmBitOpExecStream.h.
Referenced by advanceSubtrahends(), canSkipMinus(), LbmIntersectExecStream::execute(), findMinInput(), LbmIntersectExecStream::intersectSegments(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and restartSubtrahends().
uint LbmBitOpExecStream::iInput [protected, inherited] |
Current input stream being processed.
Definition at line 113 of file LbmBitOpExecStream.h.
Referenced by advanceSubtrahends(), LbmIntersectExecStream::execute(), LbmIntersectExecStream::open(), readMinuendInputAndFlush(), and restartSubtrahends().
boost::scoped_array<TupleData> LbmBitOpExecStream::bitmapSegTuples [protected, inherited] |
Tuple data for each input stream.
Definition at line 118 of file LbmBitOpExecStream.h.
Referenced by comparePrefixes(), copyPrefix(), open(), LbmBitOpExecStream::open(), LbmBitOpExecStream::prepare(), and restartSubtrahends().
LbmSegmentWriter LbmBitOpExecStream::segmentWriter [protected, inherited] |
Segment writer.
Definition at line 123 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), LbmBitOpExecStream::flush(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::producePendingOutput().
boost::scoped_array<FixedBuffer> LbmBitOpExecStream::outputBuf [protected, inherited] |
Buffer for writing output bitmap segment.
Definition at line 128 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::closeImpl(), and LbmBitOpExecStream::open().
boost::scoped_array<FixedBuffer> LbmBitOpExecStream::byteSegBuf [protected, inherited] |
Temporary buffer for bit operation.
Definition at line 133 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::closeImpl(), and LbmBitOpExecStream::open().
PBuffer LbmBitOpExecStream::pByteSegBuf [protected, inherited] |
Pointer to byteSegBuf.
Definition at line 138 of file LbmBitOpExecStream.h.
Referenced by execute(), LbmIntersectExecStream::intersectSegments(), minusSegments(), LbmBitOpExecStream::open(), and readMinuendInputAndFlush().
uint LbmBitOpExecStream::bitmapBufSize [protected, inherited] |
Amount of space available in buffer for bitmaps.
Definition at line 143 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::open(), LbmBitOpExecStream::readInput(), and readMinuendInput().
TupleData LbmBitOpExecStream::outputTuple [protected, inherited] |
Output tuple data containing AND'd bitmap segments.
Definition at line 148 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), LbmBitOpExecStream::flush(), and LbmBitOpExecStream::producePendingOutput().
bool LbmBitOpExecStream::producePending [protected, inherited] |
True if a tuple needs to be written to the output stream.
Definition at line 153 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), execute(), LbmIntersectExecStream::execute(), LbmBitOpExecStream::flush(), LbmBitOpExecStream::open(), and LbmBitOpExecStream::producePendingOutput().
LcsRid LbmBitOpExecStream::addRid [protected, inherited] |
Current rid value to be added to the bitmap segment.
Definition at line 158 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().
PBuffer LbmBitOpExecStream::addByteSeg [protected, inherited] |
Current byte segment to be added.
Definition at line 163 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().
uint LbmBitOpExecStream::addLen [protected, inherited] |
Current length of byte segment to be added.
Definition at line 168 of file LbmBitOpExecStream.h.
Referenced by LbmBitOpExecStream::addSegments(), execute(), and LbmIntersectExecStream::intersectSegments().
int LbmBitOpExecStream::nFields [protected, inherited] |
Number of non-bitmap fields preceding the bitmap fields.
Definition at line 173 of file LbmBitOpExecStream.h.
Referenced by checkNeedForRestart(), comparePrefixes(), copyPrefix(), execute(), open(), prepare(), LbmBitOpExecStream::prepare(), produceTuple(), readMinuendInputAndFlush(), and restartSubtrahends().
std::vector<SharedExecStreamBufAccessor> ConfluenceExecStream::inAccessors [protected, inherited] |
Definition at line 50 of file ConfluenceExecStream.h.
Referenced by NestedLoopJoinExecStream::checkNumInputs(), CartesianJoinExecStream::checkNumInputs(), comparePrefixes(), LbmGeneratorExecStream::execute(), MergeExecStream::execute(), BarrierExecStream::execute(), findMinInput(), LcsRowScanExecStream::initializeFiltersIfNeeded(), LcsRowScanExecStream::open(), LbmUnionExecStream::open(), open(), LbmGeneratorExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), ConfluenceExecStream::open(), LcsRowScanExecStream::prepare(), LbmUnionExecStream::prepare(), prepare(), LbmGeneratorExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConfluenceExecStream::prepare(), CartesianJoinExecStream::prepare(), BarrierExecStream::prepare(), NestedLoopJoinExecStream::preProcessRightInput(), BarrierExecStream::processInputTuple(), LbmBitOpExecStream::producePendingOutput(), restartSubtrahends(), LhxJoinExecStream::setHashInfo(), and ConfluenceExecStream::setInputBufAccessors().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), 
UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().