LcsRowScanExecStream Class Reference

Given a stream of RIDs, performs a table scan for those RIDs using the appropriate clustered indexes defined on the table.

#include <LcsRowScanExecStream.h>

Inheritance diagram for LcsRowScanExecStream:

LcsRowScanExecStream → LcsRowScanBaseExecStream → ConfluenceExecStream → SingleOutputExecStream → ExecStream, which in turn inherits from ClosableObject, TraceSource and ErrorSource.

Public Member Functions

 LcsRowScanExecStream ()
virtual void prepare (LcsRowScanExecStreamParams const &params)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void closeImpl ()
 Implements ClosableObject.
virtual void prepare (LcsRowScanBaseExecStreamParams const &params)
virtual void prepare (ConfluenceExecStreamParams const &params)
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any unallocated resources.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Protected Member Functions

void syncColumns (SharedLcsClusterReader &pScan)
 Positions column readers based on new cluster reader position.
bool readColVals (SharedLcsClusterReader &pScan, TupleDataWithBuffer &tupleData, uint colStart)
 Reads column values based on current position of cluster reader.

Protected Attributes

VectorOfUint projMap
 Projection map that maps columns read from cluster to their position in the output projection.
uint nClusters
 Number of clusters to be scanned.
boost::scoped_array< SharedLcsClusterReader > pClusters
 Array containing cluster readers.
TupleDescriptor projDescriptor
 Tuple descriptor representing columns to be projected from scans.
std::vector< int > nonClusterCols
 List of the non-cluster columns that need to be projected.
bool allSpecial
 True in the special case where we are only reading special columns.
CircularBuffer< LcsRidRun > ridRuns
 Circular buffer of rid runs.
std::vector< UnalignedAttributeAccessor > attrAccessors
 Accessors used for loading actual column values.
std::vector< SharedExecStreamBufAccessor > inAccessors
SharedExecStreamBufAccessor pOutAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose

Private Member Functions

virtual void buildOutputProj (TupleProjection &outputProj, LcsRowScanBaseExecStreamParams const &params)
 Builds outputProj from params.
bool initializeFiltersIfNeeded ()
 Initializes the filter data structures.
void prepareResidualFilters (LcsRowScanExecStreamParams const &params)
 Initializes the filter data structures at prepare time.
void initializeSystemSampling ()
 Initializes the system sampling data structures during open time.
ExecStreamResult fillRidRunBuffer ()
 Populates the circular rid run buffer.

Private Attributes

TupleDataWithBuffer outputTupleData
 Tuple data for all columns read from all clusters, including filter columns.
uint iFilterToInitialize
 This variable is used to control the initialization of residual filters.
TupleData projOutputTupleData
TupleProjection outputProj
TupleData ridTupleData
 Tuple data for input stream.
LbmRidReader ridReader
 Rid reader.
RecordNum nRidsRead
 Number of rids read.
LcsRid inputRid
 Current rid read from the input stream.
LcsRid nextRid
 Next rid that needs to be fetched.
bool readDeletedRid
 True if need to read a new deleted rid from the input stream.
bool deletedRidEos
 True if reached EOS on deleted rid input stream.
LcsRid deletedRid
 Current deleted rid.
bool tupleFound
 True if a tuple has been read but not yet produced.
bool isFullScan
 True if executing a full table scan.
bool hasExtraFilter
 True if there is an extra range-list filter (as the last input).
bool producePending
 True if a produceTuple() call is pending.
boost::scoped_array< LcsResidualColumnFilters * > filters
 The local filter data structure.
int32_t nFilters
 The number of residual column filters configured.
TableSamplingMode samplingMode
 One of SAMPLING_OFF, SAMPLING_BERNOULLI or SAMPLING_SYSTEM.
float samplingRate
 The sampling rate (0.0 to 1.0).
bool isSamplingRepeatable
 True if the sample should be repeatable.
int32_t repeatableSeed
 Seed for repeatable sampling.
int32_t samplingClumps
 Number of clumps for system sampling.
uint64_t clumpSize
 Size of each sampling clump.
uint64_t clumpDistance
 Distance (in rows) between consecutive clumps.
uint64_t clumpPos
 Position (0 to clumpSize) in the current clump.
uint64_t clumpSkipPos
 Rows still to skip before the next clump (counts down from clumpDistance to 0).
uint numClumps
 The number of clumps that need to be built.
uint numClumpsBuilt
 Running counter of the number of clumps built.
boost::scoped_ptr< BernoulliRng > samplingRng
 RNG for Bernoulli sampling.
int64_t rowCount
 Number of rows in the table.
bool ridRunsBuilt
 True if completed building rid runs.
LcsRidRun currRidRun
 Current rid run being constructed.
CircularBufferIter< LcsRidRun > ridRunIter
 Iterator over the circular buffer containing rid runs.

Detailed Description

Given a stream of RIDs, performs a table scan for those RIDs using the appropriate clustered indexes defined on the table.

The stream returns a projected subset of columns from the table

Definition at line 110 of file LcsRowScanExecStream.h.


Constructor & Destructor Documentation

LcsRowScanExecStream::LcsRowScanExecStream (  ) 

Definition at line 33 of file LcsRowScanExecStream.cpp.

References CircularBuffer< T >::resize(), and LcsRowScanBaseExecStream::ridRuns.

00034 :
00035     LcsRowScanBaseExecStream(),
00036     ridRunIter(&ridRuns)
00037 {
00038     ridRuns.resize(4000);
00039 }
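
The constructor sizes ridRuns to 4000 entries, and fillRidRunBuffer() stops producing runs once spaceAvailable() reports that the buffer is full. The following is a minimal sketch of a fixed-capacity ring buffer offering those operations. BoundedRing and its pop_front() are hypothetical names, and the setReadOnly() semantics modeled here (no further pushes once the producer has finished) are an assumption; this is not Fennel's actual CircularBuffer implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for CircularBuffer<T>: a fixed-capacity FIFO ring.
template <typename T>
class BoundedRing {
public:
    // Allocates `capacity` slots and empties the ring.
    void resize(std::size_t capacity) {
        slots.assign(capacity, T());
        head = count = 0;
        readOnly = false;
    }
    bool spaceAvailable() const { return count < slots.size(); }
    bool empty() const { return count == 0; }

    // Producer side: append at the tail, wrapping around the slot array.
    void push_back(const T &value) {
        assert(!readOnly && spaceAvailable());
        slots[(head + count) % slots.size()] = value;
        ++count;
    }

    // Consumer side: remove and return the oldest entry.
    T pop_front() {
        assert(!empty());
        T value = slots[head];
        head = (head + 1) % slots.size();
        --count;
        return value;
    }

    // Assumed semantics: the producer is done, so no more pushes are allowed.
    void setReadOnly() { readOnly = true; }
    bool isReadOnly() const { return readOnly; }

private:
    std::vector<T> slots;
    std::size_t head = 0;
    std::size_t count = 0;
    bool readOnly = false;
};
```

Because slots are reused after the consumer pops them, a producer such as fillRidRunBuffer() can be called repeatedly with a bounded amount of memory.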


Member Function Documentation

void LcsRowScanExecStream::buildOutputProj ( TupleProjection &  outputProj,
LcsRowScanBaseExecStreamParams const &  params 
) [private, virtual]

Builds outputProj from params.

Parameters:
outputProj the projection to be built
params the LcsRowScanBaseExecStreamParams

Reimplemented from LcsRowScanBaseExecStream.

Definition at line 681 of file LcsRowScanExecStream.cpp.

References outputProj, LcsRowScanBaseExecStreamParams::outputProj, and LcsRowScanExecStreamParams::residualFilterCols.

00684 {
00685     LcsRowScanExecStreamParams const &rowScanParams =
00686         dynamic_cast<const LcsRowScanExecStreamParams&>(params);
00687 
00688     /*
00689      * Build a projection that contains filter columns
00690      */
00691     for (uint i = 0; i < rowScanParams.outputProj.size(); i++) {
00692         outputProj.push_back(rowScanParams.outputProj[i]);
00693     }
00694     for (uint i = 0; i < rowScanParams.residualFilterCols.size(); i++) {
00695         uint j;
00696         for (j = 0; j < rowScanParams.outputProj.size(); j++) {
00697             if (rowScanParams.outputProj[j] ==
00698                 rowScanParams.residualFilterCols[i])
00699             {
00700                 break;
00701             }
00702         }
00703 
00704         if (j >= rowScanParams.outputProj.size()) {
00705             outputProj.push_back(rowScanParams.residualFilterCols[i]);
00706         }
00707     }
00708 }
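
A standalone sketch of the same projection-building rule, using std::vector<unsigned> as a hypothetical stand-in for TupleProjection: the requested output columns come first, followed by every residual-filter column that is not already among them.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

using Projection = std::vector<unsigned>;  // stand-in for TupleProjection

// Keeps the requested output columns in order, then appends each residual
// filter column that the requested projection does not already contain.
Projection buildOutputProjSketch(
    const Projection &outputProj, const Projection &residualFilterCols)
{
    Projection result(outputProj);
    for (unsigned col : residualFilterCols) {
        if (std::find(outputProj.begin(), outputProj.end(), col)
            == outputProj.end())
        {
            result.push_back(col);
        }
    }
    return result;
}
```

For example, output columns {4, 1} with filter columns {1, 7, 2} yield {4, 1, 7, 2}. As in the original, only the requested output columns are checked, so a column listed twice in residualFilterCols would be appended twice.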

bool LcsRowScanExecStream::initializeFiltersIfNeeded (  )  [private]

Initializes the filter data structures.

Returns:
false iff the input underflows (more filter tuples are needed before initialization can finish).

Definition at line 256 of file LcsRowScanExecStream.cpp.

References EXECBUF_EOS, filters, FixedBuffer, TupleAccessor::getCurrentByteCount(), TupleAccessor::getCurrentTupleBuf(), iFilterToInitialize, ConfluenceExecStream::inAccessors, nFilters, TupleAccessor::setCurrentTupleBuf(), and TupleAccessor::unmarshal().

Referenced by execute().

00257 {
00258     /*
00259      * initialize the filters local data
00260      */
00261     for (; iFilterToInitialize < nFilters; iFilterToInitialize++) {
00262         SharedExecStreamBufAccessor &pInAccessor =
00263             inAccessors[iFilterToInitialize + 1];
00264         TupleAccessor &inputAccessor =
00265             pInAccessor->getConsumptionTupleAccessor();
00266 
00267         if (pInAccessor->getState() != EXECBUF_EOS) {
00268             PLcsResidualColumnFilters filter = filters[iFilterToInitialize];
00269 
00270             while (pInAccessor->demandData()) {
00271                 SharedLcsResidualFilter filterData(new LcsResidualFilter);
00272 
00273                 pInAccessor->accessConsumptionTuple();
00274 
00275                 /*
00276                  * Build lower and upper bound data
00277                  */
00278                 filterData->boundData.compute(pInAccessor->getTupleDesc());
00279                 filterData->boundBuf.reset(
00280                     new FixedBuffer[inputAccessor.getCurrentByteCount()]);
00281 
00282                 memcpy(
00283                     filterData->boundBuf.get(),
00284                     pInAccessor->getConsumptionStart(),
00285                     inputAccessor.getCurrentByteCount());
00286 
00287                 /*
00288                  * inputAccessor is used to unmarshal into boundData.
00289                  * in order to do this, its current buffer is set to
00290                  * boundBuf and restored.
00291                  */
00292                 PConstBuffer tmpBuf;
00293                 tmpBuf = inputAccessor.getCurrentTupleBuf();
00294                 inputAccessor.setCurrentTupleBuf(filterData->boundBuf.get());
00295                 inputAccessor.unmarshal(filterData->boundData);
00296                 inputAccessor.setCurrentTupleBuf(tmpBuf);
00297 
00298                 /*
00299                  * record directives.
00300                  */
00301                 filterData->lowerBoundDirective =
00302                     SearchEndpoint(*filterData->boundData[0].pData);
00303                 filterData->upperBoundDirective =
00304                     SearchEndpoint(*filterData->boundData[2].pData);
00305 
00306                 filter->filterData.push_back(filterData);
00307 
00308                 pInAccessor->consumeTuple();
00309             }
00310 
00311             if (pInAccessor->getState() != EXECBUF_EOS) {
00312                 return false;
00313             }
00314         }
00315         filters[iFilterToInitialize]->filterDataInitialized = true;
00316     }
00317     return true;
00318 }
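
Each bound tuple consumed above carries a lower-bound directive at position 0, the lower key at 1, an upper-bound directive at 2 and the upper key at 3 (the positions prepareResidualFilters() records in lowerBoundProj and upperBoundProj). The sketch below shows how such a list of ranges could be evaluated against an integer column value. The Endpoint enum is a hypothetical analogue of SearchEndpoint, and treating a column's ranges as a disjunction (a value passes if it falls in any range) is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical analogue of SearchEndpoint directives.
enum class Endpoint {
    UnboundedLower, OpenLower, ClosedLower,
    OpenUpper, ClosedUpper, UnboundedUpper
};

// One decoded bound tuple: positions 0..3 of boundData.
struct RangeFilter {
    Endpoint lowerDirective;
    int64_t lowerKey;
    Endpoint upperDirective;
    int64_t upperKey;
};

// True if v satisfies both the lower and the upper bound of one range.
bool inRange(const RangeFilter &f, int64_t v)
{
    bool lowerOk = f.lowerDirective == Endpoint::UnboundedLower
        || (f.lowerDirective == Endpoint::OpenLower && v > f.lowerKey)
        || (f.lowerDirective == Endpoint::ClosedLower && v >= f.lowerKey);
    bool upperOk = f.upperDirective == Endpoint::UnboundedUpper
        || (f.upperDirective == Endpoint::OpenUpper && v < f.upperKey)
        || (f.upperDirective == Endpoint::ClosedUpper && v <= f.upperKey);
    return lowerOk && upperOk;
}

// Assumption: a value passes if it lies in any of the column's ranges.
// An empty range list rejects every value in this sketch.
bool passesResidualFilter(const std::vector<RangeFilter> &ranges, int64_t v)
{
    for (const RangeFilter &f : ranges) {
        if (inRange(f, v)) {
            return true;
        }
    }
    return false;
}
```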

void LcsRowScanExecStream::prepareResidualFilters ( LcsRowScanExecStreamParams const &  params  )  [private]

Initializes the filter data structures at prepare time.

Parameters:
params the LcsRowScanExecStreamParams

Definition at line 41 of file LcsRowScanExecStream.cpp.

References LcsResidualColumnFilters::attrAccessor, UnalignedAttributeAccessor::compute(), TupleDataWithBuffer::computeAndAllocate(), filters, LcsResidualColumnFilters::hasResidualFilters, LcsResidualColumnFilters::inputKeyDesc, LcsRowScanBaseExecStreamParams::lcsClusterScanDefs, LcsResidualColumnFilters::lowerBoundProj, LcsRowScanBaseExecStream::nClusters, nFilters, LcsRowScanBaseExecStream::nonClusterCols, LcsRowScanBaseExecStreamParams::outputProj, LcsRowScanBaseExecStream::pClusters, LcsRowScanBaseExecStream::projDescriptor, TupleDescriptor::projectFrom(), LcsRowScanBaseExecStream::projMap, LcsResidualColumnFilters::readerKeyData, LcsResidualColumnFilters::readerKeyProj, LcsRowScanExecStreamParams::residualFilterCols, and LcsResidualColumnFilters::upperBoundProj.

Referenced by prepare().

00043 {
00044     nFilters = params.residualFilterCols.size();
00045 
00046     /*
00047      * compute the outputTupleData position of filter columns
00048      */
00049     VectorOfUint valueCols;
00050     uint j, k = 0;
00051     for (uint i = 0;  i < nFilters; i++) {
00052         for (j = 0; j < params.outputProj.size(); j++) {
00053             if (params.outputProj[j] == params.residualFilterCols[i]) {
00054                 valueCols.push_back(j);
00055                 break;
00056             }
00057         }
00058 
00059         if (j >= params.outputProj.size()) {
00060             valueCols.push_back(params.outputProj.size() +  k);
00061             k++;
00062         }
00063     }
00064 
00065     /*
00066      * compute the cluster id and cluster position
00067      */
00068     uint valueClus;
00069     uint clusterPos;
00070     uint clusterStart = 0;
00071     uint realClusterStart = 0;
00072 
00073     filters.reset(new PLcsResidualColumnFilters[nFilters]);
00074 
00075     for (uint i = 0; i < nClusters; i++) {
00076         uint clusterEnd = clusterStart +
00077             params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00078 
00079         for (uint j = 0; j < nFilters; j++) {
00080             if (params.residualFilterCols[j] >= clusterStart &&
00081                 params.residualFilterCols[j] <= clusterEnd)
00082             {
00083                 valueClus = i;
00084 
00085                 /*
00086                  * find the position within the cluster
00087                  */
00088                 for (uint k = 0; k < projMap.size(); k++) {
00089                     if (projMap[k] == valueCols[j]) {
00090                         clusterPos = k - realClusterStart -
00091                             nonClusterCols.size();
00092 
00093                         LcsResidualColumnFilters &filter =
00094                             pClusters[valueClus]->
00095                             clusterCols[clusterPos].
00096                             getFilters();
00097 
00098                         filters[j] = &filter;
00099 
00100                         filter.hasResidualFilters = true;
00101 
00102                         filter.readerKeyProj.push_back(valueCols[j]);
00103                         filter.inputKeyDesc.projectFrom(
00104                             projDescriptor,
00105                             filter.readerKeyProj);
00106                         filter.attrAccessor.compute(
00107                             filter.inputKeyDesc[0]);
00108 
00109                         filter.lowerBoundProj.push_back(1);
00110                         filter.upperBoundProj.push_back(3);
00111                         filter.readerKeyData.computeAndAllocate(
00112                             filter.inputKeyDesc);
00113 
00114                         break;
00115                     }
00116                 }
00117                 // Continue with the same cluster for more filters
00118             }
00119         }
00120         // Look for filters in the next cluster; modify cluster boundaries
00121         clusterStart = clusterEnd + 1;
00122         realClusterStart += pClusters[i]->nColsToRead;
00123     }
00124 }
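
The two index computations in prepareResidualFilters() can be isolated as pure functions: the position of each filter column in outputTupleData, and the cluster whose contiguous column range contains a given column. This is a sketch under the assumption that every cluster has at least one column; the function names are hypothetical, and the later mapping through projMap and nonClusterCols to a position within the cluster is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Cols = std::vector<unsigned>;

// Position of each residual filter column in outputTupleData: its index in
// outputProj if projected, otherwise outputProj.size() + k, where k counts
// the filter columns appended so far (the order buildOutputProj uses).
Cols filterValuePositions(const Cols &outputProj, const Cols &residualFilterCols)
{
    Cols valueCols;
    unsigned k = 0;
    for (unsigned col : residualFilterCols) {
        std::size_t j = 0;
        while (j < outputProj.size() && outputProj[j] != col) {
            ++j;
        }
        if (j < outputProj.size()) {
            valueCols.push_back(static_cast<unsigned>(j));
        } else {
            valueCols.push_back(static_cast<unsigned>(outputProj.size()) + k++);
        }
    }
    return valueCols;
}

// Cluster i covers columns [clusterStart, clusterStart + clusterSizes[i] - 1];
// clusters are laid out back to back. Returns clusterSizes.size() if none match.
std::size_t clusterOfColumn(const Cols &clusterSizes, unsigned col)
{
    unsigned clusterStart = 0;
    for (std::size_t i = 0; i < clusterSizes.size(); ++i) {
        unsigned clusterEnd = clusterStart + clusterSizes[i] - 1;
        if (col >= clusterStart && col <= clusterEnd) {
            return i;
        }
        clusterStart = clusterEnd + 1;
    }
    return clusterSizes.size();
}
```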

void LcsRowScanExecStream::initializeSystemSampling (  )  [private]

Initializes the system sampling data structures during open time.

Definition at line 321 of file LcsRowScanExecStream.cpp.

References clumpDistance, clumpPos, clumpSize, clumpSkipPos, numClumps, rowCount, samplingClumps, samplingRate, and TRACE_FINE.

Referenced by open().

00322 {
00323     clumpPos = 0;
00324     clumpSkipPos = 0;
00325 
00326     FENNEL_TRACE(TRACE_FINE, "rowCount = " << rowCount);
00327     FENNEL_TRACE(
00328         TRACE_FINE, "samplingRate = " << static_cast<double>(samplingRate));
00329 
00330     if (rowCount <= 0) {
00331         // Handle empty table or non-sense input.
00332         clumpSize = 1;
00333         clumpDistance = 0;
00334         numClumps = 0;
00335         return;
00336     }
00337 
00338     // Manipulate this value in a separate member field so we don't
00339     // mistakenly modify our stored copy of the parameter.
00340     numClumps = samplingClumps;
00341 
00342     // Compute clump size and distance
00343     int64_t sampleSize =
00344         static_cast<uint64_t>(
00345             round(
00346                 static_cast<double>(rowCount) *
00347                 static_cast<double>(samplingRate)));
00348     if (sampleSize < numClumps) {
00349         // Read at least as many rows as there are clumps, even if sample rate
00350         // is very small.
00351         sampleSize = numClumps;
00352     }
00353 
00354     if (sampleSize > rowCount) {
00355         // samplingRate should be < 1.0, but handle the case where it isn't,
00356         // or where there are fewer rows than clumps.
00357         sampleSize = rowCount;
00358         numClumps = 1;
00359     }
00360 
00361     FENNEL_TRACE(TRACE_FINE, "sampleSize = " << sampleSize);
00362 
00363     clumpSize =
00364         static_cast<uint64_t>(
00365             round(
00366                 static_cast<double>(sampleSize) /
00367                 static_cast<double>(numClumps)));
00368     assert(sampleSize >= clumpSize);
00369     assert(clumpSize >= 1);
00370 
00371     FENNEL_TRACE(TRACE_FINE, "clumpSize = " << clumpSize);
00372 
00373     if (numClumps > 1) {
00374         // Arrange for the last clump to end at the end of the table.
00375         clumpDistance =
00376             static_cast<uint64_t>(
00377                 round(
00378                     static_cast<double>(rowCount - sampleSize) /
00379                     static_cast<double>(numClumps - 1)));
00380 
00381         // Rounding can cause us to push the final clump past the end of the
00382         // table.  Avoid this when possible.
00383         uint64_t rowsRequired =
00384             (clumpSize + clumpDistance) * (numClumps - 1) + clumpSize;
00385         if (rowsRequired > rowCount && clumpDistance > 0) {
00386             clumpDistance--;
00387         }
00388     } else {
00389         // The entire sample will come from the beginning of the table.
00390         clumpDistance = (rowCount - sampleSize);
00391     }
00392 
00393     FENNEL_TRACE(TRACE_FINE, "clumpDistance = " << clumpDistance);
00394 }
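
To make the clump arithmetic concrete, here is the same computation as a pure function; planSystemSample and ClumpPlan are hypothetical names, and samplingClumps is assumed to be positive, as prepare() asserts. With rowCount = 1000, samplingRate = 0.1 and 10 clumps it yields a clump size of 10 and a clump distance of 100, so, assuming the scan starts at RID 0 and no rows are deleted, the clumps start at rows 0, 110, ..., 990 and the last one ends exactly at the end of the table.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

struct ClumpPlan {
    uint64_t clumpSize;
    uint64_t clumpDistance;
    uint64_t numClumps;
};

// Re-derives the arithmetic of initializeSystemSampling (samplingClumps > 0).
ClumpPlan planSystemSample(
    int64_t rowCount, double samplingRate, int64_t samplingClumps)
{
    if (rowCount <= 0) {
        return {1, 0, 0};  // empty table or nonsensical input: no clumps
    }
    int64_t numClumps = samplingClumps;
    int64_t sampleSize = static_cast<int64_t>(
        std::round(static_cast<double>(rowCount) * samplingRate));
    if (sampleSize < numClumps) {
        sampleSize = numClumps;  // read at least one row per clump
    }
    if (sampleSize > rowCount) {
        sampleSize = rowCount;   // rate >= 1.0 or fewer rows than clumps
        numClumps = 1;
    }
    uint64_t clumpSize = static_cast<uint64_t>(std::round(
        static_cast<double>(sampleSize) / static_cast<double>(numClumps)));
    uint64_t clumpDistance;
    if (numClumps > 1) {
        // Spread the gaps so the last clump ends at the end of the table.
        clumpDistance = static_cast<uint64_t>(std::round(
            static_cast<double>(rowCount - sampleSize) /
            static_cast<double>(numClumps - 1)));
        uint64_t rowsRequired =
            (clumpSize + clumpDistance) * static_cast<uint64_t>(numClumps - 1)
            + clumpSize;
        if (rowsRequired > static_cast<uint64_t>(rowCount) && clumpDistance > 0) {
            clumpDistance--;     // undo a rounding overshoot
        }
    } else {
        clumpDistance = static_cast<uint64_t>(rowCount - sampleSize);
    }
    return {clumpSize, clumpDistance, static_cast<uint64_t>(numClumps)};
}
```

The rounding correction matters for small tables: with 10 rows, a rate of 0.5 and 3 clumps, the distance first rounds to 3, but (2 + 3) * 2 + 2 = 12 rows would overrun the table, so it is reduced to 2.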

ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer (  )  [private]

Populates the circular rid run buffer.

Returns:
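EXECRC_YIELD if the buffer was successfully populated; otherwise, any status from the RID reader other than EXECRC_YIELD or EXECRC_EOS is returned unchanged (EOS ends run building instead).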

Definition at line 520 of file LcsRowScanExecStream.cpp.

References clumpDistance, clumpPos, clumpSize, clumpSkipPos, currRidRun, deletedRid, deletedRidEos, EXECRC_EOS, EXECRC_YIELD, inputRid, isFullScan, MAXU, LcsRidRun::nRids, numClumps, numClumpsBuilt, opaqueToInt(), CircularBuffer< T >::push_back(), readDeletedRid, LbmRidReaderBase::readRidAndAdvance(), ridReader, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, rowCount, SAMPLING_OFF, SAMPLING_SYSTEM, samplingMode, samplingRng, CircularBuffer< T >::setReadOnly(), CircularBuffer< T >::spaceAvailable(), and LcsRidRun::startRid.

Referenced by execute().

00521 {
00522     ExecStreamResult rc;
00523     RecordNum nRows;
00524 
00525     do {
00526         if (!isFullScan) {
00527             rc = ridReader.readRidAndAdvance(inputRid);
00528             if (rc == EXECRC_EOS) {
00529                 ridRunsBuilt = true;
00530                 break;
00531             }
00532             if (rc != EXECRC_YIELD) {
00533                 return rc;
00534             }
00535             nRows = 1;
00536 
00537         } else {
00538             if (!deletedRidEos && readDeletedRid) {
00539                 rc = ridReader.readRidAndAdvance(deletedRid);
00540                 if (rc == EXECRC_EOS) {
00541                     deletedRidEos = true;
00542                     if (samplingMode == SAMPLING_OFF) {
00543                         ridRunsBuilt = true;
00544                     } else if (samplingMode == SAMPLING_SYSTEM &&
00545                         numClumps == 0)
00546                     {
00547                         ridRunsBuilt = true;
00548                         break;
00549                     }
00550                 } else if (rc != EXECRC_YIELD) {
00551                     return rc;
00552                 } else {
00553                     readDeletedRid = false;
00554                 }
00555             }
00556             // skip over deleted rids
00557             if (!deletedRidEos && inputRid == deletedRid) {
00558                 inputRid++;
00559                 readDeletedRid = true;
00560                 continue;
00561             } else {
00562                 if (deletedRidEos) {
00563                     nRows = MAXU;
00564                 } else {
00565                     nRows = opaqueToInt(deletedRid - inputRid);
00566                 }
00567             }
00568         }
00569 
00570         if (samplingMode != SAMPLING_OFF) {
00571             if (samplingMode == SAMPLING_SYSTEM) {
00572                 if (clumpSkipPos > 0) {
00573                     // We need to skip clumpSkipPos RIDs, taking into
00574                     // account deleted RIDs.  If all deleted RIDs have been
00575                     // processed (a), we can just skip forward to the next
00576                     // clump.  If we know the next deleted RID, skip to the
00577                     // next clump if we can (b), else skip to the deleted
00578                     // RID (c).  Processing will return here to handle the
00579                     // remaining clumpSkipPos rows when we reach the next
00580                     // live RID.  If we don't know the next deleted RID
00581                     // (d), skip the current live RID, let the deleted RID
00582                     // processing occur above and then processing will
00583                     // return here to deal with the remaining clumpSkipPos
00584                     // rows.
00585                     if (deletedRidEos) {
00586                         // (a)
00587                         inputRid += clumpSkipPos;
00588                         clumpSkipPos = 0;
00589                     } else if (!readDeletedRid) {
00590                         if (deletedRid > inputRid + clumpSkipPos) {
00591                             // (b)
00592                             inputRid += clumpSkipPos;
00593                             clumpSkipPos = 0;
00594                             nRows = opaqueToInt(deletedRid - inputRid);
00595                         } else {
00596                             // (c)
00597                             clumpSkipPos -= opaqueToInt(deletedRid - inputRid);
00598                             inputRid = deletedRid;
00599                             continue;
00600                         }
00601                     } else {
00602                         // (d)
00603                         clumpSkipPos--;
00604                         inputRid++;
00605                         continue;
00606                     }
00607                 }
00608 
00609                 if (nRows >= clumpSize - clumpPos) {
00610                     // Scale back the size of the rid run based on the
00611                     // clump size
00612                     nRows = clumpSize - clumpPos;
00613                     clumpPos = 0;
00614                     clumpSkipPos = clumpDistance;
00615                     if (++numClumpsBuilt == numClumps) {
00616                         ridRunsBuilt = true;
00617                     }
00618                 } else {
00619                     // We only have enough rids for a partial clump
00620                     clumpPos += nRows;
00621                 }
00622             } else {
00623                 // Bernoulli sampling
00624                 if (opaqueToInt(inputRid) >= opaqueToInt(rowCount)) {
00625                     ridRunsBuilt = true;
00626                     break;
00627                 }
00628                 if (!samplingRng->nextValue()) {
00629                     inputRid++;
00630                     continue;
00631                 }
00632                 nRows = 1;
00633             }
00634         }
00635 
00636         if (currRidRun.startRid == LcsRid(MAXU)) {
00637             currRidRun.startRid = inputRid;
00638             currRidRun.nRids = nRows;
00639         } else if (currRidRun.startRid + currRidRun.nRids == inputRid) {
00640             // If the next set of rids is contiguous with the previous,
00641             // continue adding on to the current run
00642             if (nRows == RecordNum(MAXU)) {
00643                 currRidRun.nRids = MAXU;
00644             } else {
00645                 currRidRun.nRids += nRows;
00646             }
00647         } else {
00648             // Otherwise, end the current one
00649             ridRuns.push_back(currRidRun);
00650 
00651             // And start a new one
00652             currRidRun.startRid = inputRid;
00653             currRidRun.nRids = nRows;
00654         }
00655 
00656         if (isFullScan) {
00657             inputRid += nRows;
00658         }
00659     } while (ridRuns.spaceAvailable() && !ridRunsBuilt);
00660 
00661     // Write out the last run
00662     if (ridRunsBuilt && currRidRun.startRid != LcsRid(MAXU)) {
00663         ridRuns.push_back(currRidRun);
00664     }
00665 
00666     if (ridRunsBuilt) {
00667         ridRuns.setReadOnly();
00668     }
00669     return EXECRC_YIELD;
00670 }
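
The run-building step at the end of the loop can be illustrated in isolation. The sketch below coalesces ascending RIDs into runs using the same three cases (start the first run, extend a contiguous run, otherwise flush and restart), and separately computes the live runs of a full scan given sorted, distinct deleted RIDs. RidRun, coalesceRids and liveRuns are hypothetical names. Unlike the original, which leaves the final full-scan run open-ended (nRids = MAXU), liveRuns bounds it by an explicit row count, and sampling is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct RidRun {  // analogue of LcsRidRun
    uint64_t startRid;
    uint64_t nRids;
};

// Coalesces an ascending sequence of RIDs into runs of contiguous RIDs.
std::vector<RidRun> coalesceRids(const std::vector<uint64_t> &rids)
{
    std::vector<RidRun> runs;
    bool haveRun = false;  // plays the role of startRid == LcsRid(MAXU)
    RidRun curr{0, 0};
    for (uint64_t rid : rids) {
        if (!haveRun) {
            curr = {rid, 1};
            haveRun = true;
        } else if (curr.startRid + curr.nRids == rid) {
            curr.nRids += 1;  // contiguous: extend the current run
        } else {
            runs.push_back(curr);  // gap: end the current run, start anew
            curr = {rid, 1};
        }
    }
    if (haveRun) {
        runs.push_back(curr);  // write out the last run
    }
    return runs;
}

// Runs of live RIDs in [0, rowCount) given sorted, distinct deleted RIDs.
std::vector<RidRun> liveRuns(uint64_t rowCount, const std::vector<uint64_t> &deleted)
{
    std::vector<RidRun> runs;
    uint64_t rid = 0;
    for (uint64_t d : deleted) {
        if (d >= rowCount) {
            break;
        }
        if (d > rid) {
            runs.push_back({rid, d - rid});  // live rows before this deletion
        }
        rid = d + 1;  // skip over the deleted RID
    }
    if (rid < rowCount) {
        runs.push_back({rid, rowCount - rid});
    }
    return runs;
}
```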

void LcsRowScanExecStream::prepare ( LcsRowScanExecStreamParams const &  params  )  [virtual]

Definition at line 126 of file LcsRowScanExecStream.cpp.

References LcsRowScanBaseExecStream::attrAccessors, TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), LcsRowScanExecStreamParams::hasExtraFilter, hasExtraFilter, ConfluenceExecStream::inAccessors, LcsRowScanExecStreamParams::isFullScan, isFullScan, isSamplingRepeatable, nFilters, outputProj, LcsRowScanBaseExecStreamParams::outputProj, outputTupleData, SingleOutputExecStream::pOutAccessor, LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), LcsRowScanBaseExecStream::projDescriptor, projOutputTupleData, repeatableSeed, ridTupleData, rowCount, SAMPLING_BERNOULLI, SAMPLING_OFF, LcsRowScanExecStreamParams::samplingClumps, samplingClumps, LcsRowScanExecStreamParams::samplingIsRepeatable, LcsRowScanExecStreamParams::samplingMode, samplingMode, LcsRowScanExecStreamParams::samplingRate, samplingRate, LcsRowScanExecStreamParams::samplingRepeatableSeed, samplingRng, LcsRowScanExecStreamParams::samplingRowCount, and STANDARD_TYPE_RECORDNUM.

00127 {
00128     LcsRowScanBaseExecStream::prepare(params);
00129 
00130     isFullScan = params.isFullScan;
00131     hasExtraFilter = params.hasExtraFilter;
00132 
00133     // Set up rid bitmap input stream
00134     ridTupleData.compute(inAccessors[0]->getTupleDesc());
00135 
00136     // validate input stream parameters
00137     TupleDescriptor inputDesc = inAccessors[0]->getTupleDesc();
00138     assert(inputDesc.size() == 3);
00139     StandardTypeDescriptorFactory stdTypeFactory;
00140     TupleAttributeDescriptor expectedRidDesc(
00141         stdTypeFactory.newDataType(STANDARD_TYPE_RECORDNUM));
00142     assert(inputDesc[0] == expectedRidDesc);
00143 
00144     assert(hasExtraFilter == (inAccessors.size() > 1));
00145 
00146     if (hasExtraFilter) {
00147         prepareResidualFilters(params);
00148     } else {
00149         nFilters = 0;
00150     }
00151 
00152     /*
00153      * projDescriptor now also includes filter columns
00154      */
00155     for (uint i = 0; i < params.outputProj.size(); i++) {
00156         outputProj.push_back(i);
00157     }
00158 
00159     pOutAccessor->setTupleShape(pOutAccessor->getTupleDesc());
00160     outputTupleData.computeAndAllocate(projDescriptor);
00161 
00162     /*
00163      * build the real output accessor
00164      * it will be used to unmarshal data into the
00165      * real output row: projOutputTuple.
00166      */
00167     projOutputTupleData.compute(pOutAccessor->getTupleDesc());
00168 
00169     attrAccessors.resize(projDescriptor.size());
00170     for (uint i = 0; i < projDescriptor.size(); ++i) {
00171         attrAccessors[i].compute(projDescriptor[i]);
00172     }
00173 
00174     /* configure sampling */
00175     samplingMode = params.samplingMode;
00176 
00177     if (samplingMode != SAMPLING_OFF) {
00178         samplingRate = params.samplingRate;
00179         rowCount = params.samplingRowCount;
00180 
00181         if (samplingMode == SAMPLING_BERNOULLI) {
00182             isSamplingRepeatable = params.samplingIsRepeatable;
00183             repeatableSeed = params.samplingRepeatableSeed;
00184             samplingClumps = -1;
00185 
00186             samplingRng.reset(new BernoulliRng(samplingRate));
00187         } else {
00188             assert(isFullScan);
00189 
00190             samplingClumps = params.samplingClumps;
00191             assert(samplingClumps > 0);
00192 
00193             isSamplingRepeatable = false;
00194         }
00195     }
00196 }
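For SAMPLING_BERNOULLI, prepare() creates a BernoulliRng from samplingRate. The sketch below assumes that generator keeps each row independently with probability samplingRate and can be reseeded. The class name BernoulliSampler and the use of std::mt19937 are hypothetical choices. They are not Fennel's BernoulliRng API.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Hypothetical Bernoulli row sampler: each row is kept independently with
// probability `rate`. Not Fennel's BernoulliRng; names are assumptions.
class BernoulliSampler {
public:
    explicit BernoulliSampler(double rate) : dist_(rate) {}

    // Restart the pseudo-random sequence from the given seed.
    void reseed(std::uint32_t seed) { engine_.seed(seed); }

    // Returns true if the next row should be included in the sample.
    bool nextRowIncluded() { return dist_(engine_); }

private:
    std::mt19937 engine_;
    std::bernoulli_distribution dist_;
};
```

Two samplers reseeded with the same value produce the same include/exclude sequence. That property is what makes a repeatable sample possible.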

void LcsRowScanExecStream::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must already be open and should reset itself to start from the beginning of its result set

Reimplemented from LcsRowScanBaseExecStream.

Definition at line 198 of file LcsRowScanExecStream.cpp.

References CircularBuffer< T >::clear(), clumpDistance, clumpPos, clumpSize, currRidRun, deletedRidEos, filters, iFilterToInitialize, ConfluenceExecStream::inAccessors, LbmRidReader::init(), initializeSystemSampling(), inputRid, isFullScan, isSamplingRepeatable, MAXU, nextRid, nFilters, LcsRidRun::nRids, nRidsRead, numClumpsBuilt, LcsRowScanBaseExecStream::open(), producePending, readDeletedRid, repeatableSeed, CircularBufferIter< T >::reset(), ridReader, ridRunIter, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, ridTupleData, SAMPLING_BERNOULLI, SAMPLING_SYSTEM, samplingMode, samplingRng, LcsRidRun::startRid, and tupleFound.

00199 {
00200     LcsRowScanBaseExecStream::open(restart);
00201     producePending = false;
00202     tupleFound = false;
00203     nRidsRead = 0;
00204     ridRunsBuilt = false;
00205     currRidRun.startRid = LcsRid(MAXU);
00206     currRidRun.nRids = 0;
00207     ridRuns.clear();
00208     ridRunIter.reset();
00209 
00210     if (isFullScan) {
00211         inputRid = LcsRid(0);
00212         readDeletedRid = true;
00213         deletedRidEos = false;
00214     }
00215     nextRid = LcsRid(0);
00216     ridReader.init(inAccessors[0], ridTupleData);
00217 
00218     /*
00219      * Read from the 1st input, but only if we're not doing a restart.
00220      * Restarts can reuse the structures set up on the initial open
00221      * because the current assumption is that the residual filter
00222      * values don't change in between restarts.  On restart, if a filter
00223      * wasn't completely initialized, reinitialize it.
00224      */
00225     if (!restart) {
00226         iFilterToInitialize = 0;
00227     } else if (iFilterToInitialize < nFilters) {
00228         if (!filters[iFilterToInitialize]->filterDataInitialized) {
00229             filters[iFilterToInitialize]->filterData.clear();
00230         }
00231     }
00232 
00233     if (samplingMode == SAMPLING_BERNOULLI) {
00234         if (isSamplingRepeatable) {
00235             samplingRng->reseed(repeatableSeed);
00236         } else if (!restart) {
00237             samplingRng->reseed(static_cast<uint32_t>(time(0)));
00238         }
00239     } else if (samplingMode == SAMPLING_SYSTEM) {
00240         clumpSize = 0;
00241         clumpDistance = 0;
00242         clumpPos = 0;
00243         numClumpsBuilt = 0;
00244 
00245         initializeSystemSampling();
00246     }
00247 }
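The sampling branch of open() decides when to reseed the generator. As a hedged sketch, that decision can be pulled out into a pure function. seedOnOpen and SeedDecision are hypothetical names, and clockSeed stands in for static_cast<uint32_t>(time(0)).

```cpp
#include <cassert>
#include <cstdint>

// Outcome of the reseeding decision: whether to reseed, and with which seed.
struct SeedDecision {
    bool reseed;
    std::uint32_t seed;
};

// Mirrors the SAMPLING_BERNOULLI branch of open().
SeedDecision seedOnOpen(bool isSamplingRepeatable, bool restart,
                        std::uint32_t repeatableSeed, std::uint32_t clockSeed)
{
    if (isSamplingRepeatable) {
        // Same seed on every open and restart: identical sample each pass.
        return SeedDecision{true, repeatableSeed};
    }
    if (!restart) {
        // First open of a non-repeatable sample: seed from the clock.
        return SeedDecision{true, clockSeed};
    }
    // Restart of a non-repeatable sample: keep the current random sequence.
    return SeedDecision{false, 0};
}
```

A repeatable sample is reseeded on every open and restart, so each pass yields the same rows. A non-repeatable sample is seeded from the clock only on the first open, and a restart continues the existing random sequence.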

ExecStreamResult LcsRowScanExecStream::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 397 of file LcsRowScanExecStream.cpp.

References CircularBufferIter< T >::done(), EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, fillRidRunBuffer(), CircularBufferIter< T >::getCurrPos(), LcsClusterReader::getFetchRids(), initializeFiltersIfNeeded(), isFullScan, LCS_RID_COLUMN_ID, MAXU, LcsRowScanBaseExecStream::nClusters, nextRid, CircularBuffer< T >::nFreeSpace(), LcsRowScanBaseExecStream::nonClusterCols, nRidsRead, ExecStreamQuantum::nTuplesMax, opaqueToInt(), outputProj, outputTupleData, LcsRowScanBaseExecStream::pClusters, SingleOutputExecStream::pOutAccessor, producePending, TupleData::projectFrom(), LcsRowScanBaseExecStream::projMap, projOutputTupleData, LcsRowScanBaseExecStream::readColVals(), TupleDataWithBuffer::resetBuffer(), ridRunIter, LcsRowScanBaseExecStream::ridRuns, ridRunsBuilt, LcsRowScanBaseExecStream::syncColumns(), and tupleFound.

00398 {
00399     if (!initializeFiltersIfNeeded()) {
00400         return EXECRC_BUF_UNDERFLOW;
00401     }
00402 
00403     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00404         uint iClu;
00405         bool passedFilter;
00406 
00407         while (!producePending) {
00408             // No need to fill the rid run buffer each time through the loop
00409             if (!ridRunsBuilt && ridRuns.nFreeSpace() > 100) {
00410                 ExecStreamResult rc = fillRidRunBuffer();
00411                 if (rc != EXECRC_YIELD) {
00412                     return rc;
00413                 }
00414             }
00415 
00416             // Determine the rid that needs to be fetched based on the
00417             // contents of the rid run buffer.
00418             LcsRid rid =
00419                 LcsClusterReader::getFetchRids(ridRunIter, nextRid, true);
00420             if (rid == LcsRid(MAXU)) {
00421                 assert(ridRunIter.done());
00422                 pOutAccessor->markEOS();
00423                 return EXECRC_EOS;
00424             }
00425 
00426             uint prevClusterEnd = 0;
00427             // reset datum pointers, in case previous tuple had nulls
00428             outputTupleData.resetBuffer();
00429 
00430             // Read the non-cluster columns first
00431             for (uint j = 0; j < nonClusterCols.size(); j++) {
00432                 if (nonClusterCols[j] == LCS_RID_COLUMN_ID) {
00433                     memcpy(
00434                         const_cast<PBuffer>(outputTupleData[projMap[j]].pData),
00435                         (PBuffer) &rid, sizeof(LcsRid));
00436                     prevClusterEnd++;
00437                 } else {
00438                     permAssert(false);
00439                 }
00440             }
00441 
00442             // Then go through each cluster, forming rows and checking ranges
00443             for (iClu = 0, passedFilter = true; iClu <  nClusters; iClu++) {
00444                 SharedLcsClusterReader &pScan = pClusters[iClu];
00445 
00446                 // Resync the cluster reader to the current rid position
00447                 pScan->catchUp(ridRunIter.getCurrPos(), nextRid);
00448 
00449                 // if we have not read a batch yet or we've reached the
00450                 // end of a batch, position to the rid we want to read
00451 
00452                 if (!pScan->isPositioned() || rid >= pScan->getRangeEndRid()) {
00453                     bool rc = pScan->position(rid);
00454 
00455                     // rid not found, so just consume the rid and
00456                     // continue
00457                     if (rc == false)
00458                         break;
00459 
00460                     assert(rid >= pScan->getRangeStartRid()
00461                            && rid < pScan->getRangeEndRid());
00462 
00463                     // Tell all column scans that the batch has changed.
00464                     syncColumns(pScan);
00465                 } else {
00466                     // Should not have moved into previous batch.
00467                     assert(rid > pScan->getRangeStartRid());
00468 
00469                     // move to correct position within scan; we know we
00470                     // will not fall off end of batch, so use non-checking
00471                     // function (for speed)
00472                     pScan->advanceWithinBatch(
00473                         opaqueToInt(rid - pScan->getCurrentRid()));
00474                 }
00475 
00476                 passedFilter =
00477                     readColVals(
00478                         pScan,
00479                         outputTupleData,
00480                         prevClusterEnd);
00481                 if (!passedFilter) {
00482                     break;
00483                 }
00484                 prevClusterEnd += pScan->nColsToRead;
00485             }
00486 
00487             if (!passedFilter) {
00488                 continue;
00489             }
00490             if (iClu == nClusters) {
00491                 tupleFound = true;
00492             }
00493             producePending = true;
00494         }
00495 
00496         // produce tuple
00497         projOutputTupleData.projectFrom(outputTupleData, outputProj);
00498         if (tupleFound) {
00499             if (!pOutAccessor->produceTuple(projOutputTupleData)) {
00500                 return EXECRC_BUF_OVERFLOW;
00501             }
00502         }
00503         producePending = false;
00504 
00505         if (isFullScan) {
00506             // if tuple not found, reached end of table
00507             if (!tupleFound) {
00508                 pOutAccessor->markEOS();
00509                 return EXECRC_EOS;
00510             }
00511         }
00512 
00513         tupleFound = false;
00514         nRidsRead++;
00515     }
00516 
00517     return EXECRC_QUANTUM_EXPIRED;
00518 }
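The control flow of execute() can be modeled in isolation. The sketch below simplifies it under several assumptions:

- the rid runs are pre-expanded into a vector;
- a single predicate stands in for the cluster reads and residual filters;
- a bounded vector plays the output buffer.

It also ignores the isFullScan and "rid not found" paths. Rc, ScanState, and executeQuantum are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical return codes mirroring EXECRC_QUANTUM_EXPIRED,
// EXECRC_BUF_OVERFLOW and EXECRC_EOS.
enum class Rc { QuantumExpired, BufOverflow, Eos };

struct ScanState {
    std::vector<unsigned> rids;   // rids to fetch, in ascending order
    std::size_t next = 0;         // index of the next rid to fetch
    bool producePending = false;  // a row was built but not yet emitted
    unsigned pendingRid = 0;      // the row waiting to be emitted
};

Rc executeQuantum(ScanState &s, std::size_t nTuplesMax,
                  const std::function<bool(unsigned)> &passes,
                  std::vector<unsigned> &out, std::size_t capacity)
{
    for (std::size_t i = 0; i < nTuplesMax; i++) {
        while (!s.producePending) {
            if (s.next == s.rids.size()) {
                return Rc::Eos;              // no more rids: end of stream
            }
            unsigned rid = s.rids[s.next++];
            if (!passes(rid)) {
                continue;                    // filtered out: try the next rid
            }
            s.pendingRid = rid;
            s.producePending = true;
        }
        if (out.size() == capacity) {
            // Buffer full: keep the pending row so the next call emits it.
            return Rc::BufOverflow;
        }
        out.push_back(s.pendingRid);
        s.producePending = false;
    }
    return Rc::QuantumExpired;
}
```

The sketch keeps the producePending flag from the original. When the output buffer is full, the row already built stays pending, so the next call emits it before fetching another rid.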

void LcsRowScanExecStream::getResourceRequirements ( ExecStreamResourceQuantity & minQuantity,
ExecStreamResourceQuantity & optQuantity 
) [virtual]

Reimplemented from LcsRowScanBaseExecStream.

Definition at line 249 of file LcsRowScanExecStream.cpp.

References LcsRowScanBaseExecStream::getResourceRequirements().

00252 {
00253     LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity);
00254 }

void LcsRowScanExecStream::closeImpl (  )  [virtual]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from LcsRowScanBaseExecStream.

Definition at line 672 of file LcsRowScanExecStream.cpp.

References LcsRowScanBaseExecStream::closeImpl(), filters, and nFilters.

00673 {
00674     LcsRowScanBaseExecStream::closeImpl();
00675 
00676     for (uint i = 0; i < nFilters; i++) {
00677         filters[i]->filterData.clear();
00678     }
00679 }

void LcsRowScanBaseExecStream::syncColumns ( SharedLcsClusterReader & pScan  )  [protected, inherited]

Positions column readers based on new cluster reader position.

Parameters:
pScan cluster reader

Definition at line 172 of file LcsRowScanBaseExecStream.cpp.

Referenced by LbmGeneratorExecStream::advanceReader(), execute(), LbmGeneratorExecStream::execute(), and LbmGeneratorExecStream::generateMultiKeyBitmaps().

00173 {
00174     for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00175         pScan->clusterCols[iCluCol].sync();
00176     }
00177 }

bool LcsRowScanBaseExecStream::readColVals ( SharedLcsClusterReader & pScan,
TupleDataWithBuffer & tupleData,
uint  colStart 
) [protected, inherited]

Reads column values based on current position of cluster reader.

Parameters:
pScan cluster reader
tupleData tupledata where data will be loaded
colStart starting column offset where first column will be loaded
Returns:
false if column filters failed; true otherwise

Definition at line 179 of file LcsRowScanBaseExecStream.cpp.

References LcsRowScanBaseExecStream::allSpecial, LcsRowScanBaseExecStream::attrAccessors, LcsRowScanBaseExecStream::projDescriptor, and LcsRowScanBaseExecStream::projMap.

Referenced by execute(), and LbmGeneratorExecStream::generateMultiKeyBitmaps().

00183 {
00184     if (!allSpecial) {
00185         for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00186             // Get value of each column and load it to the appropriate
00187             // tuple datum entry
00188             PBuffer curValue = pScan->clusterCols[iCluCol].getCurrentValue();
00189             uint idx = projMap[colStart + iCluCol];
00190 
00191             attrAccessors[idx].loadValue(tupleData[idx], curValue);
00192             if (pScan->clusterCols[iCluCol].getFilters().hasResidualFilters) {
00193                 if (!pScan->clusterCols[iCluCol].applyFilters(
00194                     projDescriptor,
00195                     tupleData))
00196                 {
00197                     return false;
00198                 }
00199             }
00200         }
00201     }
00202     return true;
00203 }
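readColVals() copies each column read from a cluster into the output slot chosen by projMap. It returns false as soon as a residual filter rejects the row. The sketch below models that with plain ints. ColumnReader and its residualFilter member are hypothetical.

It differs from the real method in two ways. Each filter sees only its own column value rather than the projDescriptor/tupleData pair. The allSpecial shortcut is also omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical column reader: the value at the current rid plus an
// optional residual filter (an empty std::function means "no filter").
struct ColumnReader {
    int currentValue;
    std::function<bool(int)> residualFilter;
};

// Loads this cluster's columns into `row`, starting at read position
// `colStart`; returns false as soon as a filter rejects the row.
bool readColVals(const std::vector<ColumnReader> &clusterCols,
                 const std::vector<std::size_t> &projMap,
                 std::size_t colStart,
                 std::vector<int> &row)
{
    for (std::size_t iCluCol = 0; iCluCol < clusterCols.size(); iCluCol++) {
        std::size_t idx = projMap[colStart + iCluCol];
        row[idx] = clusterCols[iCluCol].currentValue;
        const auto &filter = clusterCols[iCluCol].residualFilter;
        if (filter && !filter(row[idx])) {
            return false; // row rejected; remaining columns are not loaded
        }
    }
    return true;
}
```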

void LcsRowScanBaseExecStream::prepare ( LcsRowScanBaseExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 33 of file LcsRowScanBaseExecStream.cpp.

References LcsRowScanBaseExecStream::allSpecial, LcsRowScanBaseExecStream::buildOutputProj(), BTreeParams::keyProj, BTreeDescriptor::keyProjection, LCS_RID_COLUMN_ID, LcsRowScanBaseExecStreamParams::lcsClusterScanDefs, LcsRowScanBaseExecStream::nClusters, LcsRowScanBaseExecStream::nonClusterCols, LcsRowScanBaseExecStreamParams::outputProj, BTreeParams::pageOwnerId, BTreeDescriptor::pageOwnerId, ExecStreamParams::pCacheAccessor, SegmentAccessor::pCacheAccessor, LcsRowScanBaseExecStream::pClusters, SingleOutputExecStream::pOutAccessor, ConfluenceExecStream::prepare(), LcsRowScanBaseExecStream::projDescriptor, LcsRowScanBaseExecStream::projMap, BTreeParams::pSegment, SegmentAccessor::pSegment, LcsRowScanBaseExecStream::ridRuns, BTreeParams::rootPageId, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeParams::segmentId, BTreeDescriptor::segmentId, BTreeParams::tupleDesc, and BTreeDescriptor::tupleDescriptor.

Referenced by prepare(), and LbmGeneratorExecStream::prepare().

00035 {
00036     ConfluenceExecStream::prepare(params);
00037 
00038     // Copy cluster definition parameters and setup btree readers for each
00039     // cluster.  Also, setup the full output tuple based on the ordered
00040     // list of cluster descriptors.
00041 
00042     nClusters = params.lcsClusterScanDefs.size();
00043     pClusters.reset(new SharedLcsClusterReader[nClusters]);
00044 
00045     uint clusterStart = 0;
00046     uint projCount = 0;
00047     TupleDescriptor allClusterTupleDesc;
00048     TupleProjection newProj, outputProj;
00049 
00050     buildOutputProj(outputProj, params);
00051 
00052     newProj.resize(outputProj.size());
00053 
00054     // if we're projecting non-cluster columns, keep track of them separately;
00055     TupleDescriptor outputTupleDesc = pOutAccessor->getTupleDesc();
00056     for (uint i = 0; i < outputProj.size(); i++) {
00057         if (outputProj[i] == LCS_RID_COLUMN_ID) {
00058             newProj[i] = projCount++;
00059             allClusterTupleDesc.push_back(outputTupleDesc[i]);
00060             nonClusterCols.push_back(params.outputProj[i]);
00061         }
00062     }
00063 
00064     allSpecial = (nonClusterCols.size() == newProj.size());
00065 
00066     for (uint i = 0; i < nClusters; i++) {
00067         SharedLcsClusterReader &pClu = pClusters[i];
00068 
00069         BTreeExecStreamParams const &bTreeParams = params.lcsClusterScanDefs[i];
00070 
00071         BTreeDescriptor treeDescriptor;
00072         treeDescriptor.segmentAccessor.pSegment = bTreeParams.pSegment;
00073         treeDescriptor.segmentAccessor.pCacheAccessor =
00074             bTreeParams.pCacheAccessor;
00075         treeDescriptor.tupleDescriptor = bTreeParams.tupleDesc;
00076         treeDescriptor.keyProjection = bTreeParams.keyProj;
00077         treeDescriptor.rootPageId = bTreeParams.rootPageId;
00078         treeDescriptor.segmentId = bTreeParams.segmentId;
00079         treeDescriptor.pageOwnerId = bTreeParams.pageOwnerId;
00080 
00081         pClu =
00082             SharedLcsClusterReader(
00083                 new LcsClusterReader(treeDescriptor, &ridRuns));
00084 
00085         // setup the cluster and column readers to only read the columns
00086         // that are going to be projected
00087         uint clusterEnd = clusterStart +
00088             params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00089 
00090         // create a vector of the columns that are projected from
00091         // this cluster and recompute the projection list
00092         // based on the individual cluster projections
00093         TupleProjection clusterProj;
00094         for (uint j = 0; j < newProj.size(); j++) {
00095             if (outputProj[j] >= clusterStart &&
00096                 outputProj[j] <= clusterEnd)
00097             {
00098                 clusterProj.push_back(outputProj[j] - clusterStart);
00099                 newProj[j] = projCount++;
00100             }
00101         }
00102         clusterStart = clusterEnd + 1;
00103 
00104         // need to select at least one column from cluster, except in the
00105         // cases where we're only selecting special columns or when there
00106         // are filter columns; in the former case, we'll just arbitrarily
00107         // read the first column, but not actually project it
00108         if (allSpecial) {
00109            clusterProj.push_back(0);
00110         }
00111         pClu->initColumnReaders(
00112             params.lcsClusterScanDefs[i].clusterTupleDesc.size(),
00113             clusterProj);
00114         if (!allSpecial) {
00115             for (uint j = 0; j < pClu->nColsToRead; j++) {
00116                 allClusterTupleDesc.push_back(
00117                     params.lcsClusterScanDefs[i].
00118                         clusterTupleDesc[clusterProj[j]]);
00119             }
00120         }
00121     }
00122 
00123     // setup projected tuple descriptor, by reshuffling allClusterTupleDesc
00124     // built above, into the correct projection order
00125 
00126     for (uint i = 0; i < newProj.size(); i++) {
00127         projDescriptor.push_back(allClusterTupleDesc[newProj[i]]);
00128     }
00129 
00130     // create a projection map to map cluster data read to the output
00131     // projection
00132     projMap.resize(newProj.size());
00133     for (uint i = 0; i < projMap.size(); i++) {
00134         for (uint j = 0; j < newProj.size(); j++) {
00135             if (newProj[j] == i) {
00136                 projMap[i] = j;
00137             }
00138         }
00139     }
00140 }
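The newProj/projMap bookkeeping in LcsRowScanBaseExecStream::prepare() can be reproduced on its own. This sketch makes the following assumptions:

- RID_COL is a hypothetical stand-in for LCS_RID_COLUMN_ID;
- clusters are non-empty and cover contiguous ranges of column ids;
- the allSpecial case is omitted.

Because newProj is a permutation, projMap can be built in one pass instead of the nested loop used above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical marker for the special rid column (the real value of
// LCS_RID_COLUMN_ID is not assumed here).
const std::size_t RID_COL = static_cast<std::size_t>(-1);

// newProj[i]: read-order position of output column i (rid columns first,
// then cluster by cluster). projMap: the inverse permutation.
struct ProjPlan {
    std::vector<std::vector<std::size_t>> clusterProj; // local ids per cluster
    std::vector<std::size_t> newProj;
    std::vector<std::size_t> projMap;
};

ProjPlan planProjection(const std::vector<std::size_t> &clusterSizes,
                        const std::vector<std::size_t> &outputProj)
{
    ProjPlan plan;
    plan.newProj.resize(outputProj.size());
    std::size_t projCount = 0;

    // Special (rid) columns are read first.
    for (std::size_t i = 0; i < outputProj.size(); i++) {
        if (outputProj[i] == RID_COL) {
            plan.newProj[i] = projCount++;
        }
    }

    // Each cluster owns a contiguous range [clusterStart, clusterEnd].
    std::size_t clusterStart = 0;
    for (std::size_t size : clusterSizes) {
        std::size_t clusterEnd = clusterStart + size - 1;
        std::vector<std::size_t> local;
        for (std::size_t j = 0; j < outputProj.size(); j++) {
            if (outputProj[j] != RID_COL &&
                outputProj[j] >= clusterStart && outputProj[j] <= clusterEnd) {
                local.push_back(outputProj[j] - clusterStart);
                plan.newProj[j] = projCount++;
            }
        }
        plan.clusterProj.push_back(local);
        clusterStart = clusterEnd + 1;
    }

    // Invert the permutation in a single pass.
    plan.projMap.resize(plan.newProj.size());
    for (std::size_t j = 0; j < plan.newProj.size(); j++) {
        plan.projMap[plan.newProj[j]] = j;
    }
    return plan;
}
```

Consider an output projection of column 3, the rid column, and column 0 over clusters of sizes 2 and 3. The rid is read first, then column 0 from the first cluster, then column 3 from the second. That gives newProj = {2, 0, 1} and projMap = {1, 2, 0}.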

void ConfluenceExecStream::prepare ( ConfluenceExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 37 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::getInputBufProvision(), ConfluenceExecStream::inAccessors, and SingleOutputExecStream::prepare().

Referenced by LcsRowScanBaseExecStream::prepare(), LbmUnionExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), CartesianJoinExecStream::prepare(), and BarrierExecStream::prepare().

00038 {
00039     SingleOutputExecStream::prepare(params);
00040 
00041     for (uint i = 0; i < inAccessors.size(); ++i) {
00042         assert(inAccessors[i]->getProvision() == getInputBufProvision());
00043     }
00044 }

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

00049 {
00050     ExecStream::prepare(params);
00051     assert(pOutAccessor);
00052     assert(pOutAccessor->getProvision() == getOutputBufProvision());
00053     if (pOutAccessor->getTupleDesc().empty()) {
00054         assert(!params.outputTupleDesc.empty());
00055         pOutAccessor->setTupleShape(
00056             params.outputTupleDesc,
00057             params.outputTupleFormat);
00058     }
00059 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {
00086     if (pGraph) {
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity & minQuantity,
ExecStreamResourceQuantity & optQuantity,
ExecStreamResourceSettingType & optType 
) [virtual, inherited]

Determines resource requirements for this stream.

Default implementation declares zero resource requirements.

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), use this parameter to indicate a non-accurate optimum resource setting.

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 93 of file ExecStream.cpp.

References EXEC_RESOURCE_ACCURATE.

Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().

00097 {
00098     getResourceRequirements(minQuantity, optQuantity);
00099     optType = EXEC_RESOURCE_ACCURATE;
00100 }
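The three-argument getResourceRequirements() delegates to the two-argument overload and then marks the optimum as accurate. The following sketch reproduces that pattern with hypothetical stand-in types: Quantity, SettingType, StreamBase, and ScanStream.

It also shows a C++ detail that matters with overloaded virtuals. A derived class that overrides only one overload hides the others unless it adds a using-declaration. The sketch adds one; this is not a claim about how Fennel's classes are declared.

```cpp
#include <cassert>

// Hypothetical stand-ins for ExecStreamResourceQuantity and
// ExecStreamResourceSettingType.
struct Quantity { unsigned nCachePages = 0; };
enum SettingType { RESOURCE_ACCURATE, RESOURCE_ESTIMATE };

class StreamBase {
public:
    virtual ~StreamBase() = default;

    // Default: no resources required.
    virtual void getResourceRequirements(Quantity &minQ, Quantity &optQ) {
        minQ.nCachePages = 0;
        optQ.nCachePages = 0;
    }

    // Delegates (via virtual dispatch) and reports an accurate optimum.
    virtual void getResourceRequirements(
        Quantity &minQ, Quantity &optQ, SettingType &optType) {
        getResourceRequirements(minQ, optQ);
        optType = RESOURCE_ACCURATE;
    }
};

class ScanStream : public StreamBase {
public:
    // Without this, overriding the two-argument overload would hide the
    // three-argument one from callers holding a ScanStream.
    using StreamBase::getResourceRequirements;

    void getResourceRequirements(Quantity &minQ, Quantity &optQ) override {
        minQ.nCachePages = 2;
        optQ.nCachePages = 5;
    }
};
```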

void ConfluenceExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Reimplemented from SingleOutputExecStream.

Definition at line 31 of file ConfluenceExecStream.cpp.

References ConfluenceExecStream::inAccessors.

00033 {
00034     inAccessors = inAccessorsInit;
00035 }

ExecStreamBufProvision ConfluenceExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; the ExecStream default is BUFPROV_NONE, but ConfluenceExecStream requires BUFPROV_PRODUCER

Reimplemented from ExecStream.

Definition at line 58 of file ConfluenceExecStream.cpp.

References BUFPROV_PRODUCER.

Referenced by ConfluenceExecStream::prepare().

00059 {
00060     return BUFPROV_PRODUCER;
00061 }

void SingleOutputExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 41 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::pOutAccessor.

Referenced by ConduitExecStream::setOutputBufAccessors().

00043 {
00044     assert(outAccessors.size() == 1);
00045     pOutAccessor = outAccessors[0];
00046 }

ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; the ExecStream default is BUFPROV_NONE, but SingleOutputExecStream supports BUFPROV_CONSUMER

Reimplemented from ExecStream.

Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 69 of file SingleOutputExecStream.cpp.

References BUFPROV_CONSUMER.

Referenced by SingleOutputExecStream::prepare().

00070 {
00071     return BUFPROV_CONSUMER;
00072 }

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::setResourceAllocation ( ExecStreamResourceQuantity & quantity  )  [virtual, inherited]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 111 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.

Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().

00113 {
00114     resourceAllocation = quantity;
00115     if (pQuotaAccessor) {
00116         pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00117     }
00118     if (pScratchQuotaAccessor) {
00119         pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00120     }
00121 }
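setResourceAllocation() records the allocation and forwards the same nCachePages limit to both the regular and the scratch quota accessors. It skips whichever accessor is null. Below is a minimal sketch using hypothetical QuotaAccessor, ResourceQuantity, and StreamResources types.

```cpp
#include <cassert>
#include <memory>

// Hypothetical quota accessor that caps the number of locked cache pages.
struct QuotaAccessor {
    unsigned maxLockedPages = 0;
    void setMaxLockedPages(unsigned n) { maxLockedPages = n; }
};

struct ResourceQuantity { unsigned nCachePages = 0; };

// Mirrors ExecStream::setResourceAllocation(): remember the allocation,
// then push the same page limit to each accessor that exists.
struct StreamResources {
    ResourceQuantity resourceAllocation;
    std::shared_ptr<QuotaAccessor> pQuotaAccessor;
    std::shared_ptr<QuotaAccessor> pScratchQuotaAccessor;

    void setResourceAllocation(ResourceQuantity &quantity) {
        resourceAllocation = quantity;
        if (pQuotaAccessor) {
            pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
        }
        if (pScratchQuotaAccessor) {
            pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
        }
    }
};
```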

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }
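checkAbort() is meant to be polled from long-running loops. The sketch below assumes two hypothetical pieces:

- Scheduler, which exposes an atomic abort flag;
- AbortError, an exception type standing in for AbortExcn.

The loop polls every checkInterval iterations, an arbitrary choice made here to limit overhead. Like ExecStream::checkAbort(), it does nothing when no scheduler is available.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for AbortExcn.
struct AbortError : std::runtime_error {
    AbortError() : std::runtime_error("abort requested") {}
};

// Hypothetical scheduler holding an abort flag that other threads may set.
struct Scheduler {
    std::atomic<bool> abortRequested{false};
    void checkAbort() const {
        if (abortRequested.load()) {
            throw AbortError();
        }
    }
};

// Long-running loop that polls for aborts; a null scheduler disables checks.
long sumWithAbortChecks(long n, const Scheduler *pScheduler, long checkInterval)
{
    long sum = 0;
    for (long i = 0; i < n; i++) {
        if (pScheduler && i % checkInterval == 0) {
            pScheduler->checkAbort();
        }
        sum += i;
    }
    return sum;
}
```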

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
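Because close() clears needsClose before calling closeImpl(), a second call returns immediately. The following minimal model illustrates that idempotent-close pattern. It assumes that needsClose starts out false and that a subclass sets it once it holds resources. MiniClosable, CountingResource and acquire() are hypothetical names, not Fennel's API.

```cpp
#include <cassert>

// Minimal model of the ClosableObject pattern (not Fennel's actual class):
// close() is idempotent and delegates the real work to closeImpl().
class MiniClosable {
public:
    virtual ~MiniClosable() = default;

    bool isClosed() const { return !needsClose; }

    void close() {
        if (!needsClose) {
            return;          // already closed, or nothing to release
        }
        needsClose = false;  // cleared first, as in ClosableObject::close()
        closeImpl();
    }

protected:
    // Assumption: set to true by subclasses once they hold resources.
    bool needsClose = false;
    virtual void closeImpl() = 0;
};

// Hypothetical subclass that counts how often its resources are released.
class CountingResource : public MiniClosable {
public:
    int releaseCount = 0;
    void acquire() { needsClose = true; }  // hypothetical acquisition step

protected:
    void closeImpl() override { ++releaseCount; }
};
```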

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of the event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
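isTracingLevel() passes a level only if it is at least minimumLevel, and initTraceSource() sets minimumLevel to TRACE_OFF when there is no target. The sketch below shows how that filtering behaves, assuming that larger values mean more severe levels and that TRACE_OFF sorts above every real level. The enum values, MiniTraceSource and traceIfEnabled() are illustrative assumptions, and a vector of strings stands in for a TraceTarget.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Assumed ordering, for illustration only: higher values are more severe,
// and OFF sorts above every real level.
enum MiniTraceLevel {
    MINI_TRACE_FINEST = 300,
    MINI_TRACE_FINE = 500,
    MINI_TRACE_INFO = 800,
    MINI_TRACE_SEVERE = 1000,
    MINI_TRACE_OFF = 10000
};

// Simplified TraceSource model; the "target" is just a message list.
class MiniTraceSource {
public:
    MiniTraceSource(std::vector<std::string> *pTargetInit,
                    MiniTraceLevel minimumLevelInit)
        : pTarget(pTargetInit),
          // No target: nothing can pass, mirroring initTraceSource().
          minimumLevel(pTargetInit ? minimumLevelInit : MINI_TRACE_OFF) {}

    bool isTracing() const { return pTarget != nullptr; }

    // Same comparison as TraceSource::isTracingLevel().
    bool isTracingLevel(MiniTraceLevel level) const {
        return level >= minimumLevel;
    }

    // Hypothetical helper: check the level before recording the message.
    void traceIfEnabled(MiniTraceLevel level, std::string const &message) {
        if (isTracingLevel(level) && isTracing()) {
            pTarget->push_back(message);
        }
    }

private:
    std::vector<std::string> *pTarget;
    MiniTraceLevel minimumLevel;
};
```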

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor errorDesc,
const TupleData errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }
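The tuple overload of postError() allocates its marshalling buffer only once, on the first error, sizing it to the maximum tuple size. It then marshals each error tuple into that buffer and forwards the raw bytes. The sketch below reproduces this lazy-allocation pattern under simplifying assumptions. The descriptor is a list of maximum string widths, the format is a 1-byte length prefix plus payload, and every call uses the same descriptor. MiniErrorSource, MiniErrorTarget and the encoding are hypothetical; they do not reflect TupleAccessor's real format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// Hypothetical descriptor: maximum byte width (<= 255) of each string field.
using MiniTupleDesc = std::vector<std::size_t>;
using MiniTuple = std::vector<std::string>;

// Hypothetical error sink that records the bytes it receives.
struct MiniErrorTarget {
    std::vector<std::vector<unsigned char>> received;
    void notifyError(const void *address, long capacity) {
        auto p = static_cast<const unsigned char *>(address);
        received.emplace_back(p, p + capacity);
    }
};

class MiniErrorSource {
public:
    explicit MiniErrorSource(MiniErrorTarget *pTargetInit)
        : pTarget(pTargetInit) {}

    // Assumes every call passes the same descriptor and that each field
    // fits within its declared width.
    void postError(const MiniTupleDesc &desc, const MiniTuple &tuple) {
        if (!pTarget) {
            return;  // no target: nothing is allocated or marshalled
        }
        if (!pErrorBuf) {
            // Allocate once, sized for the largest tuple the descriptor allows.
            std::size_t cbMax = 0;
            for (std::size_t w : desc) {
                cbMax += 1 + w;  // length prefix + payload
            }
            pErrorBuf.reset(new unsigned char[cbMax]);
            ++nAllocations;
        }
        // Marshal: a length byte followed by the field bytes.
        std::size_t cbTuple = 0;
        for (const std::string &field : tuple) {
            pErrorBuf[cbTuple++] = static_cast<unsigned char>(field.size());
            std::memcpy(pErrorBuf.get() + cbTuple, field.data(), field.size());
            cbTuple += field.size();
        }
        pTarget->notifyError(pErrorBuf.get(), static_cast<long>(cbTuple));
    }

    int nAllocations = 0;

private:
    MiniErrorTarget *pTarget;
    std::unique_ptr<unsigned char[]> pErrorBuf;
};
```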

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Member Data Documentation

TupleDataWithBuffer LcsRowScanExecStream::outputTupleData [private]

Tuple data for all columns read from all clusters, including filter columns.

Definition at line 117 of file LcsRowScanExecStream.h.

Referenced by execute(), and prepare().

uint LcsRowScanExecStream::iFilterToInitialize [private]

This variable is used to control the initialization of residual filters.

Its value is one less than the index of the first filtering input to read. After open(), it is initialized to 0. On execute(), the filtering inputs are read sequentially, incrementing this variable, until an underflow occurs or all filtering inputs have been read. On return due to an underflow, this variable allows reading to resume where it left off.

Definition at line 129 of file LcsRowScanExecStream.h.

Referenced by initializeFiltersIfNeeded(), and open().
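The resumable loop that iFilterToInitialize supports can be sketched as follows. Each filtering input is modeled as a count of tuples that have not arrived yet, and reading such an input returns an underflow. The loop index persists across calls, so inputs that are already initialized are not read again. MiniFilterInput, MiniResult and the underflow simulation are hypothetical. For simplicity, the sketch indexes filter inputs from 0 and does not model the one-off relationship to the stream's input numbering.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical filtering input: tuplesPending > 0 means reading it
// underflows for now.
struct MiniFilterInput {
    int tuplesPending;
    bool initialized;
};

enum class MiniResult { UNDERFLOW, READY };

// Initializes filters in order. On underflow, returns without advancing
// iFilterToInitialize, so the next call resumes at the same input.
MiniResult initializeFiltersIfNeeded(
    std::vector<MiniFilterInput> &inputs, std::size_t &iFilterToInitialize)
{
    for (; iFilterToInitialize < inputs.size(); ++iFilterToInitialize) {
        MiniFilterInput &input = inputs[iFilterToInitialize];
        if (input.tuplesPending > 0) {
            --input.tuplesPending;  // simulate one more tuple arriving
            return MiniResult::UNDERFLOW;
        }
        input.initialized = true;
    }
    return MiniResult::READY;
}
```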

TupleData LcsRowScanExecStream::projOutputTupleData [private]

Definition at line 134 of file LcsRowScanExecStream.h.

Referenced by execute(), and prepare().

TupleProjection LcsRowScanExecStream::outputProj [private]

Definition at line 139 of file LcsRowScanExecStream.h.

Referenced by buildOutputProj(), execute(), and prepare().

TupleData LcsRowScanExecStream::ridTupleData [private]

Tuple data for input stream.

Definition at line 144 of file LcsRowScanExecStream.h.

Referenced by open(), and prepare().

LbmRidReader LcsRowScanExecStream::ridReader [private]

Rid reader.

Definition at line 149 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().

RecordNum LcsRowScanExecStream::nRidsRead [private]

Number of rids read.

Definition at line 154 of file LcsRowScanExecStream.h.

Referenced by execute(), and open().

LcsRid LcsRowScanExecStream::inputRid [private]

Current rid read from the input stream.

Definition at line 159 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().

LcsRid LcsRowScanExecStream::nextRid [private]

Next rid that needs to be fetched.

Definition at line 164 of file LcsRowScanExecStream.h.

Referenced by execute(), and open().

bool LcsRowScanExecStream::readDeletedRid [private]

True if a new deleted rid needs to be read from the input stream.

Definition at line 169 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().

bool LcsRowScanExecStream::deletedRidEos [private]

True if reached EOS on deleted rid input stream.

Definition at line 174 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().

LcsRid LcsRowScanExecStream::deletedRid [private]

Current deleted rid.

Definition at line 179 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer().
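The member names readDeletedRid, deletedRidEos and deletedRid suggest a merge-style skip: during a full scan, the next deleted rid is held and compared with the current rid, and a new one is fetched only after it has been consumed. The sketch below shows that technique under two assumptions. A sorted vector stands in for the deleted-rid input stream, and rids run from 0 to rowCount - 1. It is not Fennel's fillRidRunBuffer() logic, and scanSkippingDeleted() is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using MiniRid = std::uint64_t;

// Returns the rids of a full scan over [0, rowCount), skipping those that
// appear in the sorted deletedRids "stream".
std::vector<MiniRid> scanSkippingDeleted(
    MiniRid rowCount, std::vector<MiniRid> const &deletedRids)
{
    std::vector<MiniRid> out;
    std::size_t pos = 0;         // read position in the deleted-rid stream
    bool readDeletedRid = true;  // need to fetch the next deleted rid
    bool deletedRidEos = false;  // deleted-rid stream exhausted
    MiniRid deletedRid = 0;

    for (MiniRid rid = 0; rid < rowCount; ++rid) {
        // Advance the deleted-rid cursor until it is at or past rid.
        while (!deletedRidEos && (readDeletedRid || deletedRid < rid)) {
            if (pos == deletedRids.size()) {
                deletedRidEos = true;
            } else {
                deletedRid = deletedRids[pos++];
                readDeletedRid = false;
            }
        }
        if (!deletedRidEos && deletedRid == rid) {
            readDeletedRid = true;  // consumed; fetch another next time
            continue;               // skip the deleted row
        }
        out.push_back(rid);
    }
    return out;
}
```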

bool LcsRowScanExecStream::tupleFound [private]

true if tuple has been read and not yet produced

Definition at line 184 of file LcsRowScanExecStream.h.

Referenced by execute(), and open().

bool LcsRowScanExecStream::isFullScan [private]

true if executing full table scan

Definition at line 189 of file LcsRowScanExecStream.h.

Referenced by execute(), fillRidRunBuffer(), open(), and prepare().

bool LcsRowScanExecStream::hasExtraFilter [private]

true if there is an extra range-list filter (as the last input)

Definition at line 194 of file LcsRowScanExecStream.h.

Referenced by prepare().

bool LcsRowScanExecStream::producePending [private]

true if a produceTuple() call is still pending

Definition at line 199 of file LcsRowScanExecStream.h.

Referenced by execute(), and open().

boost::scoped_array<LcsResidualColumnFilters *> LcsRowScanExecStream::filters [private]

The local filter data structure.

Note that these are aliasing pointers to facilitate filter data initialization and memory deallocation.

Definition at line 207 of file LcsRowScanExecStream.h.

Referenced by closeImpl(), initializeFiltersIfNeeded(), open(), and prepareResidualFilters().

int32_t LcsRowScanExecStream::nFilters [private]

The number of residual column filters configured.

Definition at line 212 of file LcsRowScanExecStream.h.

Referenced by closeImpl(), initializeFiltersIfNeeded(), open(), prepare(), and prepareResidualFilters().

TableSamplingMode LcsRowScanExecStream::samplingMode [private]

One of SAMPLING_OFF, SAMPLING_BERNOULLI or SAMPLING_SYSTEM.

Definition at line 217 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), open(), and prepare().

float LcsRowScanExecStream::samplingRate [private]

the sampling rate (0.0 to 1.0)

Definition at line 222 of file LcsRowScanExecStream.h.

Referenced by initializeSystemSampling(), and prepare().

bool LcsRowScanExecStream::isSamplingRepeatable [private]

true if the sample should be repeatable

Definition at line 227 of file LcsRowScanExecStream.h.

Referenced by open(), and prepare().

int32_t LcsRowScanExecStream::repeatableSeed [private]

seed for repeatable sampling

Definition at line 232 of file LcsRowScanExecStream.h.

Referenced by open(), and prepare().

int32_t LcsRowScanExecStream::samplingClumps [private]

number of clumps for system sampling

Definition at line 237 of file LcsRowScanExecStream.h.

Referenced by initializeSystemSampling(), and prepare().

uint64_t LcsRowScanExecStream::clumpSize [private]

size of each sampling clump

Definition at line 242 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().

uint64_t LcsRowScanExecStream::clumpDistance [private]

distance (in rows) between each clump

Definition at line 247 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().

uint64_t LcsRowScanExecStream::clumpPos [private]

position (0 to clumpSize) in current clump

Definition at line 252 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), initializeSystemSampling(), and open().

uint64_t LcsRowScanExecStream::clumpSkipPos [private]

position (clumpDistance to 0) in between clumps

Definition at line 257 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and initializeSystemSampling().

uint LcsRowScanExecStream::numClumps [private]

The number of clumps that need to be built.

Definition at line 262 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and initializeSystemSampling().

uint LcsRowScanExecStream::numClumpsBuilt [private]

Running counter of the number of clumps built.

Definition at line 267 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().
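Taken together, clumpSize, clumpDistance and numClumps describe system sampling: runs of clumpSize consecutive rows, separated by clumpDistance skipped rows, repeated until numClumps clumps are built. The following sketch shows one way to derive and apply such a plan. The derivation in planSystemSampling() is an assumption: it truncates rowCount * samplingRate to a sample size and spreads the unsampled rows evenly between clumps. Fennel's initializeSystemSampling() may round or distribute differently. The sketch also builds all rids in one batch, whereas the stream tracks clumpPos and clumpSkipPos so it can resume across calls.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sampling plan; field names mirror the members above.
struct MiniClumpPlan {
    std::uint64_t clumpSize;
    std::uint64_t clumpDistance;
    std::uint64_t numClumps;
};

// Assumed derivation (samplingRate in [0, 1]), for illustration only.
MiniClumpPlan planSystemSampling(
    std::uint64_t rowCount, double samplingRate, std::uint64_t samplingClumps)
{
    std::uint64_t sampleSize =
        static_cast<std::uint64_t>(rowCount * samplingRate);
    std::uint64_t numClumps =
        samplingClumps < sampleSize ? samplingClumps : sampleSize;
    if (numClumps == 0) {
        return {0, 0, 0};
    }
    std::uint64_t clumpSize = sampleSize / numClumps;
    // Spread the unsampled rows evenly over the gaps between clumps.
    std::uint64_t gaps = numClumps > 1 ? numClumps - 1 : 1;
    std::uint64_t clumpDistance = (rowCount - clumpSize * numClumps) / gaps;
    return {clumpSize, clumpDistance, numClumps};
}

// Emits clumpSize consecutive rids, skips clumpDistance rids, and repeats
// until numClumps clumps have been built.
std::vector<std::uint64_t> sampleRids(MiniClumpPlan const &plan)
{
    std::vector<std::uint64_t> rids;
    std::uint64_t rid = 0;
    for (std::uint64_t built = 0; built < plan.numClumps; ++built) {
        for (std::uint64_t pos = 0; pos < plan.clumpSize; ++pos) {
            rids.push_back(rid++);
        }
        rid += plan.clumpDistance;  // skip the gap before the next clump
    }
    return rids;
}
```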

boost::scoped_ptr<BernoulliRng> LcsRowScanExecStream::samplingRng [private]

RNG for Bernoulli sampling.

Definition at line 272 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), open(), and prepare().

int64_t LcsRowScanExecStream::rowCount [private]

Number of rows in the table.

Used only for sampling. In the case of Bernoulli sampling, includes count of deleted rows.

Definition at line 278 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), initializeSystemSampling(), and prepare().
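Bernoulli sampling keeps each row independently with probability samplingRate. This is why rowCount includes deleted rows: every rid position receives its own trial. When isSamplingRepeatable is set, seeding the generator with repeatableSeed makes the sample reproducible. The sketch below uses std::mt19937 and std::bernoulli_distribution as stand-ins for Fennel's BernoulliRng, whose internals are not shown here. bernoulliSample() is hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <vector>

// Keeps each rid in [0, rowCount) with probability samplingRate.
// std::mt19937 is a stand-in for BernoulliRng.
std::vector<std::uint64_t> bernoulliSample(
    std::uint64_t rowCount, double samplingRate,
    bool isSamplingRepeatable, std::int32_t repeatableSeed)
{
    // Repeatable: fixed seed. Otherwise: nondeterministic seed.
    std::mt19937 rng(isSamplingRepeatable
                         ? static_cast<std::uint32_t>(repeatableSeed)
                         : std::random_device{}());
    std::bernoulli_distribution keep(samplingRate);
    std::vector<std::uint64_t> sampled;
    for (std::uint64_t rid = 0; rid < rowCount; ++rid) {
        if (keep(rng)) {
            sampled.push_back(rid);
        }
    }
    return sampled;
}
```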

bool LcsRowScanExecStream::ridRunsBuilt [private]

True if completed building rid runs.

Definition at line 283 of file LcsRowScanExecStream.h.

Referenced by execute(), fillRidRunBuffer(), and open().

LcsRidRun LcsRowScanExecStream::currRidRun [private]

Current rid run being constructed.

Definition at line 288 of file LcsRowScanExecStream.h.

Referenced by fillRidRunBuffer(), and open().

CircularBufferIter<LcsRidRun> LcsRowScanExecStream::ridRunIter [private]

Iterator over the circular buffer containing rid runs.

Definition at line 293 of file LcsRowScanExecStream.h.

Referenced by execute(), and open().
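A rid run summarizes consecutive rids compactly, so the scan can process a stretch of rows instead of handling each rid individually. The sketch below collapses a sorted rid sequence into runs by extending the current run (compare currRidRun) while rids stay adjacent. It assumes a run is a start rid plus a count. MiniRidRun's fields are an assumption rather than LcsRidRun's documented layout, and the circular buffer is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical rid run: a start rid plus the number of consecutive rids.
struct MiniRidRun {
    std::uint64_t startRid;
    std::uint64_t nRids;
};

// Collapses a sorted, duplicate-free rid sequence into runs.
std::vector<MiniRidRun> buildRidRuns(std::vector<std::uint64_t> const &rids)
{
    std::vector<MiniRidRun> runs;
    for (std::uint64_t rid : rids) {
        if (!runs.empty() &&
            runs.back().startRid + runs.back().nRids == rid) {
            ++runs.back().nRids;       // rid continues the current run
        } else {
            runs.push_back({rid, 1});  // start a new run
        }
    }
    return runs;
}
```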

VectorOfUint LcsRowScanBaseExecStream::projMap [protected, inherited]

Projection map that maps columns read from cluster to their position in the output projection.

Definition at line 77 of file LcsRowScanBaseExecStream.h.

Referenced by execute(), LbmGeneratorExecStream::generateSingletons(), LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), and LcsRowScanBaseExecStream::readColVals().
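A projection map such as projMap can be read as a scatter. If projMap[i] is the output position of the i-th column read from the clusters, each value read is copied into its output slot. This sketch assumes projMap is a permutation of output positions and uses plain strings in place of TupleData entries. applyProjMap() is hypothetical and does not handle non-cluster columns.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Places values, given in cluster read order, at their output positions.
std::vector<std::string> applyProjMap(
    std::vector<std::string> const &valuesInReadOrder,
    std::vector<unsigned> const &projMap)
{
    assert(valuesInReadOrder.size() == projMap.size());
    std::vector<std::string> output(projMap.size());
    for (std::size_t i = 0; i < projMap.size(); ++i) {
        output[projMap[i]] = valuesInReadOrder[i];
    }
    return output;
}
```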

uint LcsRowScanBaseExecStream::nClusters [protected, inherited]

Number of clusters to be scanned.

Definition at line 82 of file LcsRowScanBaseExecStream.h.

Referenced by LcsRowScanBaseExecStream::closeImpl(), execute(), LbmGeneratorExecStream::execute(), LbmGeneratorExecStream::generateMultiKeyBitmaps(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsRowScanBaseExecStream::LcsRowScanBaseExecStream(), LcsRowScanBaseExecStream::open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), LbmGeneratorExecStream::prepare(), and prepareResidualFilters().

boost::scoped_array<SharedLcsClusterReader> LcsRowScanBaseExecStream::pClusters [protected, inherited]

Array containing cluster readers.

Definition at line 87 of file LcsRowScanBaseExecStream.h.

Referenced by LcsRowScanBaseExecStream::closeImpl(), execute(), LbmGeneratorExecStream::execute(), LbmGeneratorExecStream::generateBitmaps(), LbmGeneratorExecStream::generateMultiKeyBitmaps(), LbmGeneratorExecStream::generateSingleKeyBitmaps(), LbmGeneratorExecStream::generateSingletons(), LcsRowScanBaseExecStream::open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), and prepareResidualFilters().

TupleDescriptor LcsRowScanBaseExecStream::projDescriptor [protected, inherited]

Tuple descriptor representing columns to be projected from scans.

Definition at line 92 of file LcsRowScanBaseExecStream.h.

Referenced by prepare(), LcsRowScanBaseExecStream::prepare(), prepareResidualFilters(), and LcsRowScanBaseExecStream::readColVals().

std::vector<int> LcsRowScanBaseExecStream::nonClusterCols [protected, inherited]

List of the non-cluster columns that need to be projected.

Definition at line 97 of file LcsRowScanBaseExecStream.h.

Referenced by execute(), LcsRowScanBaseExecStream::prepare(), and prepareResidualFilters().

bool LcsRowScanBaseExecStream::allSpecial [protected, inherited]

True in the special case where only special columns are read, i.e., the underlying cluster data does not actually need to be read.

Definition at line 103 of file LcsRowScanBaseExecStream.h.

Referenced by LcsRowScanBaseExecStream::prepare(), and LcsRowScanBaseExecStream::readColVals().

CircularBuffer<LcsRidRun> LcsRowScanBaseExecStream::ridRuns [protected, inherited]

Circular buffer of rid runs.

Definition at line 108 of file LcsRowScanBaseExecStream.h.

Referenced by execute(), LbmGeneratorExecStream::execute(), fillRidRunBuffer(), LcsRowScanExecStream(), open(), LbmGeneratorExecStream::open(), LcsRowScanBaseExecStream::prepare(), and LbmGeneratorExecStream::prepare().

std::vector<UnalignedAttributeAccessor> LcsRowScanBaseExecStream::attrAccessors [protected, inherited]

Accessors used for loading actual column values.

Definition at line 120 of file LcsRowScanBaseExecStream.h.

Referenced by LbmGeneratorExecStream::generateBitmaps(), LbmGeneratorExecStream::generateSingletons(), prepare(), LbmGeneratorExecStream::prepare(), and LcsRowScanBaseExecStream::readColVals().

std::vector<SharedExecStreamBufAccessor> ConfluenceExecStream::inAccessors [protected, inherited]

Definition at line 50 of file ConfluenceExecStream.h.

Referenced by NestedLoopJoinExecStream::checkNumInputs(), CartesianJoinExecStream::checkNumInputs(), LbmMinusExecStream::comparePrefixes(), LbmGeneratorExecStream::execute(), MergeExecStream::execute(), BarrierExecStream::execute(), LbmMinusExecStream::findMinInput(), initializeFiltersIfNeeded(), open(), LbmUnionExecStream::open(), LbmMinusExecStream::open(), LbmGeneratorExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), ConfluenceExecStream::open(), prepare(), LbmUnionExecStream::prepare(), LbmMinusExecStream::prepare(), LbmGeneratorExecStream::prepare(), LbmChopperExecStream::prepare(), LbmBitOpExecStream::prepare(), LhxJoinExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConfluenceExecStream::prepare(), CartesianJoinExecStream::prepare(), BarrierExecStream::prepare(), NestedLoopJoinExecStream::preProcessRightInput(), BarrierExecStream::processInputTuple(), LbmBitOpExecStream::producePendingOutput(), LbmMinusExecStream::restartSubtrahends(), LhxJoinExecStream::setHashInfo(), and ConfluenceExecStream::setInputBufAccessors().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject::needsClose, since a stream that has been prepared but never opened still needs to be closed before destruction.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

Obtained at prepare() time. A shared pointer is kept in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction embracing the stream.

Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. A shared pointer is kept in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:38 2009 for Fennel by doxygen 1.5.1