#include <LcsRowScanExecStream.h>
Inheritance diagram for LcsRowScanExecStream:
Public Member Functions | |
LcsRowScanExecStream () | |
virtual void | prepare (LcsRowScanExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | closeImpl () |
Implements ClosableObject. | |
virtual void | prepare (LcsRowScanBaseExecStreamParams const &params) |
virtual void | prepare (ConfluenceExecStreamParams const &params) |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any unallocated resources. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Protected Member Functions | |
void | syncColumns (SharedLcsClusterReader &pScan) |
Positions column readers based on new cluster reader position. | |
bool | readColVals (SharedLcsClusterReader &pScan, TupleDataWithBuffer &tupleData, uint colStart) |
Reads column values based on current position of cluster reader. | |
Protected Attributes | |
VectorOfUint | projMap |
Projection map that maps columns read from cluster to their position in the output projection. | |
uint | nClusters |
Number of clusters to be scanned. | |
boost::scoped_array< SharedLcsClusterReader > | pClusters |
Array containing cluster readers. | |
TupleDescriptor | projDescriptor |
Tuple descriptor representing columns to be projected from scans. | |
std::vector< int > | nonClusterCols |
List of the non-cluster columns that need to be projected. | |
bool | allSpecial |
True in the special case where we are only reading special columns. | |
CircularBuffer< LcsRidRun > | ridRuns |
Circular buffer of rid runs. | |
std::vector< UnalignedAttributeAccessor > | attrAccessors |
Accessors used for loading actual column values. | |
std::vector< SharedExecStreamBufAccessor > | inAccessors |
SharedExecStreamBufAccessor | pOutAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
Private Member Functions | |
virtual void | buildOutputProj (TupleProjection &outputProj, LcsRowScanBaseExecStreamParams const &params) |
Builds outputProj from params. | |
bool | initializeFiltersIfNeeded () |
Initializes the residual filter data structures from the filter input streams. | |
void | prepareResidualFilters (LcsRowScanExecStreamParams const &params) |
Initializes the residual filter data structures at prepare time. | |
void | initializeSystemSampling () |
Initializes the system sampling data structures during open time. | |
ExecStreamResult | fillRidRunBuffer () |
Populates the circular rid run buffer. | |
Private Attributes | |
TupleDataWithBuffer | outputTupleData |
Tuple data for all columns read from all clusters, including filter columns. | |
uint | iFilterToInitialize |
This variable is used to control the initialization of residual filters. | |
TupleData | projOutputTupleData |
TupleProjection | outputProj |
TupleData | ridTupleData |
Tuple data for input stream. | |
LbmRidReader | ridReader |
Rid reader. | |
RecordNum | nRidsRead |
Number of rids read. | |
LcsRid | inputRid |
Current rid read from the input stream. | |
LcsRid | nextRid |
Next rid that needs to be fetched. | |
bool | readDeletedRid |
True if need to read a new deleted rid from the input stream. | |
bool | deletedRidEos |
True if reached EOS on deleted rid input stream. | |
LcsRid | deletedRid |
Current deleted rid. | |
bool | tupleFound |
true if tuple has been read and not yet produced | |
bool | isFullScan |
true if executing full table scan | |
bool | hasExtraFilter |
True if there are extra range-list filter inputs (the inputs following the rid input). | |
bool | producePending |
true if produceTuple pending | |
boost::scoped_array< LcsResidualColumnFilters * > | filters |
The local filter data structure. | |
int32_t | nFilters |
The number of residual column filters configured. | |
TableSamplingMode | samplingMode |
One of SAMPLING_OFF, SAMPLING_BERNOULLI or SAMPLING_SYSTEM. | |
float | samplingRate |
the sampling rate (0.0 to 1.0) | |
bool | isSamplingRepeatable |
true if the sample should be repeatable | |
int32_t | repeatableSeed |
seed for repeatable sampling | |
int32_t | samplingClumps |
number of clumps for system sampling | |
uint64_t | clumpSize |
size of each sampling clump | |
uint64_t | clumpDistance |
distance (in rows) between each clump | |
uint64_t | clumpPos |
position (0 to clumpSize) in current clump | |
uint64_t | clumpSkipPos |
position (clumpDistance to 0) in between clumps | |
uint | numClumps |
The number of clumps that need to be built. | |
uint | numClumpsBuilt |
Running counter of the number of clumps built. | |
boost::scoped_ptr< BernoulliRng > | samplingRng |
RNG for Bernoulli sampling. | |
int64_t | rowCount |
Number of rows in the table. | |
bool | ridRunsBuilt |
True if completed building rid runs. | |
LcsRidRun | currRidRun |
Current rid run being constructed. | |
CircularBufferIter< LcsRidRun > | ridRunIter |
Iterator over the circular buffer containing rid runs. |
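The stream returns a projected subset of columns from the table, either for the rows whose rids arrive on its first input or, for a full table scan, for every row whose rid is not listed on the deleted-rid input.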
Definition at line 110 of file LcsRowScanExecStream.h.
LcsRowScanExecStream::LcsRowScanExecStream | ( | ) |
Definition at line 33 of file LcsRowScanExecStream.cpp.
References CircularBuffer< T >::resize(), and LcsRowScanBaseExecStream::ridRuns.
00034 : 00035 LcsRowScanBaseExecStream(), 00036 ridRunIter(&ridRuns) 00037 { 00038 ridRuns.resize(4000); 00039 }
void LcsRowScanExecStream::buildOutputProj | ( | TupleProjection & | outputProj, | |
LcsRowScanBaseExecStreamParams const & | params | |||
) | [private, virtual] |
Builds outputProj from params.
outputProj | the projection to be built | |
params | the LcsRowScanBaseExecStreamParams |
Reimplemented from LcsRowScanBaseExecStream.
Definition at line 681 of file LcsRowScanExecStream.cpp.
References outputProj, LcsRowScanBaseExecStreamParams::outputProj, and LcsRowScanExecStreamParams::residualFilterCols.
00684 { 00685 LcsRowScanExecStreamParams const &rowScanParams = 00686 dynamic_cast<const LcsRowScanExecStreamParams&>(params); 00687 00688 /* 00689 * Build a projection that contains filter columns 00690 */ 00691 for (uint i = 0; i < rowScanParams.outputProj.size(); i++) { 00692 outputProj.push_back(rowScanParams.outputProj[i]); 00693 } 00694 for (uint i = 0; i < rowScanParams.residualFilterCols.size(); i++) { 00695 uint j; 00696 for (j = 0; j < rowScanParams.outputProj.size(); j++) { 00697 if (rowScanParams.outputProj[j] == 00698 rowScanParams.residualFilterCols[i]) 00699 { 00700 break; 00701 } 00702 } 00703 00704 if (j >= rowScanParams.outputProj.size()) { 00705 outputProj.push_back(rowScanParams.residualFilterCols[i]); 00706 } 00707 } 00708 }
bool LcsRowScanExecStream::initializeFiltersIfNeeded | ( | ) | [private] |
Initializes the residual filter data structures from the filter input streams; returns false while a filter input still has data pending.
Definition at line 256 of file LcsRowScanExecStream.cpp.
References EXECBUF_EOS, filters, FixedBuffer, TupleAccessor::getCurrentByteCount(), TupleAccessor::getCurrentTupleBuf(), iFilterToInitialize, ConfluenceExecStream::inAccessors, nFilters, TupleAccessor::setCurrentTupleBuf(), and TupleAccessor::unmarshal().
Referenced by execute().
00257 { 00258 /* 00259 * initialize the filters local data 00260 */ 00261 for (; iFilterToInitialize < nFilters; iFilterToInitialize++) { 00262 SharedExecStreamBufAccessor &pInAccessor = 00263 inAccessors[iFilterToInitialize + 1]; 00264 TupleAccessor &inputAccessor = 00265 pInAccessor->getConsumptionTupleAccessor(); 00266 00267 if (pInAccessor->getState() != EXECBUF_EOS) { 00268 PLcsResidualColumnFilters filter = filters[iFilterToInitialize]; 00269 00270 while (pInAccessor->demandData()) { 00271 SharedLcsResidualFilter filterData(new LcsResidualFilter); 00272 00273 pInAccessor->accessConsumptionTuple(); 00274 00275 /* 00276 * Build lower and upper bound data 00277 */ 00278 filterData->boundData.compute(pInAccessor->getTupleDesc()); 00279 filterData->boundBuf.reset( 00280 new FixedBuffer[inputAccessor.getCurrentByteCount()]); 00281 00282 memcpy( 00283 filterData->boundBuf.get(), 00284 pInAccessor->getConsumptionStart(), 00285 inputAccessor.getCurrentByteCount()); 00286 00287 /* 00288 * inputAccessor is used to unmarshal into boundData. 00289 * in order to do this, its current buffer is set to 00290 * boundBuf and restored. 00291 */ 00292 PConstBuffer tmpBuf; 00293 tmpBuf = inputAccessor.getCurrentTupleBuf(); 00294 inputAccessor.setCurrentTupleBuf(filterData->boundBuf.get()); 00295 inputAccessor.unmarshal(filterData->boundData); 00296 inputAccessor.setCurrentTupleBuf(tmpBuf); 00297 00298 /* 00299 * record directives. 00300 */ 00301 filterData->lowerBoundDirective = 00302 SearchEndpoint(*filterData->boundData[0].pData); 00303 filterData->upperBoundDirective = 00304 SearchEndpoint(*filterData->boundData[2].pData); 00305 00306 filter->filterData.push_back(filterData); 00307 00308 pInAccessor->consumeTuple(); 00309 } 00310 00311 if (pInAccessor->getState() != EXECBUF_EOS) { 00312 return false; 00313 } 00314 } 00315 filters[iFilterToInitialize]->filterDataInitialized = true; 00316 } 00317 return true; 00318 }
void LcsRowScanExecStream::prepareResidualFilters | ( | LcsRowScanExecStreamParams const & | params | ) | [private] |
Initializes the residual filter data structures at prepare time.
params | the LcsRowScanExecStreamParams |
Definition at line 41 of file LcsRowScanExecStream.cpp.
References LcsResidualColumnFilters::attrAccessor, UnalignedAttributeAccessor::compute(), TupleDataWithBuffer::computeAndAllocate(), filters, LcsResidualColumnFilters::hasResidualFilters, LcsResidualColumnFilters::inputKeyDesc, LcsRowScanBaseExecStreamParams::lcsClusterScanDefs, LcsResidualColumnFilters::lowerBoundProj, LcsRowScanBaseExecStream::nClusters, nFilters, LcsRowScanBaseExecStream::nonClusterCols, LcsRowScanBaseExecStreamParams::outputProj, LcsRowScanBaseExecStream::pClusters, LcsRowScanBaseExecStream::projDescriptor, TupleDescriptor::projectFrom(), LcsRowScanBaseExecStream::projMap, LcsResidualColumnFilters::readerKeyData, LcsResidualColumnFilters::readerKeyProj, LcsRowScanExecStreamParams::residualFilterCols, and LcsResidualColumnFilters::upperBoundProj.
Referenced by prepare().
00043 { 00044 nFilters = params.residualFilterCols.size(); 00045 00046 /* 00047 * compute the outputTupleData position of filter columns 00048 */ 00049 VectorOfUint valueCols; 00050 uint j, k = 0; 00051 for (uint i = 0; i < nFilters; i++) { 00052 for (j = 0; j < params.outputProj.size(); j++) { 00053 if (params.outputProj[j] == params.residualFilterCols[i]) { 00054 valueCols.push_back(j); 00055 break; 00056 } 00057 } 00058 00059 if (j >= params.outputProj.size()) { 00060 valueCols.push_back(params.outputProj.size() + k); 00061 k++; 00062 } 00063 } 00064 00065 /* 00066 * compute the cluster id and cluster position 00067 */ 00068 uint valueClus; 00069 uint clusterPos; 00070 uint clusterStart = 0; 00071 uint realClusterStart = 0; 00072 00073 filters.reset(new PLcsResidualColumnFilters[nFilters]); 00074 00075 for (uint i = 0; i < nClusters; i++) { 00076 uint clusterEnd = clusterStart + 00077 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1; 00078 00079 for (uint j = 0; j < nFilters; j++) { 00080 if (params.residualFilterCols[j] >= clusterStart && 00081 params.residualFilterCols[j] <= clusterEnd) 00082 { 00083 valueClus = i; 00084 00085 /* 00086 * find the position within the cluster 00087 */ 00088 for (uint k = 0; k < projMap.size(); k++) { 00089 if (projMap[k] == valueCols[j]) { 00090 clusterPos = k - realClusterStart - 00091 nonClusterCols.size(); 00092 00093 LcsResidualColumnFilters &filter = 00094 pClusters[valueClus]-> 00095 clusterCols[clusterPos]. 
00096 getFilters(); 00097 00098 filters[j] = &filter; 00099 00100 filter.hasResidualFilters = true; 00101 00102 filter.readerKeyProj.push_back(valueCols[j]); 00103 filter.inputKeyDesc.projectFrom( 00104 projDescriptor, 00105 filter.readerKeyProj); 00106 filter.attrAccessor.compute( 00107 filter.inputKeyDesc[0]); 00108 00109 filter.lowerBoundProj.push_back(1); 00110 filter.upperBoundProj.push_back(3); 00111 filter.readerKeyData.computeAndAllocate( 00112 filter.inputKeyDesc); 00113 00114 break; 00115 } 00116 } 00117 // Continue with the same cluster for more filters 00118 } 00119 } 00120 // Look for filters in the next cluster; modify cluster boundaries 00121 clusterStart = clusterEnd + 1; 00122 realClusterStart += pClusters[i]->nColsToRead; 00123 } 00124 }
void LcsRowScanExecStream::initializeSystemSampling | ( | ) | [private] |
Initializes the system sampling data structures during open time.
Definition at line 321 of file LcsRowScanExecStream.cpp.
References clumpDistance, clumpPos, clumpSize, clumpSkipPos, numClumps, rowCount, samplingClumps, samplingRate, and TRACE_FINE.
Referenced by open().
00322 { 00323 clumpPos = 0; 00324 clumpSkipPos = 0; 00325 00326 FENNEL_TRACE(TRACE_FINE, "rowCount = " << rowCount); 00327 FENNEL_TRACE( 00328 TRACE_FINE, "samplingRate = " << static_cast<double>(samplingRate)); 00329 00330 if (rowCount <= 0) { 00331 // Handle empty table or non-sense input. 00332 clumpSize = 1; 00333 clumpDistance = 0; 00334 numClumps = 0; 00335 return; 00336 } 00337 00338 // Manipulate this value in a separate member field so we don't 00339 // mistakenly modify our stored copy of the parameter. 00340 numClumps = samplingClumps; 00341 00342 // Compute clump size and distance 00343 int64_t sampleSize = 00344 static_cast<uint64_t>( 00345 round( 00346 static_cast<double>(rowCount) * 00347 static_cast<double>(samplingRate))); 00348 if (sampleSize < numClumps) { 00349 // Read at least as many rows as there are clumps, even if sample rate 00350 // is very small. 00351 sampleSize = numClumps; 00352 } 00353 00354 if (sampleSize > rowCount) { 00355 // samplingRate should be < 1.0, but handle the case where it isn't, 00356 // or where there are fewer rows than clumps. 00357 sampleSize = rowCount; 00358 numClumps = 1; 00359 } 00360 00361 FENNEL_TRACE(TRACE_FINE, "sampleSize = " << sampleSize); 00362 00363 clumpSize = 00364 static_cast<uint64_t>( 00365 round( 00366 static_cast<double>(sampleSize) / 00367 static_cast<double>(numClumps))); 00368 assert(sampleSize >= clumpSize); 00369 assert(clumpSize >= 1); 00370 00371 FENNEL_TRACE(TRACE_FINE, "clumpSize = " << clumpSize); 00372 00373 if (numClumps > 1) { 00374 // Arrange for the last clump to end at the end of the table. 00375 clumpDistance = 00376 static_cast<uint64_t>( 00377 round( 00378 static_cast<double>(rowCount - sampleSize) / 00379 static_cast<double>(numClumps - 1))); 00380 00381 // Rounding can cause us to push the final clump past the end of the 00382 // table. Avoid this when possible. 
00383 uint64_t rowsRequired = 00384 (clumpSize + clumpDistance) * (numClumps - 1) + clumpSize; 00385 if (rowsRequired > rowCount && clumpDistance > 0) { 00386 clumpDistance--; 00387 } 00388 } else { 00389 // The entire sample will come from the beginning of the table. 00390 clumpDistance = (rowCount - sampleSize); 00391 } 00392 00393 FENNEL_TRACE(TRACE_FINE, "clumpDistance = " << clumpDistance); 00394 }
ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer | ( | ) | [private] |
Populates the circular rid run buffer.
Definition at line 520 of file LcsRowScanExecStream.cpp.
References clumpDistance, clumpPos, clumpSize, clumpSkipPos, currRidRun, deletedRid, deletedRidEos, EXECRC_EOS, EXECRC_YIELD, inputRid, isFullScan, MAXU, LcsRidRun::nRids, numClumps, numClumpsBuilt, opaqueToInt(), CircularBuffer< T >::push_back(), readDeletedRid, LbmRidReaderBase::readRidAndAdvance(), ridReader, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, rowCount, SAMPLING_OFF, SAMPLING_SYSTEM, samplingMode, samplingRng, CircularBuffer< T >::setReadOnly(), CircularBuffer< T >::spaceAvailable(), and LcsRidRun::startRid.
Referenced by execute().
/**
 * Populates the circular rid-run buffer.
 *
 * Rid runs are produced in one of two modes:
 *  - not a full scan: each rid read from input 0 becomes a one-row run
 *    (adjacent rids are merged into the current run);
 *  - full scan: input 0 supplies deleted rids, and the runs cover every
 *    rid from 0 upward except those deleted rids. After the deleted-rid
 *    input hits EOS, the final run has nRids == MAXU (open-ended).
 * Bernoulli or system sampling, when enabled, further thins the runs.
 *
 * The loop stops when the buffer has no more space or all runs have been
 * built. Once everything is built, the pending run is flushed and the
 * buffer is marked read-only.
 *
 * @return EXECRC_YIELD normally; any other result from the rid reader
 * (other than EOS, which is handled here) is returned to the caller
 */
ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer()
{
    ExecStreamResult rc;
    // Number of rows the current step contributes starting at inputRid.
    RecordNum nRows;

    do {
        if (!isFullScan) {
            rc = ridReader.readRidAndAdvance(inputRid);
            if (rc == EXECRC_EOS) {
                ridRunsBuilt = true;
                break;
            }
            if (rc != EXECRC_YIELD) {
                return rc;
            }
            nRows = 1;

        } else {
            // Full scan: fetch the next deleted rid only when the previous
            // one has been passed.
            if (!deletedRidEos && readDeletedRid) {
                rc = ridReader.readRidAndAdvance(deletedRid);
                if (rc == EXECRC_EOS) {
                    deletedRidEos = true;
                    // Without sampling, the remaining rows form one final
                    // open-ended run (nRows = MAXU below), so building is
                    // finished after this iteration.
                    if (samplingMode == SAMPLING_OFF) {
                        ridRunsBuilt = true;
                    } else if (samplingMode == SAMPLING_SYSTEM &&
                               numClumps == 0)
                    {
                        ridRunsBuilt = true;
                        break;
                    }
                } else if (rc != EXECRC_YIELD) {
                    return rc;
                } else {
                    readDeletedRid = false;
                }
            }
            // skip over deleted rids
            if (!deletedRidEos && inputRid == deletedRid) {
                inputRid++;
                readDeletedRid = true;
                continue;
            } else {
                // Live rows extend up to the next deleted rid, or to the
                // end of the table once no deleted rids remain.
                if (deletedRidEos) {
                    nRows = MAXU;
                } else {
                    nRows = opaqueToInt(deletedRid - inputRid);
                }
            }
        }

        if (samplingMode != SAMPLING_OFF) {
            if (samplingMode == SAMPLING_SYSTEM) {
                if (clumpSkipPos > 0) {
                    // We need to skip clumpSkipPos RIDs, taking into
                    // account deleted RIDs. If all deleted RIDs have been
                    // processed (a), we can just skip forward to the next
                    // clump. If we know the next deleted RID, skip to the
                    // next clump if we can (b), else skip to the deleted
                    // RID (c). Processing will return here to handle the
                    // remaining clumpSkipPos rows when we reach the next
                    // live RID. If we don't know the next deleted RID
                    // (d), skip the current live RID, let the deleted RID
                    // processing occur above and then processing will
                    // return here to deal with the remaining clumpSkipPos
                    // rows.
                    if (deletedRidEos) {
                        // (a)
                        inputRid += clumpSkipPos;
                        clumpSkipPos = 0;
                    } else if (!readDeletedRid) {
                        if (deletedRid > inputRid + clumpSkipPos) {
                            // (b)
                            inputRid += clumpSkipPos;
                            clumpSkipPos = 0;
                            nRows = opaqueToInt(deletedRid - inputRid);
                        } else {
                            // (c)
                            clumpSkipPos -= opaqueToInt(deletedRid - inputRid);
                            inputRid = deletedRid;
                            continue;
                        }
                    } else {
                        // (d)
                        clumpSkipPos--;
                        inputRid++;
                        continue;
                    }
                }

                if (nRows >= clumpSize - clumpPos) {
                    // Scale back the size of the rid run based on the
                    // clump size
                    nRows = clumpSize - clumpPos;
                    clumpPos = 0;
                    clumpSkipPos = clumpDistance;
                    if (++numClumpsBuilt == numClumps) {
                        ridRunsBuilt = true;
                    }
                } else {
                    // We only have enough rids for a partial clump
                    clumpPos += nRows;
                }
            } else {
                // Bernoulli sampling
                if (opaqueToInt(inputRid) >= opaqueToInt(rowCount)) {
                    ridRunsBuilt = true;
                    break;
                }
                // Each rid is kept or dropped individually by the RNG.
                if (!samplingRng->nextValue()) {
                    inputRid++;
                    continue;
                }
                nRows = 1;
            }
        }

        // LcsRid(MAXU) as startRid marks "no run in progress".
        if (currRidRun.startRid == LcsRid(MAXU)) {
            currRidRun.startRid = inputRid;
            currRidRun.nRids = nRows;
        } else if (currRidRun.startRid + currRidRun.nRids == inputRid) {
            // If the next set of rids is contiguous with the previous,
            // continue adding on to the current run
            if (nRows == RecordNum(MAXU)) {
                currRidRun.nRids = MAXU;
            } else {
                currRidRun.nRids += nRows;
            }
        } else {
            // Otherwise, end the current one
            ridRuns.push_back(currRidRun);

            // And start a new one
            currRidRun.startRid = inputRid;
            currRidRun.nRids = nRows;
        }

        // In a full scan inputRid is generated here rather than read, so
        // advance past the rows just added.
        if (isFullScan) {
            inputRid += nRows;
        }
    } while (ridRuns.spaceAvailable() && !ridRunsBuilt);

    // Write out the last run
    if (ridRunsBuilt && currRidRun.startRid != LcsRid(MAXU)) {
        ridRuns.push_back(currRidRun);
    }

    if (ridRunsBuilt) {
        ridRuns.setReadOnly();
    }
    return EXECRC_YIELD;
}
void LcsRowScanExecStream::prepare | ( | LcsRowScanExecStreamParams const & | params | ) | [virtual] |
Definition at line 126 of file LcsRowScanExecStream.cpp.
References LcsRowScanBaseExecStream::attrAccessors, TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), LcsRowScanExecStreamParams::hasExtraFilter, hasExtraFilter, ConfluenceExecStream::inAccessors, LcsRowScanExecStreamParams::isFullScan, isFullScan, isSamplingRepeatable, nFilters, outputProj, LcsRowScanBaseExecStreamParams::outputProj, outputTupleData, SingleOutputExecStream::pOutAccessor, LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), LcsRowScanBaseExecStream::projDescriptor, projOutputTupleData, repeatableSeed, ridTupleData, rowCount, SAMPLING_BERNOULLI, SAMPLING_OFF, LcsRowScanExecStreamParams::samplingClumps, samplingClumps, LcsRowScanExecStreamParams::samplingIsRepeatable, LcsRowScanExecStreamParams::samplingMode, samplingMode, LcsRowScanExecStreamParams::samplingRate, samplingRate, LcsRowScanExecStreamParams::samplingRepeatableSeed, samplingRng, LcsRowScanExecStreamParams::samplingRowCount, and STANDARD_TYPE_RECORDNUM.
00127 { 00128 LcsRowScanBaseExecStream::prepare(params); 00129 00130 isFullScan = params.isFullScan; 00131 hasExtraFilter = params.hasExtraFilter; 00132 00133 // Set up rid bitmap input stream 00134 ridTupleData.compute(inAccessors[0]->getTupleDesc()); 00135 00136 // validate input stream parameters 00137 TupleDescriptor inputDesc = inAccessors[0]->getTupleDesc(); 00138 assert(inputDesc.size() == 3); 00139 StandardTypeDescriptorFactory stdTypeFactory; 00140 TupleAttributeDescriptor expectedRidDesc( 00141 stdTypeFactory.newDataType(STANDARD_TYPE_RECORDNUM)); 00142 assert(inputDesc[0] == expectedRidDesc); 00143 00144 assert(hasExtraFilter == (inAccessors.size() > 1)); 00145 00146 if (hasExtraFilter) { 00147 prepareResidualFilters(params); 00148 } else { 00149 nFilters = 0; 00150 } 00151 00152 /* 00153 * projDescriptor now also includes filter columns 00154 */ 00155 for (uint i = 0; i < params.outputProj.size(); i++) { 00156 outputProj.push_back(i); 00157 } 00158 00159 pOutAccessor->setTupleShape(pOutAccessor->getTupleDesc()); 00160 outputTupleData.computeAndAllocate(projDescriptor); 00161 00162 /* 00163 * build the real output accessor 00164 * it will be used to unmarshal data into the 00165 * real output row: projOutputTuple. 
00166 */ 00167 projOutputTupleData.compute(pOutAccessor->getTupleDesc()); 00168 00169 attrAccessors.resize(projDescriptor.size()); 00170 for (uint i = 0; i < projDescriptor.size(); ++i) { 00171 attrAccessors[i].compute(projDescriptor[i]); 00172 } 00173 00174 /* configure sampling */ 00175 samplingMode = params.samplingMode; 00176 00177 if (samplingMode != SAMPLING_OFF) { 00178 samplingRate = params.samplingRate; 00179 rowCount = params.samplingRowCount; 00180 00181 if (samplingMode == SAMPLING_BERNOULLI) { 00182 isSamplingRepeatable = params.samplingIsRepeatable; 00183 repeatableSeed = params.samplingRepeatableSeed; 00184 samplingClumps = -1; 00185 00186 samplingRng.reset(new BernoulliRng(samplingRate)); 00187 } else { 00188 assert(isFullScan); 00189 00190 samplingClumps = params.samplingClumps; 00191 assert(samplingClumps > 0); 00192 00193 isSamplingRepeatable = false; 00194 } 00195 } 00196 }
void LcsRowScanExecStream::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from LcsRowScanBaseExecStream.
Definition at line 198 of file LcsRowScanExecStream.cpp.
References CircularBuffer< T >::clear(), clumpDistance, clumpPos, clumpSize, currRidRun, deletedRidEos, filters, iFilterToInitialize, ConfluenceExecStream::inAccessors, LbmRidReader::init(), initializeSystemSampling(), inputRid, isFullScan, isSamplingRepeatable, MAXU, nextRid, nFilters, LcsRidRun::nRids, nRidsRead, numClumpsBuilt, LcsRowScanBaseExecStream::open(), producePending, readDeletedRid, repeatableSeed, CircularBufferIter< T >::reset(), ridReader, ridRunIter, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, ridTupleData, SAMPLING_BERNOULLI, SAMPLING_SYSTEM, samplingMode, samplingRng, LcsRidRun::startRid, and tupleFound.
00199 { 00200 LcsRowScanBaseExecStream::open(restart); 00201 producePending = false; 00202 tupleFound = false; 00203 nRidsRead = 0; 00204 ridRunsBuilt = false; 00205 currRidRun.startRid = LcsRid(MAXU); 00206 currRidRun.nRids = 0; 00207 ridRuns.clear(); 00208 ridRunIter.reset(); 00209 00210 if (isFullScan) { 00211 inputRid = LcsRid(0); 00212 readDeletedRid = true; 00213 deletedRidEos = false; 00214 } 00215 nextRid = LcsRid(0); 00216 ridReader.init(inAccessors[0], ridTupleData); 00217 00218 /* 00219 * Read from the 1st input, but only if we're not doing a restart. 00220 * Restarts can reuse the structures set up on the initial open 00221 * because the current assumption is that the residual filter 00222 * values don't change in between restarts. If on restart, if a filter 00223 * wasn't completely initialized, then reinitialize it. 00224 */ 00225 if (!restart) { 00226 iFilterToInitialize = 0; 00227 } else if (iFilterToInitialize < nFilters) { 00228 if (!filters[iFilterToInitialize]->filterDataInitialized) { 00229 filters[iFilterToInitialize]->filterData.clear(); 00230 } 00231 } 00232 00233 if (samplingMode == SAMPLING_BERNOULLI) { 00234 if (isSamplingRepeatable) { 00235 samplingRng->reseed(repeatableSeed); 00236 } else if (!restart) { 00237 samplingRng->reseed(static_cast<uint32_t>(time(0))); 00238 } 00239 } else if (samplingMode == SAMPLING_SYSTEM) { 00240 clumpSize = 0; 00241 clumpDistance = 0; 00242 clumpPos = 0; 00243 numClumpsBuilt = 0; 00244 00245 initializeSystemSampling(); 00246 } 00247 }
ExecStreamResult LcsRowScanExecStream::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 397 of file LcsRowScanExecStream.cpp.
References CircularBufferIter< T >::done(), EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, fillRidRunBuffer(), CircularBufferIter< T >::getCurrPos(), LcsClusterReader::getFetchRids(), initializeFiltersIfNeeded(), isFullScan, LCS_RID_COLUMN_ID, MAXU, LcsRowScanBaseExecStream::nClusters, nextRid, CircularBuffer< T >::nFreeSpace(), LcsRowScanBaseExecStream::nonClusterCols, nRidsRead, ExecStreamQuantum::nTuplesMax, opaqueToInt(), outputProj, outputTupleData, LcsRowScanBaseExecStream::pClusters, SingleOutputExecStream::pOutAccessor, producePending, TupleData::projectFrom(), LcsRowScanBaseExecStream::projMap, projOutputTupleData, LcsRowScanBaseExecStream::readColVals(), TupleDataWithBuffer::resetBuffer(), ridRunIter, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, LcsRowScanBaseExecStream::syncColumns(), and tupleFound.
/**
 * Executes this stream, producing up to quantum.nTuplesMax output rows.
 *
 * Residual filter inputs are consumed first; until they are exhausted the
 * stream reports EXECRC_BUF_UNDERFLOW. Each output row is then formed by
 * fetching the next rid from the rid-run buffer, reading the non-cluster
 * (rid) column, and positioning each cluster reader on that rid to read
 * and filter its columns.
 *
 * producePending stays true when produceTuple() overflows, so the same row
 * is retried on the next call instead of being recomputed.
 *
 * @param quantum governs the maximum amount of execution to perform
 * @return EXECRC_QUANTUM_EXPIRED, EXECRC_EOS, EXECRC_BUF_OVERFLOW,
 * EXECRC_BUF_UNDERFLOW, or a non-YIELD result from fillRidRunBuffer()
 */
ExecStreamResult LcsRowScanExecStream::execute(ExecStreamQuantum const &quantum)
{
    if (!initializeFiltersIfNeeded()) {
        return EXECRC_BUF_UNDERFLOW;
    }

    for (uint i = 0; i < quantum.nTuplesMax; i++) {
        uint iClu;
        bool passedFilter;

        // Keep fetching rids until one yields a row to emit (or, in a full
        // scan, until a rid is not found in a cluster).
        while (!producePending) {
            // No need to fill the rid run buffer each time through the loop
            if (!ridRunsBuilt && ridRuns.nFreeSpace() > 100) {
                ExecStreamResult rc = fillRidRunBuffer();
                if (rc != EXECRC_YIELD) {
                    return rc;
                }
            }

            // Determine the rid that needs to be fetched based on the
            // contents of the rid run buffer.
            LcsRid rid =
                LcsClusterReader::getFetchRids(ridRunIter, nextRid, true);
            if (rid == LcsRid(MAXU)) {
                assert(ridRunIter.done());
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }

            uint prevClusterEnd = 0;
            // reset datum pointers, in case previous tuple had nulls
            outputTupleData.resetBuffer();

            // Read the non-cluster columns first
            for (uint j = 0; j < nonClusterCols.size(); j++) {
                if (nonClusterCols[j] == LCS_RID_COLUMN_ID) {
                    memcpy(
                        const_cast<PBuffer>(outputTupleData[projMap[j]].pData),
                        (PBuffer) &rid, sizeof(LcsRid));
                    prevClusterEnd++;
                } else {
                    // Only the rid is supported as a non-cluster column.
                    permAssert(false);
                }
            }

            // Then go through each cluster, forming rows and checking ranges
            for (iClu = 0, passedFilter = true; iClu < nClusters; iClu++) {
                SharedLcsClusterReader &pScan = pClusters[iClu];

                // Resync the cluster reader to the current rid position
                pScan->catchUp(ridRunIter.getCurrPos(), nextRid);

                // if we have not read a batch yet or we've reached the
                // end of a batch, position to the rid we want to read

                if (!pScan->isPositioned() || rid >= pScan->getRangeEndRid()) {
                    bool rc = pScan->position(rid);

                    // rid not found, so just consume the rid and
                    // continue
                    // (iClu < nClusters afterwards, so tupleFound stays
                    // false and no row is emitted for this rid)
                    if (rc == false)
                        break;

                    assert(rid >= pScan->getRangeStartRid()
                           && rid < pScan->getRangeEndRid());

                    // Tell all column scans that the batch has changed.
                    syncColumns(pScan);
                } else {
                    // Should not have moved into previous batch.
                    assert(rid > pScan->getRangeStartRid());

                    // move to correct position within scan; we know we
                    // will not fall off end of batch, so use non-checking
                    // function (for speed)
                    pScan->advanceWithinBatch(
                        opaqueToInt(rid - pScan->getCurrentRid()));
                }

                passedFilter =
                    readColVals(
                        pScan,
                        outputTupleData,
                        prevClusterEnd);
                if (!passedFilter) {
                    break;
                }
                prevClusterEnd += pScan->nColsToRead;
            }

            // A row rejected by a residual filter is skipped entirely.
            if (!passedFilter) {
                continue;
            }
            if (iClu == nClusters) {
                tupleFound = true;
            }
            producePending = true;
        }

        // produce tuple
        projOutputTupleData.projectFrom(outputTupleData, outputProj);
        if (tupleFound) {
            if (!pOutAccessor->produceTuple(projOutputTupleData)) {
                return EXECRC_BUF_OVERFLOW;
            }
        }
        producePending = false;

        if (isFullScan) {
            // if tuple not found, reached end of table
            if (!tupleFound) {
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }
        }

        tupleFound = false;
        nRidsRead++;
    }

    return EXECRC_QUANTUM_EXPIRED;
}
void LcsRowScanExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual] |
Reimplemented from LcsRowScanBaseExecStream.
Definition at line 249 of file LcsRowScanExecStream.cpp.
References LcsRowScanBaseExecStream::getResourceRequirements().
00252 { 00253 LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity); 00254 }
void LcsRowScanExecStream::closeImpl | ( | ) | [virtual] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from LcsRowScanBaseExecStream.
Definition at line 672 of file LcsRowScanExecStream.cpp.
References LcsRowScanBaseExecStream::closeImpl(), filters, and nFilters.
00673 { 00674 LcsRowScanBaseExecStream::closeImpl(); 00675 00676 for (uint i = 0; i < nFilters; i++) { 00677 filters[i]->filterData.clear(); 00678 } 00679 }
void LcsRowScanBaseExecStream::syncColumns | ( | SharedLcsClusterReader & | pScan | ) | [protected, inherited] |
Positions column readers based on new cluster reader position.
pScan | cluster reader |
Definition at line 172 of file LcsRowScanBaseExecStream.cpp.
Referenced by LbmGeneratorExecStream::advanceReader(), execute(), LbmGeneratorExecStream::execute(), and LbmGeneratorExecStream::generateMultiKeyBitmaps().
00173 { 00174 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) { 00175 pScan->clusterCols[iCluCol].sync(); 00176 } 00177 }
bool LcsRowScanBaseExecStream::readColVals | ( | SharedLcsClusterReader & | pScan, | |
TupleDataWithBuffer & | tupleData, | |||
uint | colStart | |||
) | [protected, inherited] |
Reads column values based on current position of cluster reader.
pScan | cluster reader | |
tupleData | tupledata where data will be loaded | |
colStart | starting column offset where first column will be loaded |
Definition at line 179 of file LcsRowScanBaseExecStream.cpp.
References LcsRowScanBaseExecStream::allSpecial, LcsRowScanBaseExecStream::attrAccessors, LcsRowScanBaseExecStream::projDescriptor, and LcsRowScanBaseExecStream::projMap.
Referenced by execute(), and LbmGeneratorExecStream::generateMultiKeyBitmaps().
00183 { 00184 if (!allSpecial) { 00185 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) { 00186 // Get value of each column and load it to the appropriate 00187 // tuple datum entry 00188 PBuffer curValue = pScan->clusterCols[iCluCol].getCurrentValue(); 00189 uint idx = projMap[colStart + iCluCol]; 00190 00191 attrAccessors[idx].loadValue(tupleData[idx], curValue); 00192 if (pScan->clusterCols[iCluCol].getFilters().hasResidualFilters) { 00193 if (!pScan->clusterCols[iCluCol].applyFilters( 00194 projDescriptor, 00195 tupleData)) 00196 { 00197 return false; 00198 } 00199 } 00200 } 00201 } 00202 return true; 00203 }
void LcsRowScanBaseExecStream::prepare | ( | LcsRowScanBaseExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 33 of file LcsRowScanBaseExecStream.cpp.
References LcsRowScanBaseExecStream::allSpecial, LcsRowScanBaseExecStream::buildOutputProj(), BTreeParams::keyProj, BTreeDescriptor::keyProjection, LCS_RID_COLUMN_ID, LcsRowScanBaseExecStreamParams::lcsClusterScanDefs, LcsRowScanBaseExecStream::nClusters, LcsRowScanBaseExecStream::nonClusterCols, LcsRowScanBaseExecStreamParams::outputProj, BTreeParams::pageOwnerId, BTreeDescriptor::pageOwnerId, ExecStreamParams::pCacheAccessor, SegmentAccessor::pCacheAccessor, LcsRowScanBaseExecStream::pClusters, SingleOutputExecStream::pOutAccessor, ConfluenceExecStream::prepare(), LcsRowScanBaseExecStream::projDescriptor, LcsRowScanBaseExecStream::projMap, BTreeParams::pSegment, SegmentAccessor::pSegment, LcsRowScanBaseExecStream::ridRuns, BTreeParams::rootPageId, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeParams::segmentId, BTreeDescriptor::segmentId, BTreeParams::tupleDesc, and BTreeDescriptor::tupleDescriptor.
Referenced by prepare(), and LbmGeneratorExecStream::prepare().
00035 { 00036 ConfluenceExecStream::prepare(params); 00037 00038 // Copy cluster definition parameters and setup btree readers for each 00039 // cluster. Also, setup the full output tuple based on the ordered 00040 // list of cluster descriptors. 00041 00042 nClusters = params.lcsClusterScanDefs.size(); 00043 pClusters.reset(new SharedLcsClusterReader[nClusters]); 00044 00045 uint clusterStart = 0; 00046 uint projCount = 0; 00047 TupleDescriptor allClusterTupleDesc; 00048 TupleProjection newProj, outputProj; 00049 00050 buildOutputProj(outputProj, params); 00051 00052 newProj.resize(outputProj.size()); 00053 00054 // if we're projecting non-cluster columns, keep track of them separately; 00055 TupleDescriptor outputTupleDesc = pOutAccessor->getTupleDesc(); 00056 for (uint i = 0; i < outputProj.size(); i++) { 00057 if (outputProj[i] == LCS_RID_COLUMN_ID) { 00058 newProj[i] = projCount++; 00059 allClusterTupleDesc.push_back(outputTupleDesc[i]); 00060 nonClusterCols.push_back(params.outputProj[i]); 00061 } 00062 } 00063 00064 allSpecial = (nonClusterCols.size() == newProj.size()); 00065 00066 for (uint i = 0; i < nClusters; i++) { 00067 SharedLcsClusterReader &pClu = pClusters[i]; 00068 00069 BTreeExecStreamParams const &bTreeParams = params.lcsClusterScanDefs[i]; 00070 00071 BTreeDescriptor treeDescriptor; 00072 treeDescriptor.segmentAccessor.pSegment = bTreeParams.pSegment; 00073 treeDescriptor.segmentAccessor.pCacheAccessor = 00074 bTreeParams.pCacheAccessor; 00075 treeDescriptor.tupleDescriptor = bTreeParams.tupleDesc; 00076 treeDescriptor.keyProjection = bTreeParams.keyProj; 00077 treeDescriptor.rootPageId = bTreeParams.rootPageId; 00078 treeDescriptor.segmentId = bTreeParams.segmentId; 00079 treeDescriptor.pageOwnerId = bTreeParams.pageOwnerId; 00080 00081 pClu = 00082 SharedLcsClusterReader( 00083 new LcsClusterReader(treeDescriptor, &ridRuns)); 00084 00085 // setup the cluster and column readers to only read the columns 00086 // that are going to be projected 
00087 uint clusterEnd = clusterStart + 00088 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1; 00089 00090 // create a vector of the columns that are projected from 00091 // this cluster and recompute the projection list 00092 // based on the individual cluster projections 00093 TupleProjection clusterProj; 00094 for (uint j = 0; j < newProj.size(); j++) { 00095 if (outputProj[j] >= clusterStart && 00096 outputProj[j] <= clusterEnd) 00097 { 00098 clusterProj.push_back(outputProj[j] - clusterStart); 00099 newProj[j] = projCount++; 00100 } 00101 } 00102 clusterStart = clusterEnd + 1; 00103 00104 // need to select at least one column from cluster, except in the 00105 // cases where we're only selecting special columns or when there 00106 // are filter columns; in the former case, we'll just arbitrarily 00107 // read the first column, but not actually project it 00108 if (allSpecial) { 00109 clusterProj.push_back(0); 00110 } 00111 pClu->initColumnReaders( 00112 params.lcsClusterScanDefs[i].clusterTupleDesc.size(), 00113 clusterProj); 00114 if (!allSpecial) { 00115 for (uint j = 0; j < pClu->nColsToRead; j++) { 00116 allClusterTupleDesc.push_back( 00117 params.lcsClusterScanDefs[i]. 00118 clusterTupleDesc[clusterProj[j]]); 00119 } 00120 } 00121 } 00122 00123 // setup projected tuple descriptor, by reshuffling allClusterTupleDesc 00124 // built above, into the correct projection order 00125 00126 for (uint i = 0; i < newProj.size(); i++) { 00127 projDescriptor.push_back(allClusterTupleDesc[newProj[i]]); 00128 } 00129 00130 // create a projection map to map cluster data read to the output 00131 // projection 00132 projMap.resize(newProj.size()); 00133 for (uint i = 0; i < projMap.size(); i++) { 00134 for (uint j = 0; j < newProj.size(); j++) { 00135 if (newProj[j] == i) { 00136 projMap[i] = j; 00137 } 00138 } 00139 } 00140 }
void ConfluenceExecStream::prepare | ( | ConfluenceExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 37 of file ConfluenceExecStream.cpp.
References ConfluenceExecStream::getInputBufProvision(), ConfluenceExecStream::inAccessors, and SingleOutputExecStream::prepare().
Referenced by LcsRowScanBaseExecStream::prepare(), LbmUnionExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), CartesianJoinExecStream::prepare(), and BarrierExecStream::prepare().
00038 { 00039 SingleOutputExecStream::prepare(params); 00040 00041 for (uint i = 0; i < inAccessors.size(); ++i) { 00042 assert(inAccessors[i]->getProvision() == getInputBufProvision()); 00043 } 00044 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual, inherited] |
Determines resource requirements for this stream.
Default implementation declares zero resource requirements.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 93 of file ExecStream.cpp.
References EXEC_RESOURCE_ACCURATE.
Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().
00097 { 00098 getResourceRequirements(minQuantity, optQuantity); 00099 optType = EXEC_RESOURCE_ACCURATE; 00100 }
void ConfluenceExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleOutputExecStream.
Definition at line 31 of file ConfluenceExecStream.cpp.
References ConfluenceExecStream::inAccessors.
00033 { 00034 inAccessors = inAccessorsInit; 00035 }
ExecStreamBufProvision ConfluenceExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Definition at line 58 of file ConfluenceExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by ConfluenceExecStream::prepare().
00059 { 00060 return BUFPROV_PRODUCER; 00061 }
void SingleOutputExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Implements ExecStream.
Reimplemented in ConduitExecStream.
Definition at line 41 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::pOutAccessor.
Referenced by ConduitExecStream::setOutputBufAccessors().
00043 { 00044 assert(outAccessors.size() == 1); 00045 pOutAccessor = outAccessors[0]; 00046 }
ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented from ExecStream.
Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 69 of file SingleOutputExecStream.cpp.
References BUFPROV_CONSUMER.
Referenced by SingleOutputExecStream::prepare().
00070 { 00071 return BUFPROV_CONSUMER; 00072 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual, inherited] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 111 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.
Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().
00113 { 00114 resourceAllocation = quantity; 00115 if (pQuotaAccessor) { 00116 pQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00117 } 00118 if (pScratchQuotaAccessor) { 00119 pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00120 } 00121 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any resources it has allocated.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 { 00041 if (!needsClose) { 00042 return; 00043 } 00044 needsClose = false; 00045 closeImpl(); 00046 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of the event being traced |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
Tuple data for all columns read from all clusters, including filter columns.
Definition at line 117 of file LcsRowScanExecStream.h.
This variable is used to control the initialization of residual filters.
Its value is one less than the index of the first filtering input that has not yet been read. After open, it is initialized to 0. On execute, the filtering inputs are read sequentially, incrementing this variable, until either an underflow occurs or all filtering inputs have been read. When execute returns because of an underflow, this variable allows reading to resume where it left off.
Definition at line 129 of file LcsRowScanExecStream.h.
Referenced by initializeFiltersIfNeeded(), and open().
Definition at line 139 of file LcsRowScanExecStream.h.
Referenced by buildOutputProj(), execute(), and prepare().
TupleData LcsRowScanExecStream::ridTupleData [private] |
LbmRidReader LcsRowScanExecStream::ridReader [private] |
Rid reader.
Definition at line 149 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
RecordNum LcsRowScanExecStream::nRidsRead [private] |
LcsRid LcsRowScanExecStream::inputRid [private] |
Current rid read from the input stream.
Definition at line 159 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
LcsRid LcsRowScanExecStream::nextRid [private] |
bool LcsRowScanExecStream::readDeletedRid [private] |
True if a new deleted rid needs to be read from the input stream.
Definition at line 169 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
bool LcsRowScanExecStream::deletedRidEos [private] |
True if EOS has been reached on the deleted-rid input stream.
Definition at line 174 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
LcsRid LcsRowScanExecStream::deletedRid [private] |
Current deleted rid.
Definition at line 179 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer().
bool LcsRowScanExecStream::tupleFound [private] |
true if a tuple has been read but not yet produced
Definition at line 184 of file LcsRowScanExecStream.h.
bool LcsRowScanExecStream::isFullScan [private] |
true if executing full table scan
Definition at line 189 of file LcsRowScanExecStream.h.
Referenced by execute(), fillRidRunBuffer(), open(), and prepare().
bool LcsRowScanExecStream::hasExtraFilter [private] |
true if there is an extra range-list filter (supplied as the last input)
Definition at line 194 of file LcsRowScanExecStream.h.
Referenced by prepare().
bool LcsRowScanExecStream::producePending [private] |
boost::scoped_array<LcsResidualColumnFilters *> LcsRowScanExecStream::filters [private] |
The local filter data structure.
Note that these are aliasing pointers to facilitate filter data initialization and memory deallocation.
Definition at line 207 of file LcsRowScanExecStream.h.
Referenced by closeImpl(), initializeFiltersIfNeeded(), open(), and prepareResidualFilters().
int32_t LcsRowScanExecStream::nFilters [private] |
The number of residual column filters configured.
Definition at line 212 of file LcsRowScanExecStream.h.
Referenced by closeImpl(), initializeFiltersIfNeeded(), open(), prepare(), and prepareResidualFilters().
One of SAMPLING_OFF, SAMPLING_BERNOULLI or SAMPLING_SYSTEM.
Definition at line 217 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), open(), and prepare().
float LcsRowScanExecStream::samplingRate [private] |
The sampling rate (0.0 to 1.0).
Definition at line 222 of file LcsRowScanExecStream.h.
Referenced by initializeSystemSampling(), and prepare().
bool LcsRowScanExecStream::isSamplingRepeatable [private] |
int32_t LcsRowScanExecStream::repeatableSeed [private] |
int32_t LcsRowScanExecStream::samplingClumps [private] |
Number of clumps for system sampling.
Definition at line 237 of file LcsRowScanExecStream.h.
Referenced by initializeSystemSampling(), and prepare().
uint64_t LcsRowScanExecStream::clumpSize [private] |
Size of each sampling clump.
Definition at line 242 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().
uint64_t LcsRowScanExecStream::clumpDistance [private] |
Distance (in rows) between consecutive clumps.
Definition at line 247 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().
uint64_t LcsRowScanExecStream::clumpPos [private] |
Position (0 to clumpSize) within the current clump.
Definition at line 252 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().
uint64_t LcsRowScanExecStream::clumpSkipPos [private] |
Position (clumpDistance down to 0) in the gap between clumps.
Definition at line 257 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and initializeSystemSampling().
uint LcsRowScanExecStream::numClumps [private] |
The number of clumps that need to be built.
Definition at line 262 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and initializeSystemSampling().
uint LcsRowScanExecStream::numClumpsBuilt [private] |
Running counter of the number of clumps built.
Definition at line 267 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
boost::scoped_ptr<BernoulliRng> LcsRowScanExecStream::samplingRng [private] |
RNG for Bernoulli sampling.
Definition at line 272 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), open(), and prepare().
int64_t LcsRowScanExecStream::rowCount [private] |
Number of rows in the table.
Used only for sampling. In the case of Bernoulli sampling, it includes the count of deleted rows.
Definition at line 278 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), initializeSystemSampling(), and prepare().
bool LcsRowScanExecStream::ridRunsBuilt [private] |
True if completed building rid runs.
Definition at line 283 of file LcsRowScanExecStream.h.
Referenced by execute(), fillRidRunBuffer(), and open().
LcsRidRun LcsRowScanExecStream::currRidRun [private] |
Current rid run being constructed.
Definition at line 288 of file LcsRowScanExecStream.h.
Referenced by fillRidRunBuffer(), and open().
Iterator over the circular buffer containing rid runs.
Definition at line 293 of file LcsRowScanExecStream.h.
VectorOfUint LcsRowScanBaseExecStream::projMap [protected, inherited] |
Projection map that maps columns read from cluster to their position in the output projection.
Definition at line 77 of file LcsRowScanBaseExecStream.h.
Referenced by execute(), LbmGeneratorExecStream::generateSingletons(), LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), and LcsRowScanBaseExecStream::readColVals().
uint LcsRowScanBaseExecStream::nClusters [protected, inherited] |
Number of clusters to be scanned.
Definition at line 82 of file LcsRowScanBaseExecStream.h.
Referenced by LcsRowScanBaseExecStream::closeImpl(), execute(), LbmGeneratorExecStream::execute(), LbmGeneratorExecStream::generateMultiKeyBitmaps(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsRowScanBaseExecStream::LcsRowScanBaseExecStream(), LcsRowScanBaseExecStream::open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), LbmGeneratorExecStream::prepare(), and prepareResidualFilters().
boost::scoped_array<SharedLcsClusterReader> LcsRowScanBaseExecStream::pClusters [protected, inherited] |
Array containing cluster readers.
Definition at line 87 of file LcsRowScanBaseExecStream.h.
Referenced by LcsRowScanBaseExecStream::closeImpl(), execute(), LbmGeneratorExecStream::execute(), LbmGeneratorExecStream::generateBitmaps(), LbmGeneratorExecStream::generateMultiKeyBitmaps(), LbmGeneratorExecStream::generateSingleKeyBitmaps(), LbmGeneratorExecStream::generateSingletons(), LcsRowScanBaseExecStream::open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), and prepareResidualFilters().
TupleDescriptor LcsRowScanBaseExecStream::projDescriptor [protected, inherited] |
Tuple descriptor representing columns to be projected from scans.
Definition at line 92 of file LcsRowScanBaseExecStream.h.
Referenced by prepare(), LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), and LcsRowScanBaseExecStream::readColVals().
std::vector<int> LcsRowScanBaseExecStream::nonClusterCols [protected, inherited] |
List of the non-cluster columns that need to be projected.
Definition at line 97 of file LcsRowScanBaseExecStream.h.
Referenced by execute(), LcsRowScanBaseExecStream::prepare(), and prepareResidualFilters().
bool LcsRowScanBaseExecStream::allSpecial [protected, inherited] |
True in the special case where we are only reading special columns.
I.e., we don't actually have to read the underlying cluster data.
Definition at line 103 of file LcsRowScanBaseExecStream.h.
Referenced by LcsRowScanBaseExecStream::prepare(), and LcsRowScanBaseExecStream::readColVals().
CircularBuffer<LcsRidRun> LcsRowScanBaseExecStream::ridRuns [protected, inherited] |
Circular buffer of rid runs.
Definition at line 108 of file LcsRowScanBaseExecStream.h.
Referenced by execute(), LbmGeneratorExecStream::execute(), fillRidRunBuffer(), LcsRowScanExecStream(), open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), and LbmGeneratorExecStream::prepare().
std::vector<UnalignedAttributeAccessor> LcsRowScanBaseExecStream::attrAccessors [protected, inherited] |
Accessors used for loading actual column values.
Definition at line 120 of file LcsRowScanBaseExecStream.h.
Referenced by LbmGeneratorExecStream::generateBitmaps(), LbmGeneratorExecStream::generateSingletons(), prepare(), LbmGeneratorExecStream::prepare(), and LcsRowScanBaseExecStream::readColVals().
std::vector<SharedExecStreamBufAccessor> ConfluenceExecStream::inAccessors [protected, inherited] |
Definition at line 50 of file ConfluenceExecStream.h.
Referenced by NestedLoopJoinExecStream::checkNumInputs(), CartesianJoinExecStream::checkNumInputs(), LbmMinusExecStream::comparePrefixes(), LbmGeneratorExecStream::execute(), MergeExecStream::execute(), BarrierExecStream::execute(), LbmMinusExecStream::findMinInput(), initializeFiltersIfNeeded(), open(), LbmUnionExecStream::open(), LbmMinusExecStream::open(), LbmGeneratorExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), ConfluenceExecStream::open(), prepare(), LbmUnionExecStream::prepare(), LbmMinusExecStream::prepare(), LbmGeneratorExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConfluenceExecStream::prepare(), CartesianJoinExecStream::prepare(), BarrierExecStream::prepare(), NestedLoopJoinExecStream::preProcessRightInput(), BarrierExecStream::processInputTuple(), LbmBitOpExecStream::producePendingOutput(), LbmMinusExecStream::restartSubtrahends(), LhxJoinExecStream::setHashInfo(), and ConfluenceExecStream::setInputBufAccessors().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), 
SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().)
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().