LcsClusterReplaceExecStream Class Reference

Given a stream of tuples corresponding to the column values in a cluster, creates a new cluster, replacing the pre-existing cluster with the new input tuples. More...

#include <LcsClusterReplaceExecStream.h>

Inheritance diagram for LcsClusterReplaceExecStream:

LcsClusterAppendExecStream BTreeExecStream ConduitExecStream SingleOutputExecStream SingleInputExecStream SingleOutputExecStream ExecStream ExecStream ExecStream ErrorSource TraceSource ClosableObject ErrorSource TraceSource ClosableObject ErrorSource TraceSource ClosableObject

List of all members.

Public Member Functions

virtual void prepare (LcsClusterReplaceExecStreamParams const &params)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void prepare (LcsClusterAppendExecStreamParams const &params)
virtual void prepare (BTreeExecStreamParams const &params)
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void prepare (ConduitExecStreamParams const &params)
virtual void prepare (SingleInputExecStreamParams const &params)
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void closeImpl ()
 Implements ClosableObject.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
bool isClosed () const
 
Returns:
whether the object has been closed

virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Static Public Member Functions

static SharedBTreeWriter newWriter (BTreeExecStreamParams const &params)
static void copyParamsToDescriptor (BTreeDescriptor &, BTreeParams const &, SharedCacheAccessor const &)

Protected Member Functions

void allocArrays ()
 Allocate memory for arrays.
void initLoad ()
 Initializes the load.
void loadExistingBlock ()
 Populates row and hash arrays from existing index block.
void startNewBlock ()
 Prepare to write a fresh block.
void convertTuplesToCols ()
 Given a TupleData representing all columns in a cluster, converts each column into its own TupleData.
void addValueOrdinal (uint column, uint16_t vOrd)
 Adds value ordinal to row array for new row.
bool isRowArrayFull ()
 True if row array is full.
void writeBatch (bool lastBatch)
 Writes a batch (run) to index block.
void writeBlock ()
 Writes block to index when the block is full or this is the last block in the load.
bool getLastBlock (PLcsClusterNode &pBlock)
 Gets last block written to disk so we can append to it, reading in the first rid value stored on the page.
void init ()
 Initializes and sets up object with content specific to the load that will be carried out.
ExecStreamResult compress (ExecStreamQuantum const &quantum)
 Processes rows for loading.
virtual SharedBTreeReader newReader ()
SharedBTreeWriter newWriter (bool monotonic=false)
virtual void endSearch ()
 Forgets the current reader or writer's search, releasing any page locks.
ExecStreamResult precheckConduitBuffers ()
 Checks the state of the input and output buffers.

Protected Attributes

uint blockSize
 Space available on page blocks for writing cluster data.
TupleDescriptor tableColsTupleDesc
 Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of.
TupleData clusterColsTupleData
 Tuple data for the tuple datums representing only this cluster.
TupleDescriptor clusterColsTupleDesc
 Tuple descriptors for the columns that are part of this cluster.
boost::scoped_array< TupleDescriptor > colTupleDesc
 Individual tuple descriptors for each column in the cluster.
SegmentAccessor scratchAccessor
 Scratch accessor for allocating large buffer pages.
ClusterPageLock bufferLock
 Lock on scratch page.
bool overwrite
 True if overwriting all existing data.
bool isDone
 Whether row count has been produced.
TupleData outputTuple
 Output tuple containing count of number of rows loaded.
TupleAccessor * outputTupleAccessor
 A reference to the output accessor contained in SingleOutputExecStream::pOutAccessor.
boost::scoped_array< FixedBuffer > outputTupleBuffer
 buffer holding the outputTuple to provide to the consumers
bool compressCalled
 True if execute has been called at least once.
boost::scoped_array< LcsHash > hash
 Array of hashes, one per cluster column.
uint numColumns
 Number of columns in the cluster.
boost::scoped_array< PBuffer > rowBlock
 Array of temporary blocks for row array.
uint nRowsMax
 Maximum number of values that can be stored in rowBlock.
boost::scoped_array< PBuffer > hashBlock
 Array of temporary blocks for hash table.
boost::scoped_array< PBuffer > builderBlock
 Array of temporary blocks used by ClusterNodeWriter.
uint rowCnt
 Number of rows loaded into the current set of batches.
bool indexBlockDirty
 True if index blocks need to be written to disk.
LcsRid firstRow
 Starting rowid in a cluster page.
LcsRid lastRow
 Last rowid in the last batch.
LcsRid startRow
SharedLcsClusterNodeWriter lcsBlockBuilder
 Page builder object.
boost::scoped_array< LcsHashValOrd > hashValOrd
 Row value ordinal returned from hash, one per cluster column.
boost::scoped_array< boost::scoped_array< FixedBuffer > > tempBuf
 Temporary buffers used by writeBatch.
boost::scoped_array< uint > maxValueSize
 Max size for each column cluster used by writeBatch.
bool arraysAlloced
 Indicates whether or not we have already allocated arrays.
PLcsClusterNode pIndexBlock
 Buffer pointing to cluster page that will actually be written.
RecordNum numRowCompressed
 Total number of rows loaded by this object.
BTreeDescriptor treeDescriptor
BTreeOwnerRootMap * pRootMap
SharedBTreeAccessBase pBTreeAccessBase
SharedBTreeReader pBTreeReader
DynamicParamId rootPageIdParamId
SharedExecStreamBufAccessor pOutAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose
SharedExecStreamBufAccessor pInAccessor

Private Member Functions

virtual void initTupleLoadParams (const TupleProjection &inputProj)
 Initializes member fields corresponding to the data to be loaded, taking into account the extra rid column that identifies each input tuple.
virtual ExecStreamResult getTupleForLoad ()
 Retrieves the tuple that will be loaded into the cluster.
virtual void postProcessTuple ()
 Performs post-processing after a tuple has been loaded.
void readOrigClusterRow ()
 Reads the cluster columns for the current row being loaded from the original cluster.
virtual void close ()
 Writes out the last pending batches and btree pages.

Private Attributes

DynamicParamId newClusterRootParamId
 Dynamic parameter id corresponding to the root pageId of the new cluster, if the pageId is required downstream.
TupleDescriptor projInputTupleDesc
 Tuple descriptor representing the rid column plus the cluster columns to be loaded.
TupleData projInputTupleData
 Tuple data for the projected input tuple.
std::vector< UnalignedAttributeAccessor > attrAccessors
 Accessors for loading column values from the original cluster.
SnapshotRandomAllocationSegment * pSnapshotSegment
 The underlying snapshot segment for the cluster.
SharedLcsClusterReader pOrigClusterReader
 Reader for the original cluster.
RecordNum origNumRows
 Number of rows in the original cluster.
PageId origRootPageId
 The rootPageId of the original rid to pageId btree map.
LcsRid currLoadRid
 The current rid being loaded.
bool needTuple
 True if a new tuple needs to be provided for the load.
LcsRid currInputRid
 The rid value of the last input row read.
TupleProjectionAccessor clusterColsTupleAccessor
 Accessor for projecting cluster tuple data from the input row.
TupleDataWithBuffer origClusterTupleData
 TupleData used to load column values from the original cluster.
bool newData
 True if at least one existing row is being replaced with a new value.

Detailed Description

Given a stream of tuples corresponding to the column values in a cluster, creates a new cluster, replacing the pre-existing cluster with the new input tuples.

Each tuple contains in its first column a rid value that identifies which row will be replaced. If there are gaps in the rid sequence, then the row corresponding to that gap will be replaced with a tuple that has the same values as the existing tuple in the original cluster at that same rid position.

After processing all input, the rid to cluster pageId btree map corresponding to the cluster is versioned off of the original btree's rootPageId. So, this execution stream requires the underlying segment corresponding to the cluster to be a snapshot segment.

Definition at line 53 of file LcsClusterReplaceExecStream.h.


Member Function Documentation

void LcsClusterReplaceExecStream::initTupleLoadParams ( const TupleProjection & inputProj  )  [private, virtual]

Initializes member fields corresponding to the data to be loaded, taking into account the extra rid column that identifies each input tuple.

Parameters:
inputProj projection of the input tuple that's relevant to this cluster append

Reimplemented from LcsClusterAppendExecStream.

Definition at line 42 of file LcsClusterReplaceExecStream.cpp.

References attrAccessors, TupleProjectionAccessor::bind(), clusterColsTupleAccessor, LcsClusterAppendExecStream::clusterColsTupleData, LcsClusterAppendExecStream::clusterColsTupleDesc, LcsClusterAppendExecStream::colTupleDesc, TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), LcsClusterAppendExecStream::numColumns, origClusterTupleData, SingleInputExecStream::pInAccessor, pOrigClusterReader, TupleDescriptor::projectFrom(), projInputTupleData, projInputTupleDesc, LcsClusterAppendExecStream::tableColsTupleDesc, and BTreeExecStream::treeDescriptor.

00044 {  // Sets up the projections, tuple descriptors and original-cluster reader used by the load.
00045     numColumns = inputProj.size() - 1;  // entry 0 of inputProj is the rid column, so it is not counted
00046 
00047     projInputTupleDesc.projectFrom(tableColsTupleDesc, inputProj);
00048     projInputTupleData.compute(projInputTupleDesc);
00049 
00050     // Setup the cluster reader to read all columns from the original cluster
00051     // without any pre-fetch
00052     //
00053     // TODO - Extend this class to use pre-fetches when reading from the
00054     // original cluster.  This will require reading ahead from the input
00055     // stream to detect gaps in the rid values and then setting up rid runs
00056     // for each block of missing rids.
00057     pOrigClusterReader =
00058         SharedLcsClusterReader(new LcsClusterReader(treeDescriptor));
00059     TupleProjection proj;
00060     proj.resize(numColumns);
00061     for (uint i = 0; i < numColumns; i++) {
00062         proj[i] = i;  // identity projection: entry i selects column i
00063     }
00064     pOrigClusterReader->initColumnReaders(numColumns, proj);
00065 
00066     // Setup the objects for accessing just the cluster columns by excluding
00067     // the rid column
00068     std::copy(inputProj.begin() + 1, inputProj.end(), proj.begin());  // proj is reused; it already holds numColumns entries
00069     TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00070     clusterColsTupleAccessor.bind(inputAccessor, proj);
00071     clusterColsTupleDesc.projectFrom(pInAccessor->getTupleDesc(), proj);
00072     clusterColsTupleData.compute(clusterColsTupleDesc);
00073 
00074     attrAccessors.resize(clusterColsTupleDesc.size());  // one accessor per cluster column
00075     for (uint i = 0; i < clusterColsTupleDesc.size(); i++) {
00076         attrAccessors[i].compute(clusterColsTupleDesc[i]);
00077     }
00078 
00079     origClusterTupleData.computeAndAllocate(clusterColsTupleDesc);
00080 
00081     // setup one tuple descriptor per cluster column
00082     colTupleDesc.reset(new TupleDescriptor[numColumns]);
00083     for (int i = 0; i < numColumns; i++) {  // NOTE(review): signed int compared with uint numColumns; the other loops here use uint
00084         // +1 to skip over the rid column
00085         colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i + 1]]);
00086     }
00087 }

ExecStreamResult LcsClusterReplaceExecStream::getTupleForLoad (  )  [private, virtual]

Retrieves the tuple that will be loaded into the cluster.

The tuple either originates from the input stream or contains the original values at the current rid position being loaded.

Reimplemented from LcsClusterAppendExecStream.

Definition at line 148 of file LcsClusterReplaceExecStream.cpp.

References clusterColsTupleAccessor, LcsClusterAppendExecStream::clusterColsTupleData, BTreeBuilder::createEmptyRoot(), currInputRid, currLoadRid, EXECBUF_EOS, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, BTreeAccessBase::getRootPageId(), LcsClusterAppendExecStream::initLoad(), needTuple, newData, NULL_PAGE_ID, opaqueToInt(), origNumRows, origRootPageId, SingleInputExecStream::pInAccessor, projInputTupleData, SegmentAccessor::pSegment, pSnapshotSegment, readOrigClusterRow(), BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeExecStream::treeDescriptor, TupleProjectionAccessor::unmarshal(), and SnapshotRandomAllocationSegment::versionPage().

00149 {  // Fills clusterColsTupleData with the next row to load, taken from the input or from the original cluster.
00150     // If the last tuple provided has not been processed yet, then there's no
00151     // work to be done
00152     if (!needTuple) {
00153         return EXECRC_YIELD;
00154     }
00155 
00156     if (pInAccessor->getState() == EXECBUF_EOS) {
00157         // No more input rows, but that doesn't mean we're finished because
00158         // we have to match the number of rows in the original cluster.
00159         // Therefore, if there's a gap at the end of the cluster, read the
00160         // original rows until we read the rid corresponding to the last
00161         // tuple in the original cluster, at which point, we can finally
00162         // say that we're done.  However, if there wasn't at least one new
00163         // row, then there's no need to replace the column.  We can simply
00164         // keep the original.
00165         if (!newData) {
00166             return EXECRC_EOS;
00167         }
00168         if (opaqueToInt(currLoadRid) < origNumRows) {
00169             readOrigClusterRow();
00170             needTuple = false;
00171             // in case this wasn't already called
00172             initLoad();
00173             return EXECRC_YIELD;
00174         } else {
00175             pSnapshotSegment->versionPage(
00176                 origRootPageId,
00177                 treeDescriptor.rootPageId);  // version the new btree root off of the original root pageId
00178             return EXECRC_EOS;
00179         }
00180     }
00181 
00182     if (!pInAccessor->demandData()) {
00183         return EXECRC_BUF_UNDERFLOW;
00184     }
00185 
00186     // Create a new rid to pageId btree map for this cluster, once we know
00187     // at least one row is being updated
00188     if (!newData) {
00189         treeDescriptor.rootPageId = NULL_PAGE_ID;
00190         BTreeBuilder builder(
00191             treeDescriptor,
00192             treeDescriptor.segmentAccessor.pSegment);
00193         builder.createEmptyRoot();
00194         treeDescriptor.rootPageId = builder.getRootPageId();
00195         newData = true;  // the empty root is created only once per open()
00196     }
00197 
00198     initLoad();
00199 
00200     // Read the next input row on the first call (rid 0) or once the load
00201     // position has moved past the last input rid that was read.
00202     if (currLoadRid == LcsRid(0) || currLoadRid > currInputRid) {
00201         assert(!pInAccessor->isTupleConsumptionPending());
00202         pInAccessor->unmarshalProjectedTuple(projInputTupleData);
00203         currInputRid =
00204             *reinterpret_cast<LcsRid const *> (projInputTupleData[0].pData);  // column 0 of the projected input is the rid
00205     }
00206 
00207     // If there's a gap between the last input tuple read and the
00208     // current row that needs to be loaded, then read the original
00209     // cluster data; otherwise, unmarshal the last input row read.
00210     if (currInputRid > currLoadRid) {
00211         readOrigClusterRow();
00212     } else {
00213         assert(currInputRid == currLoadRid);
00214         clusterColsTupleAccessor.unmarshal(clusterColsTupleData);
00215     }
00216 
00217     needTuple = false;  // set back to true by postProcessTuple()
00218     return EXECRC_YIELD;
00219 }

void LcsClusterReplaceExecStream::postProcessTuple (  )  [private, virtual]

Performs post-processing after a tuple has been loaded.

Reimplemented from LcsClusterAppendExecStream.

Definition at line 247 of file LcsClusterReplaceExecStream.cpp.

References currInputRid, currLoadRid, needTuple, and LcsClusterAppendExecStream::postProcessTuple().

00248 {  // Advances the load position after a row has been loaded.
00249     // Consume the current input tuple if we've completed processing that
00250     // input.
00251     if (currInputRid == currLoadRid) {
00252         LcsClusterAppendExecStream::postProcessTuple();
00253     }
00254     currLoadRid++;     // move on to the next rid to be loaded
00255     needTuple = true;  // getTupleForLoad() must supply a fresh tuple on its next call
00256 }

void LcsClusterReplaceExecStream::readOrigClusterRow (  )  [private]

Reads the cluster columns for the current row being loaded from the original cluster.

Definition at line 221 of file LcsClusterReplaceExecStream.cpp.

References attrAccessors, LcsClusterAppendExecStream::clusterColsTupleData, currLoadRid, origClusterTupleData, pOrigClusterReader, and TupleDataWithBuffer::resetBuffer().

Referenced by getTupleForLoad().

00222 {  // Copies the original cluster's column values at currLoadRid into clusterColsTupleData.
00223     origClusterTupleData.resetBuffer();
00224 
00225     // Position to the current rid we want to load.  Then read each of the
00226     // column values, load them into the TupleDataWithBuffer, and then copy
00227     // those TupleDatum's into the TupleData that's used to load the
00228     // cluster.
00229     bool needSync = true;
00230     if (pOrigClusterReader->isPositioned() &&
00231         currLoadRid < pOrigClusterReader->getRangeEndRid())
00232     {
00233         needSync = false;  // reader is positioned and the rid is below its range end, so skip sync()
00234     }
00235     bool rc = pOrigClusterReader->position(currLoadRid);
00236     assert(rc);  // NOTE(review): rc is only consumed by assert; it is unused if asserts are compiled out
00237     for (uint i = 0; i < pOrigClusterReader->nColsToRead; i++) {
00238         if (needSync) {
00239             pOrigClusterReader->clusterCols[i].sync();
00240         }
00241         PBuffer colValue = pOrigClusterReader->clusterCols[i].getCurrentValue();
00242         attrAccessors[i].loadValue(origClusterTupleData[i], colValue);
00243         clusterColsTupleData[i] = origClusterTupleData[i];
00244     }
00245 }

void LcsClusterReplaceExecStream::close (  )  [private, virtual]

Writes out the last pending batches and btree pages.

Deallocates temporary memory and buffer pages. Allows resources to be freed before the execution stream is actually closed.

Reimplemented from LcsClusterAppendExecStream.

Definition at line 258 of file LcsClusterReplaceExecStream.cpp.

References LcsClusterAppendExecStream::close(), and pOrigClusterReader.

00259 {  // Closes the base append stream first, then the reader on the original cluster.
00260     LcsClusterAppendExecStream::close();
00261     pOrigClusterReader->close();  // NOTE(review): dereferenced unconditionally; confirm initTupleLoadParams() always ran first
00262 }

void LcsClusterReplaceExecStream::prepare ( LcsClusterReplaceExecStreamParams const &  params  )  [virtual]

Definition at line 31 of file LcsClusterReplaceExecStream.cpp.

References newClusterRootParamId, origRootPageId, LcsClusterAppendExecStream::prepare(), BTreeDescriptor::rootPageId, and BTreeExecStream::treeDescriptor.

00033 {  // Prepares the base append stream, then records the parameter id and the original root pageId.
00034     LcsClusterAppendExecStream::prepare(params);
00035     newClusterRootParamId = params.rootPageIdParamId;  // open() writes the root pageId to this dynamic parameter
00036 
00037     // Save the original root pageId at prepare time because the treeDescriptor
00038     // will be reset at open time with the new cluster's rootPageId
00039     origRootPageId = treeDescriptor.rootPageId;
00040 }

void LcsClusterReplaceExecStream::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from LcsClusterAppendExecStream.

Definition at line 105 of file LcsClusterReplaceExecStream.cpp.

References TupleDatum::cbData, currInputRid, currLoadRid, SegmentFactory::getSnapshotSegment(), MAXU, needTuple, newClusterRootParamId, newData, opaqueToInt(), LcsClusterAppendExecStream::open(), origNumRows, TupleDatum::pData, ExecStream::pDynamicParamManager, SingleInputExecStream::pInAccessor, pOrigClusterReader, SegmentAccessor::pSegment, pSnapshotSegment, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, and BTreeExecStream::treeDescriptor.

00106 {  // Resets per-execution load state, opens the base stream, and opens the original-cluster reader.
00107     newData = false;
00108 
00109     // Need to call this after the setup above because the cluster append
00110     // stream depends on the new cluster being in place
00111     LcsClusterAppendExecStream::open(restart);  // NOTE(review): only newData is set before this call; confirm the comment above is still accurate
00112 
00113     // Determine how many rows are in the original cluster
00114     origNumRows = pOrigClusterReader->getNumRows();  // NOTE(review): called before pOrigClusterReader->open() below; confirm that is valid
00115 
00116     if (!restart) {
00117         // Save the root pageId in a dynamic parameter so it can be read
00118         // downstream, if a parameter is specified
00119         if (opaqueToInt(newClusterRootParamId) > 0) {
00120             pDynamicParamManager->createParam(
00121                 newClusterRootParamId,
00122                 pInAccessor->getTupleDesc()[0]);  // parameter is typed like input column 0 (presumably the rid column; verify)
00123         }
00124 
00125         // Retrieve the snapshot segment.  This needs to be done at open time
00126         // because the segment changes across transaction boundaries.
00127         pSnapshotSegment =
00128             SegmentFactory::getSnapshotSegment(
00129                 treeDescriptor.segmentAccessor.pSegment);
00130         assert(pSnapshotSegment != NULL);
00131     }
00132 
00133     if (opaqueToInt(newClusterRootParamId) > 0) {
00134         TupleDatum rootPageIdDatum;
00135         rootPageIdDatum.pData = (PConstBuffer) &(treeDescriptor.rootPageId);
00136         rootPageIdDatum.cbData = sizeof(treeDescriptor.rootPageId);
00137         pDynamicParamManager->writeParam(
00138             newClusterRootParamId,
00139             rootPageIdDatum);  // written on every open, including restarts
00140     }
00141 
00142     pOrigClusterReader->open();
00143     currLoadRid = LcsRid(0);       // loading always starts from the first rid
00144     currInputRid = LcsRid(MAXU);   // sentinel: no input rid has been read yet
00145     needTuple = true;
00146 }

void LcsClusterReplaceExecStream::getResourceRequirements ( ExecStreamResourceQuantity & minQuantity,
ExecStreamResourceQuantity & optQuantity 
) [virtual]

Reimplemented from LcsClusterAppendExecStream.

Definition at line 89 of file LcsClusterReplaceExecStream.cpp.

References LcsClusterAppendExecStream::getResourceRequirements(), and ExecStreamResourceQuantity::nCachePages.

00092 {  // Adds the original-cluster reader's page needs on top of the base stream's requirements.
00093     LcsClusterAppendExecStream::getResourceRequirements(
00094         minQuantity,
00095         optQuantity);
00096 
00097     // Need to allocate two more pages for the cluster reader that reads
00098     // original cluster values -- one for the rid to pageId btree and another
00099     // for the actual cluster page.
00100     minQuantity.nCachePages += 2;
00101 
00102     optQuantity = minQuantity;  // overwrites whatever optimum the base class call stored
00103 }

void LcsClusterAppendExecStream::allocArrays (  )  [protected, inherited]

Allocate memory for arrays.

Definition at line 714 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::arraysAlloced, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::hashValOrd, LcsClusterAppendExecStream::maxValueSize, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::rowBlock, and LcsClusterAppendExecStream::tempBuf.

Referenced by LcsClusterAppendExecStream::initLoad().

00715 {  // Allocates the per-column arrays, each with numColumns entries; does nothing after the first call.
00716     // allocate arrays only if they have not been allocated already
00717     if (arraysAlloced) {
00718         return;
00719     }
00720     arraysAlloced = true;
00721 
00722     // instantiate hashes
00723     hash.reset(new LcsHash[numColumns]);
00724 
00725     // allocate pointers for row, hash blocks, other arrays
00726     rowBlock.reset(new PBuffer[numColumns]);
00727     hashBlock.reset(new PBuffer[numColumns]);
00728 
00729     builderBlock.reset(new PBuffer[numColumns]);
00730 
00731     hashValOrd.reset(new LcsHashValOrd[numColumns]);
00732     tempBuf.reset(new boost::scoped_array<FixedBuffer>[numColumns]);  // only the outer array is allocated here
00733     maxValueSize.reset(new uint[numColumns]);
00734 }

void LcsClusterAppendExecStream::initLoad (  )  [protected, inherited]

Initializes the load.

This method should only be called when the input stream has data available to read.

Definition at line 285 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::allocArrays(), SegNodeLock< Node >::allocatePage(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::bufferLock, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::clusterColsTupleDesc, LcsClusterAppendExecStream::colTupleDesc, LcsClusterAppendExecStream::compressCalled, LcsClusterAppendExecStream::getLastBlock(), SegPageLock::getPage(), TraceSource::getSharedTraceTarget(), TraceSource::getTraceSourceName(), CachePage::getWritableData(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::nRowsMax, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowBlock, LcsClusterAppendExecStream::scratchAccessor, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::startRow, BTreeExecStream::treeDescriptor, and SegPageLock::unlock().

Referenced by getTupleForLoad(), and LcsClusterAppendExecStream::getTupleForLoad().

00286 {  // One-time load setup, guarded by compressCalled: block builder, per-column buffers, first block.
00287     // If this is the first time this method is called, then
00288     // start a new block (for the new table), or load the last block
00289     // (of the existing table).  We do this here rather than in
00290     // init() because for INSERT into T as SELECT * from T,
00291     // we need to make sure that we extract all the data from
00292     // T before modifying the blocks there; hence that's why this
00293     // method should not be called until there is input available.
00294     // Use the boolean to ensure that initialization of cluster page
00295     // is only done once.
00296 
00297     if (!compressCalled) {
00298         compressCalled = true;
00299 
00300         // The dynamically allocated memory in lcsBlockBuilder is allocated for
00301         // every LcsClusterAppendExecStream.open() and deallocated for every
00302         // LcsClusterAppendExecStream.closeImpl(). The dynamic memory is not
00303         // reused across calls (e.g. when issuing the same statement twice).
00304         lcsBlockBuilder = SharedLcsClusterNodeWriter(
00305             new LcsClusterNodeWriter(
00306                 treeDescriptor,
00307                 scratchAccessor,
00308                 clusterColsTupleDesc,
00309                 getSharedTraceTarget(),
00310                 getTraceSourceName()));
00311 
00312         allocArrays();
00313 
00314         // get blocks from cache to use as temporary space and initialize arrays
00315         for (uint i = 0; i < numColumns; i++) {
00316             bufferLock.allocatePage();  // page for this column's row array
00317             rowBlock[i] = bufferLock.getPage().getWritableData();
00318             bufferLock.unlock();
00319 
00320             bufferLock.allocatePage();  // page for this column's hash table
00321             hashBlock[i] = bufferLock.getPage().getWritableData();
00322             bufferLock.unlock();
00323 
00324             bufferLock.allocatePage();  // page for this column's builder block
00325             builderBlock[i] = bufferLock.getPage().getWritableData();
00326             bufferLock.unlock();
00327 
00328             hash[i].init(
00329                 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
00330         }
00331 
00332         nRowsMax = blockSize / sizeof(uint16_t);  // number of uint16_t entries that fit in blockSize bytes
00333 
00334         // if the index exists, get last block written
00335 
00336         PLcsClusterNode pExistingIndexBlock;
00337 
00338         bool found = getLastBlock(pExistingIndexBlock);
00339         if (found) {
00340             // indicate we are updating a leaf
00341             pIndexBlock = pExistingIndexBlock;
00342 
00343             // extract rows and values from last batch so we can
00344             // add to it.
00345             loadExistingBlock();
00346         } else {
00347             // Start writing a new block
00348             startNewBlock();
00349             startRow = LcsRid(0);  // no existing block was found, so start at rid 0
00350         }
00351     }
00352 }

void LcsClusterAppendExecStream::loadExistingBlock (  )  [protected, inherited]

Populates row and hash arrays from existing index block.

Definition at line 418 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::firstRow, FixedBuffer, LcsHashValOrd::getValOrd(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::startRow, and LcsClusterAppendExecStream::writeBlock().

Referenced by LcsClusterAppendExecStream::initLoad().

00419 {   // Re-opens the existing index block (pIndexBlock) for append and re-seeds the row and hash arrays.
00420     boost::scoped_array<uint> numVals;      // number of values in block
00421     boost::scoped_array<uint16_t> lastValOff;   // one entry per column; filled by openAppend(), consumed by hash[i].restore()
00422     boost::scoped_array<boost::scoped_array<FixedBuffer> > aLeftOverBufs;
00423                                             // array of buffers to hold
00424                                             // rolled back data for each
00425                                             // column
00426     uint anLeftOvers;                       // number of leftover rows for
00427                                             // each col; since the value is
00428                                             // same for every column, no need
00429                                             // for this to be an array
00430     boost::scoped_array<uint> aiFixedSize;  // how much space was used for
00431                                             // each column; should be
00432                                             // equal for each value in a column
00433     LcsHashValOrd vOrd;
00434 
00435     uint i, j;
00436     RecordNum startRowCnt;
00437     RecordNum nrows;
00438 
00439     lcsBlockBuilder->init(
00440         numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
00441         builderBlock.get(), blockSize);
00442 
00443     lastValOff.reset(new uint16_t[numColumns]);
00444     numVals.reset(new uint[numColumns]);
00445 
00446     // REVIEW jvs 28-Nov-2005:  A simpler approach to this whole problem
00447     // might be to pretend we were starting an entirely new block,
00448     // use an LcsClusterReader to read the old one logically and append
00449     // the old rows into the new block, and then carry on from there with
00450     // the new rows.
00451 
00452     // Append to an existing cluster page.  Set the last rowid based on
00453     // the first rowid and the number of rows currently on the page.
00454     // As rows are "rolled back", lastRow is decremented accordingly.
00455 
00456     bool bStartNewBlock =
00457         lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows);
00458     lastRow = firstRow + nrows;
00459     startRow = lastRow;   // startRow keeps the pre-rollback value; only lastRow is adjusted below
00460 
00461     // Set up structures to hold rolled back information
00462     aiFixedSize.reset(new uint[numColumns]);
00463     aLeftOverBufs.reset(new boost::scoped_array<FixedBuffer>[numColumns]);
00464 
00465     startRowCnt = rowCnt;
00466 
00467     // Roll back the final batch for each column.
00468     // We need to roll back all
00469     // the batches before we can start the new batches because
00470     //  1) in openAppend() we adjust m_szLeft to not include space
00471     //     for numColumns * sizeof(RIBatch).  So if the
00472     //     block was full, then m_szLeft would be negative,
00473     //     since we decreased it by numColumns * sizeof(LcsBatch)
00474     //  2) the rollback code will add sizeof(LcsBatch) to szLeft
00475     //                      for each batch it rolls back
00476     //  3) the code to add values to a batch gets upset if
00477     //                      szLeft < 0
00478     for (i = 0; i < numColumns; i++) {
00479         // reset every time through the loop
00480         rowCnt = startRowCnt;
00481         lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]);
00482 
00483         // if we have leftovers from the last batch (i.e. the batch did not
00484         // end on an 8 boundary), roll back and store in temporary memory
00485         // aLeftOverBufs[i]
00486         if (anLeftOvers > 0) {
00487             aLeftOverBufs[i].reset(
00488                 new FixedBuffer[anLeftOvers * aiFixedSize[i]]);
00489             lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get());
00490             indexBlockDirty = true;
00491         }
00492     }
00493 
00494     // Decrement lastRow if there was a rollback of the last batch.
00495     lastRow -= anLeftOvers;   // value left by the final describeLastBatch() call; NOTE(review): never assigned if numColumns == 0
00496 
00497     // If the last page is already full, then write it out and start a new one
00498     if (bStartNewBlock) {
00499         writeBlock();
00500         startNewBlock();
00501     }
00502 
00503     // Start a new batch for each column.
00504     for (i = 0; i < numColumns; i++) {
00505         // reset every time through the loop
00506         rowCnt = startRowCnt;
00507 
00508         if (!bStartNewBlock) {
00509             // Repopulate the hash table with the values already in the
00510             // data segment at the bottom of the block (because we didn't
00511             // roll back these values, we only roll back the pointers to
00512             // these values).  But only do this if we haven't started a new
00513             // block.
00514             hash[i].restore(numVals[i], lastValOff[i]);
00515         }
00516 
00517         // if we had leftovers from the last batch, start a new batch
00518         // NOTE: we are guaranteed to be able to add these values back
00519         // to the current block
00520         if (anLeftOvers > 0) {
00521             uint8_t *val;
00522             bool undoInsert = false;
00523 
00524             // There is a very small probability that when the hash is
00525             // restored, using the existing values in the block, that the
00526             // hash will be full and some of the leftover values
00527             // cannot be stored in the hash.  If this is true then clear
00528             // the hash.
00529             if (hash[i].isHashFull(anLeftOvers)) {
00530                 hash[i].startNewBatch(anLeftOvers);
00531             }
00532 
00533             for (j = 0, val = aLeftOverBufs[i].get();   // walk the rolled-back values, aiFixedSize[i] bytes apart
00534                 j < anLeftOvers;
00535                 j++, val += aiFixedSize[i])
00536             {
00537                 hash[i].insert(val, &vOrd, &undoInsert);
00538 
00539                 // If we have leftovers they should fit in the block
00540                 assert(!undoInsert);
00541                 addValueOrdinal(i, vOrd.getValOrd());
00542                 rowCnt++;
00543             }
00544         }
00545     }
00546 }

void LcsClusterAppendExecStream::startNewBlock (  )  [protected, inherited]

Prepare to write a fresh block.

Definition at line 378 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::colTupleDesc, LcsClusterAppendExecStream::firstRow, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, and LcsClusterAppendExecStream::rowCnt.

Referenced by LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().

00379 {   // Allocates a new cluster page and resets the block builder and per-column hashes to write into it.
00380     firstRow = lastRow;   // the new block's first rid is the current lastRow
00381 
00382     // Get a new cluster page from the btree segment
00383     pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow);
00384 
00385     // Reset index block and block builder.
00386     lcsBlockBuilder->init(
00387         numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
00388         builderBlock.get(), blockSize);
00389 
00390     // reset hashes
00391     for (uint i = 0; i < numColumns; i++) {
00392         hash[i].init(
00393             hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
00394     }
00395 
00396     // reset row count
00397     // NOTE:  if the rowCnt is less than eight then we know we are carrying
00398     //        over rows from the previous block because the count did not
00399     //        end on a boundary of 8, so rowCnt is left untouched in that case
00400     if (rowCnt >= 8) {
00401         rowCnt = 0;
00402     }
00403     indexBlockDirty = false;   // nothing has been added to the fresh block yet
00404 
00405     // Start writing a new block.
00406     lcsBlockBuilder->openNew(firstRow);
00407 }

void LcsClusterAppendExecStream::convertTuplesToCols (  )  [protected, inherited]

Given a TupleData representing all columns in a cluster, converts each column into its own TupleData.

void LcsClusterAppendExecStream::addValueOrdinal ( uint  column,
uint16_t  vOrd 
) [protected, inherited]

Adds value ordinal to row array for new row.

Definition at line 548 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::rowBlock, and LcsClusterAppendExecStream::rowCnt.

Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().

00549 {   // Stores vOrd in slot rowCnt of the given column's row array; rowCnt itself is not advanced here.
00550     uint16_t *rowWordArray = (uint16_t *) rowBlock[column];   // view the column's row block as 16-bit ordinals
00551     rowWordArray[rowCnt] = vOrd;
00552 
00553     // since we added a row, mark the block as dirty
00554     indexBlockDirty = true;
00555 }

bool LcsClusterAppendExecStream::isRowArrayFull (  )  [protected, inherited]

True if row array is full.

Definition at line 557 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::nRowsMax, and LcsClusterAppendExecStream::rowCnt.

Referenced by LcsClusterAppendExecStream::compress().

00558 {   // Returns true once rowCnt has reached nRowsMax, the row array's capacity.
00559     if (rowCnt >= nRowsMax) {   // nRowsMax is blockSize / sizeof(uint16_t), set during load initialization
00560         return true;
00561     } else {
00562         return false;
00563     }
00564 }

void LcsClusterAppendExecStream::writeBatch ( bool  lastBatch  )  [protected, inherited]

Writes a batch (run) to the index block.

Batches have a multiple of 8 rows.

Parameters:
lastBatch true if last batch

Definition at line 566 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::addValueOrdinal(), count(), FixedBuffer, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::lastRow, LCS_FIXED, LCS_VARIABLE, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::maxValueSize, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::rowBlock, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::tempBuf, and LcsClusterAppendExecStream::writeBlock().

Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::writeBlock().

00567 {   // Writes the buffered rows of every column as one batch; rows past an 8-row boundary are carried into the next batch.
00568     uint16_t *oVals;
00569     uint leftOvers;
00570     PBuffer val;
00571     LcsBatchMode mode;
00572     uint i, j;
00573     uint origRowCnt, count = 0;   // count: rows to carry over; assigned in the column loop, used after it
00574 
00575     lastRow += rowCnt;   // counts every buffered row; carried-over rows are subtracted again below
00576 
00577     for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) {
00578         rowCnt = origRowCnt;   // rowCnt is zeroed at the end of each iteration, so restore it per column
00579 
00580         // save max value size so we can read leftovers
00581         maxValueSize[i] = hash[i].getMaxValueSize();
00582 
00583         // Pick which compression mode to use (fixed, variable, or compressed)
00584         lcsBlockBuilder->pickCompressionMode(
00585             i, maxValueSize[i], rowCnt, &oVals, mode);
00586         leftOvers = rowCnt > 8 ? rowCnt % 8 : 0;   // rows past the last multiple of 8; zero when 8 or fewer rows are buffered
00587 
00588         // all batches must end on an eight boundary so we move
00589         // values over the eight boundary to the next batch.
00590         // if there are leftOvers or if there are fewer than
00591         // eight values in this batch, allocate a buffer to store
00592         // values to be written to the next batch
00593         if (leftOvers) {
00594             tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]);
00595             count = leftOvers;
00596 
00597         } else if (origRowCnt < 8) {
00598             tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]);
00599             count = origRowCnt;
00600         } else {
00601             // no values to write to next batch (i.e. on a boundary of 8)
00602             tempBuf[i].reset();
00603         }
00604 
00605         // Write out the batch and collect the leftover rows in tempBuf
00606         if (LCS_FIXED == mode || LCS_VARIABLE == mode) {
00607             hash[i].prepareFixedOrVariableBatch(
00608                 (PBuffer) rowBlock[i], rowCnt);
00609             lcsBlockBuilder->putFixedVarBatch(
00610                 i, (uint16_t *) rowBlock[i], tempBuf[i].get());
00611             if (mode == LCS_FIXED) {
00612                 hash[i].clearFixedEntries();
00613             }
00614 
00615         } else {
00616             uint16_t numVals;
00617 
00618             // write orderVals to oVals and remap val ords in row array
00619             hash[i].prepareCompressedBatch(
00620                 (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals);
00621             lcsBlockBuilder->putCompressedBatch(
00622                 i, (PBuffer) rowBlock[i], tempBuf[i].get());
00623         }
00624 
00625         // set up next batch
00626         rowCnt = 0;
00627         hash[i].startNewBatch(!lastBatch ? count : 0);   // nothing is carried over after the last batch
00628     }
00629 
00630     // compensate for leftover and rolled back rows
00631     if (!lastBatch) {
00632         lastRow -= count;
00633     }
00634     bool bStartNewBlock;
00635     bStartNewBlock = false;
00636 
00637     // If we couldn't even fit 8 values into the batch (and this is not the
00638     // final batch), then the block must be full.  putCompressedBatch()/
00639     // putFixedVarBatch() assumed that this was the last batch, so they wrote
00640     // out these rows in a small batch.  Roll back the entire batch (putting
00641     // rolled back results in tempBuf) and move to the next block
00642     if (!lastBatch && origRowCnt < 8) {
00643         // roll back each batch
00644         for (i = 0; i < numColumns; i++) {
00645             lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get());
00646         }
00647         bStartNewBlock = true;
00648     }
00649 
00650     // Should we move to a new block?  Move if
00651     //    (a) bStartNewBlock (we need to move just to write the current batch)
00652     // or (b) lcsBlockBuilder->isEndOfBlock() (there isn't room to even start
00653     // the next batch)
00654     if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) {
00655         writeBlock();
00656         startNewBlock();
00657     }
00658 
00659     // Add leftOvers or rolled back values to new batch
00660     if (!lastBatch) {
00661         for (i = 0; i < numColumns; i++) {
00662             rowCnt = 0;   // restart per column; every column re-adds the same count rows
00663             for (j = 0, val = tempBuf[i].get(); j < count; j++) {
00664                 LcsHashValOrd vOrd;
00665                 bool undoInsert = false;
00666 
00667                 hash[i].insert(val, &vOrd, &undoInsert);
00668 
00669                 // If we have leftovers they should fit in the current block
00670                 // (because we moved to a new block above, if it was necessary)
00671                 assert(!undoInsert);
00672                 addValueOrdinal(i, vOrd.getValOrd());
00673                 rowCnt++;
00674                 val += maxValueSize[i];   // carried-over values are stored maxValueSize[i] bytes apart
00675             }
00676         }
00677     }
00678 
00679     for (i = 0; i < numColumns; i++) {   // release the carry-over buffers
00680         if (tempBuf[i].get()) {
00681             tempBuf[i].reset();
00682         }
00683     }
00684 }

void LcsClusterAppendExecStream::writeBlock (  )  [protected, inherited]

Writes block to index when the block is full or this is the last block in the load.

Definition at line 686 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::rowCnt, and LcsClusterAppendExecStream::writeBatch().

Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().

00687 {   // Finishes the current index block if it is dirty, first writing any rows still buffered as a final batch.
00688     if (indexBlockDirty) {
00689         // If the rowCnt is not zero, then the last batch was not on
00690         // a boundary of 8 so we need to write the last batch
00691         if (rowCnt) {
00692             writeBatch(true);
00693 
00694             // REVIEW jvs 28-Nov-2005:  it must be possible to eliminate
00695             // this circularity between writeBlock and writeBatch.
00696 
00697             // Handle corner case. writeBatch may have written this block
00698             // to the btree, in which case the dirty flag is already clear
00699             if (!indexBlockDirty) {
00700                 return;
00701             }
00702         }
00703 
00704         // Tell block builder we are done so it can wrap up writing to the
00705         // index block
00706         lcsBlockBuilder->endBlock();
00707 
00708         // Dump out the page contents to trace if appropriate
00709         // (no tracing code follows this comment in the listing)
00710         indexBlockDirty = false;
00711     }
00712 }

bool LcsClusterAppendExecStream::getLastBlock ( PLcsClusterNode pBlock  )  [protected, inherited]

Gets last block written to disk so we can append to it, reading in the first rid value stored on the page.

Parameters:
pBlock returns pointer to last cluster block
Returns:
true if cluster is non-empty

Definition at line 409 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::firstRow, and LcsClusterAppendExecStream::lcsBlockBuilder.

Referenced by LcsClusterAppendExecStream::initLoad().

00410 {   // Returns exactly what getLastClusterPageForWrite() returns; pBlock and firstRow are passed through to it.
00411     if (!lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) {
00412         return false;
00413     } else {
00414         return true;
00415     }
00416 }

void LcsClusterAppendExecStream::init (  )  [protected, inherited]

Initializes and sets up object with content specific to the load that will be carried out.

Definition at line 131 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::arraysAlloced, LcsClusterAppendExecStream::compressCalled, LcsClusterAppendExecStream::firstRow, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::numRowCompressed, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowCnt, and LcsClusterAppendExecStream::startRow.

Referenced by LcsClusterAppendExecStream::open(), and LcsClusterAppendExecStream::startNewBlock().

00132 {   // Resets the per-load state: block pointer, rid markers, counters and status flags.
00133     pIndexBlock = 0;             // no index block selected yet
00134     firstRow = LcsRid(0);
00135     lastRow = LcsRid(0);
00136     startRow = LcsRid(0);
00137     rowCnt = 0;
00138     indexBlockDirty = false;
00139     arraysAlloced = false;
00140     compressCalled = false;
00141     numRowCompressed = 0;
00142 }

ExecStreamResult LcsClusterAppendExecStream::compress ( ExecStreamQuantum const &  quantum  )  [protected, inherited]

Processes rows for loading.

Calls writeBatch() once values can no longer fit into a page.

Parameters:
quantum ExecStream quantum
Returns:
ExecStreamResult value

Definition at line 144 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::clusterColsTupleData, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, TupleAccessor::getCurrentByteCount(), LcsClusterAppendExecStream::getTupleForLoad(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashValOrd, LcsClusterAppendExecStream::isDone, LcsClusterAppendExecStream::isRowArrayFull(), LcsClusterAppendExecStream::lcsBlockBuilder, TupleAccessor::marshal(), ExecStreamQuantum::nTuplesMax, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::outputTuple, LcsClusterAppendExecStream::outputTupleAccessor, LcsClusterAppendExecStream::outputTupleBuffer, LcsClusterAppendExecStream::postProcessTuple(), SingleOutputExecStream::pOutAccessor, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().

Referenced by LcsClusterAppendExecStream::execute().

00146 {   // Loads up to quantum.nTuplesMax input rows into the cluster; at end of input, flushes and emits the result tuple.
00147     uint i, j, k;
00148     bool canFit = false;
00149     bool undoInsert = false;
00150 
00151     if (isDone) {
00152         // already returned final result
00153         pOutAccessor->markEOS();
00154         return EXECRC_EOS;
00155     }
00156 
00157     for (i = 0; i < quantum.nTuplesMax; i++) {
00158         // if we have finished processing the previous row, retrieve
00159         // the next cluster tuple and then convert the columns in the
00160         // cluster into individual tuples, one per cluster column
00161         ExecStreamResult rc = getTupleForLoad();
00162 
00163         // no more input; produce final row count
00164         if (rc == EXECRC_EOS) {
00165             // since we're done adding rows to the index, write the last batch
00166             // and block
00167             if (rowCnt) {
00168                 // if rowCnt < 8 or a multiple of 8, force writeBatch to
00169                 // treat this as the last batch
00170                 if (rowCnt < 8 || (rowCnt % 8) == 0) {
00171                     writeBatch(true);
00172                 } else {
00173                     writeBatch(false);   // leftover rows stay buffered; writeBlock() below writes them as the final batch
00174                 }
00175             }
00176 
00177             // Write out the last block and then free up resources
00178             // rather than waiting until stream close. This will keep
00179             // resource usage window smaller and avoid interference with
00180             // downstream processing such as writing to unclustered indexes.
00181             writeBlock();
00182             if (lcsBlockBuilder) {
00183                 lcsBlockBuilder->close();
00184             }
00185             close();
00186 
00187             // outputTuple was already initialized to point to numRowCompressed/
00188             // startRow in prepare()
00189             // Write a single outputTuple(numRowCompressed, [startRow])
00190             // and indicate OVERFLOW.
00191 
00192             outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get());
00193             pOutAccessor->provideBufferForConsumption(
00194                 outputTupleBuffer.get(),
00195                 outputTupleBuffer.get() +
00196                     outputTupleAccessor->getCurrentByteCount());
00197 
00198             isDone = true;   // the next call takes the isDone branch at the top and returns EXECRC_EOS
00199             return EXECRC_BUF_OVERFLOW;
00200         } else if (rc != EXECRC_YIELD) {
00201             return rc;   // any other result code is passed straight back to the caller
00202         }
00203 
00204         // Go through each column value for the current row and insert it.
00205         // If we run out of space then roll back all the columns that
00206         // were already inserted.
00207         undoInsert = false;
00208 
00209         for (j = 0; j < numColumns; j++) {
00210             hash[j].insert(
00211                 clusterColsTupleData[j], &hashValOrd[j], &undoInsert);
00212 
00213             if (undoInsert) {
00214                 // roll back cluster columns already inserted
00215                 // j has not been incremented yet, so the condition should be
00216                 //     k <= j
00217                 for (k = 0; k <= j; k++) {
00218                     hash[k].undoInsert(clusterColsTupleData[k]);
00219                 }
00220                 break;
00221             }
00222         }
00223 
00224         // Was there enough space to add this row?  Note that the insert()
00225         // calls above accounted for the space needed by addValueOrdinal()
00226         // below, so we don't have to worry about addValueOrdinal() running
00227         // out of space
00228         if (!undoInsert) {
00229             canFit = true;
00230         } else {
00231             canFit = false;
00232         }
00233 
00234         if (canFit) {
00235             // Add the pointers from the batch to the data values
00236             for (j = 0; j < numColumns; j++) {
00237                 addValueOrdinal(j, hashValOrd[j].getValOrd());
00238             }
00239 
00240             rowCnt++;
00241 
00242             // if we reach the max rows that can fit in the row array then
00243             // write the batch
00244             if (isRowArrayFull()) {
00245                 writeBatch(false);
00246             }
00247         } else {
00248             // since we can't fit any more values, write out the current batch
00249             writeBatch(false);
00250 
00251             // restart using last value retrieved from stream because it
00252             // could not fit in the batch; by continuing we can avoid
00253             // a goto to jump back to the top of this for loop at the
00254             // expense of a harmless increment of the quantum
00255             continue;
00256         }
00257 
00258         // only consume the tuple after we know the row can fit
00259         // on the current page
00260         postProcessTuple();
00261     }
00262 
00263     return EXECRC_QUANTUM_EXPIRED;
00264 }

void LcsClusterAppendExecStream::prepare ( LcsClusterAppendExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 31 of file LcsClusterAppendExecStream.cpp.

References SegPageLock::accessSegment(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::bufferLock, TupleData::compute(), LcsClusterAppendExecStream::initTupleLoadParams(), LcsClusterAppendExecStreamParams::inputProj, LcsClusterAppendExecStream::numRowCompressed, LcsClusterAppendExecStream::outputTuple, LcsClusterAppendExecStream::outputTupleAccessor, SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, ConduitExecStream::prepare(), BTreeExecStream::prepare(), SegmentAccessor::pSegment, ExecStreamParams::scratchAccessor, LcsClusterAppendExecStream::scratchAccessor, BTreeDescriptor::segmentAccessor, LcsClusterAppendExecStream::startRow, LcsClusterAppendExecStream::tableColsTupleDesc, and BTreeExecStream::treeDescriptor.

Referenced by prepare().

00033 {   // Prepares both base streams, then sets up the input projection, scratch access, output tuple and block size.
00034     BTreeExecStream::prepare(params);
00035     ConduitExecStream::prepare(params);
00036 
00037     tableColsTupleDesc = pInAccessor->getTupleDesc();   // captured before the projection is bound below
00038     initTupleLoadParams(params.inputProj);
00039 
00040     // set up descriptors, accessors and data to access only the columns
00041     // for this cluster, based on the input projection
00042 
00043     pInAccessor->bindProjection(params.inputProj);
00044 
00045     // set up bufferLock to access temporary large page blocks
00046 
00047     scratchAccessor = params.scratchAccessor;
00048     bufferLock.accessSegment(scratchAccessor);
00049 
00050     // The output stream from the loader is either a single column representing
00051     // the number of rows loaded or two columns -- number of rows loaded and
00052     // starting rid value.  The latter applies when there are
00053     // downstream create indexes
00054 
00055     TupleDescriptor outputTupleDesc;
00056 
00057     outputTupleDesc = pOutAccessor->getTupleDesc();
00058     outputTuple.compute(outputTupleDesc);
00059     outputTuple[0].pData = (PConstBuffer) &numRowCompressed;   // column 0 reads directly from the numRowCompressed member
00060     if (outputTupleDesc.size() > 1) {
00061         outputTuple[1].pData = (PConstBuffer) &startRow;       // optional column 1 reads directly from the startRow member
00062     }
00063 
00064     outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor();
00065 
00066     blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize();
00067 
00068 }

void BTreeExecStream::prepare ( BTreeExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 30 of file BTreeExecStream.cpp.

References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, SingleOutputExecStream::prepare(), BTreeParams::pRootMap, BTreeExecStream::pRootMap, BTreeParams::rootPageIdParamId, BTreeExecStream::rootPageIdParamId, ExecStreamParams::scratchAccessor, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.

Referenced by LcsClusterAppendExecStream::prepare(), LbmGeneratorExecStream::prepare(), BTreeReadExecStream::prepare(), and BTreeInsertExecStream::prepare().

00031 {   // Prepares the base stream, then copies the btree-related settings from params into this stream.
00032     SingleOutputExecStream::prepare(params);
00033 
00034     copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor);   // fills in treeDescriptor
00035     scratchAccessor = params.scratchAccessor;
00036     pRootMap = params.pRootMap;
00037     rootPageIdParamId = params.rootPageIdParamId;
00038 }

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

00049 {   // Prepares the base stream, checks the output accessor, and gives it a tuple shape if it has none.
00050     ExecStream::prepare(params);
00051     assert(pOutAccessor);
00052     assert(pOutAccessor->getProvision() == getOutputBufProvision());
00053     if (pOutAccessor->getTupleDesc().empty()) {   // no shape set yet, so params must supply one
00054         assert(!params.outputTupleDesc.empty());
00055         pOutAccessor->setTupleShape(
00056             params.outputTupleDesc,
00057             params.outputTupleFormat);
00058     }
00059 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {   // Records the dynamic parameter manager (when a graph is attached) and the two quota accessors from params.
00086     if (pGraph) {   // without a graph, pDynamicParamManager is left unchanged
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }

void ConduitExecStream::prepare ( ConduitExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 42 of file ConduitExecStream.cpp.

References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

Referenced by ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmNormalizerExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().

00043 {
00044     SingleInputExecStream::prepare(params);
00045 
00046     if (params.outputTupleDesc.empty()) {
00047         pOutAccessor->setTupleShape(
00048             pInAccessor->getTupleDesc(),
00049             pInAccessor->getTupleFormat());
00050     }
00051 
00052     SingleOutputExecStream::prepare(params);
00053 }

void SingleInputExecStream::prepare ( SingleInputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 44 of file SingleInputExecStream.cpp.

References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().

Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().

00045 {
00046     ExecStream::prepare(params);
00047 
00048     assert(pInAccessor);
00049     assert(pInAccessor->getProvision() == getInputBufProvision());
00050 }

ExecStreamResult LcsClusterAppendExecStream::execute ( ExecStreamQuantum const &  quantum  )  [virtual, inherited]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 117 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::compress().

00119 {
00120     return compress(quantum);
00121 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity minQuantity,
ExecStreamResourceQuantity optQuantity,
ExecStreamResourceSettingType optType 
) [virtual, inherited]

Determines resource requirements for this stream.

Default implementation declares zero resource requirements.

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting.

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 93 of file ExecStream.cpp.

References EXEC_RESOURCE_ACCURATE.

Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().

00097 {
00098     getResourceRequirements(minQuantity, optQuantity);
00099     optType = EXEC_RESOURCE_ACCURATE;
00100 }

void LcsClusterAppendExecStream::closeImpl (  )  [virtual, inherited]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from BTreeExecStream.

Definition at line 123 of file LcsClusterAppendExecStream.cpp.

References LcsClusterAppendExecStream::close(), ExecStream::closeImpl(), BTreeExecStream::closeImpl(), and LcsClusterAppendExecStream::outputTupleBuffer.

00124 {
00125     BTreeExecStream::closeImpl();
00126     ConduitExecStream::closeImpl();
00127     outputTupleBuffer.reset();
00128     close();
00129 }

ExecStreamBufProvision LcsClusterAppendExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; default is BUFPROV_NONE

Reimplemented from SingleOutputExecStream.

Definition at line 737 of file LcsClusterAppendExecStream.cpp.

References BUFPROV_PRODUCER.

00738 {
00739     return BUFPROV_PRODUCER;
00740 }

SharedBTreeReader BTreeExecStream::newReader (  )  [protected, virtual, inherited]

Reimplemented in BTreePrefetchSearchExecStream.

Definition at line 67 of file BTreeExecStream.cpp.

References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, and BTreeExecStream::treeDescriptor.

Referenced by BTreeReadExecStream::open().

00068 {
00069     SharedBTreeReader pReader = SharedBTreeReader(
00070         new BTreeReader(treeDescriptor));
00071     pBTreeAccessBase = pBTreeReader = pReader;
00072     return pReader;
00073 }

SharedBTreeWriter BTreeExecStream::newWriter ( bool  monotonic = false  )  [protected, inherited]

Definition at line 75 of file BTreeExecStream.cpp.

References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.

Referenced by FtrsTableWriter::createIndexWriter(), and BTreeInsertExecStream::open().

00076 {
00077     SharedBTreeWriter pWriter = SharedBTreeWriter(
00078         new BTreeWriter(treeDescriptor,scratchAccessor,monotonic));
00079     pBTreeAccessBase = pBTreeReader = pWriter;
00080     return pWriter;
00081 }

SharedBTreeWriter BTreeExecStream::newWriter ( BTreeExecStreamParams const &  params  )  [static, inherited]

Definition at line 83 of file BTreeExecStream.cpp.

References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, ExecStreamParams::scratchAccessor, and BTreeExecStream::treeDescriptor.

00085 {
00086     BTreeDescriptor treeDescriptor;
00087     copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor);
00088     return SharedBTreeWriter(
00089         new BTreeWriter(
00090             treeDescriptor,params.scratchAccessor));
00091 }

void BTreeExecStream::endSearch (  )  [protected, virtual, inherited]

Forgets the current reader or writer's search, releasing any page locks.

Definition at line 107 of file BTreeExecStream.cpp.

References BTreeExecStream::pBTreeReader.

Referenced by BTreeExecStream::closeImpl(), and BTreeExecStream::open().

00108 {
00109     if (pBTreeReader && pBTreeReader->isSingular() == false) {
00110         pBTreeReader->endSearch();
00111     }
00112 }

void BTreeExecStream::copyParamsToDescriptor ( BTreeDescriptor ,
BTreeParams const &  ,
SharedCacheAccessor const &   
) [static, inherited]

Definition at line 93 of file BTreeExecStream.cpp.

References BTreeParams::keyProj, BTreeDescriptor::keyProjection, BTreeParams::pageOwnerId, BTreeDescriptor::pageOwnerId, SegmentAccessor::pCacheAccessor, BTreeParams::pSegment, SegmentAccessor::pSegment, BTreeParams::rootPageId, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeParams::segmentId, BTreeDescriptor::segmentId, BTreeExecStream::treeDescriptor, BTreeParams::tupleDesc, and BTreeDescriptor::tupleDescriptor.

Referenced by BTreeExecStream::newWriter(), LbmSplicerExecStream::prepare(), and BTreeExecStream::prepare().

00097 {
00098     treeDescriptor.segmentAccessor.pSegment = params.pSegment;
00099     treeDescriptor.segmentAccessor.pCacheAccessor = pCacheAccessor;
00100     treeDescriptor.tupleDescriptor = params.tupleDesc;
00101     treeDescriptor.keyProjection = params.keyProj;
00102     treeDescriptor.rootPageId = params.rootPageId;
00103     treeDescriptor.segmentId = params.segmentId;
00104     treeDescriptor.pageOwnerId = params.pageOwnerId;
00105 }

void SingleOutputExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Implements ExecStream.

Reimplemented in ConduitExecStream, and ConfluenceExecStream.

Definition at line 35 of file SingleOutputExecStream.cpp.

00037 {
00038     assert(inAccessors.size() == 0);
00039 }

void ConduitExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Reimplemented from SingleInputExecStream.

Definition at line 30 of file ConduitExecStream.cpp.

References SingleInputExecStream::setInputBufAccessors().

00032 {
00033     SingleInputExecStream::setInputBufAccessors(inAccessors);
00034 }

void SingleOutputExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 41 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::pOutAccessor.

Referenced by ConduitExecStream::setOutputBufAccessors().

00043 {
00044     assert(outAccessors.size() == 1);
00045     pOutAccessor = outAccessors[0];
00046 }

void ConduitExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Reimplemented from SingleInputExecStream.

Definition at line 36 of file ConduitExecStream.cpp.

References SingleOutputExecStream::setOutputBufAccessors().

00038 {
00039     SingleOutputExecStream::setOutputBufAccessors(outAccessors);
00040 }

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::setResourceAllocation ( ExecStreamResourceQuantity quantity  )  [virtual, inherited]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 111 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.

Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().

00113 {
00114     resourceAllocation = quantity;
00115     if (pQuotaAccessor) {
00116         pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00117     }
00118     if (pScratchQuotaAccessor) {
00119         pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00120     }
00121 }

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

ExecStreamBufProvision ExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE

Reimplemented in ConfluenceExecStream, DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferWriterExecStream, SingleInputExecStream, and JavaTransformExecStream.

Definition at line 182 of file ExecStream.cpp.

References BUFPROV_NONE.

00183 {
00184     return BUFPROV_NONE;
00185 }

ExecStreamBufProvision SingleInputExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE

Reimplemented from ExecStream.

Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.

Definition at line 62 of file SingleInputExecStream.cpp.

References BUFPROV_PRODUCER.

Referenced by SingleInputExecStream::prepare().

00063 {
00064     return BUFPROV_PRODUCER;
00065 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of the event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor errorDesc,
const TupleData errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }

ExecStreamResult ConduitExecStream::precheckConduitBuffers (  )  [protected, inherited]

Checks the state of the input and output buffers.

If the input is empty, requests production and returns EXECRC_BUF_UNDERFLOW. If the input is at EOS, propagates that to the output buffer. If the output is full, returns EXECRC_BUF_OVERFLOW.

Returns:
result of precheck; anything but EXECRC_YIELD indicates that execution should terminate immediately with returned code

Definition at line 61 of file ConduitExecStream.cpp.

References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.

Referenced by ExternalSortExecStreamImpl::execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().

00062 {
00063     switch (pInAccessor->getState()) {
00064     case EXECBUF_EMPTY:
00065         pInAccessor->requestProduction();
00066         return EXECRC_BUF_UNDERFLOW;
00067     case EXECBUF_UNDERFLOW:
00068         return EXECRC_BUF_UNDERFLOW;
00069     case EXECBUF_EOS:
00070         pOutAccessor->markEOS();
00071         return EXECRC_EOS;
00072     case EXECBUF_NONEMPTY:
00073     case EXECBUF_OVERFLOW:
00074         break;
00075     default:
00076         permAssert(false);
00077     }
00078     if (pOutAccessor->getState() == EXECBUF_OVERFLOW) {
00079         return EXECRC_BUF_OVERFLOW;
00080     }
00081     return EXECRC_YIELD;
00082 }


Member Data Documentation

DynamicParamId LcsClusterReplaceExecStream::newClusterRootParamId [private]

Dynamic parameter id corresponding to the root pageId of the new cluster, if the pageId is required downstream.

Set to 0 if there's no need for the parameter.

Definition at line 61 of file LcsClusterReplaceExecStream.h.

Referenced by open(), and prepare().

TupleDescriptor LcsClusterReplaceExecStream::projInputTupleDesc [private]

Tuple descriptor representing the rid column plus the cluster columns to be loaded.

Definition at line 67 of file LcsClusterReplaceExecStream.h.

Referenced by initTupleLoadParams().

TupleData LcsClusterReplaceExecStream::projInputTupleData [private]

Tuple data for the projected input tuple.

Definition at line 72 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and initTupleLoadParams().

std::vector<UnalignedAttributeAccessor> LcsClusterReplaceExecStream::attrAccessors [private]

Accessors for loading column values from the original cluster.

Definition at line 77 of file LcsClusterReplaceExecStream.h.

Referenced by initTupleLoadParams(), and readOrigClusterRow().

SnapshotRandomAllocationSegment* LcsClusterReplaceExecStream::pSnapshotSegment [private]

The underlying snapshot segment for the cluster.

Definition at line 82 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and open().

SharedLcsClusterReader LcsClusterReplaceExecStream::pOrigClusterReader [private]

Reader for the original cluster.

Definition at line 87 of file LcsClusterReplaceExecStream.h.

Referenced by close(), initTupleLoadParams(), open(), and readOrigClusterRow().

RecordNum LcsClusterReplaceExecStream::origNumRows [private]

Number of rows in the original cluster.

Definition at line 92 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and open().

PageId LcsClusterReplaceExecStream::origRootPageId [private]

The rootPageId of the original rid-to-pageId btree map.

Definition at line 97 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and prepare().

LcsRid LcsClusterReplaceExecStream::currLoadRid [private]

The current rid being loaded.

Definition at line 102 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), open(), postProcessTuple(), and readOrigClusterRow().

bool LcsClusterReplaceExecStream::needTuple [private]

True if a new tuple needs to be provided for the load.

Definition at line 107 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), open(), and postProcessTuple().

LcsRid LcsClusterReplaceExecStream::currInputRid [private]

The rid value of the last input row read.

Definition at line 112 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), open(), and postProcessTuple().

TupleProjectionAccessor LcsClusterReplaceExecStream::clusterColsTupleAccessor [private]

Accessor for projecting cluster tuple data from the input row.

Definition at line 117 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and initTupleLoadParams().

TupleDataWithBuffer LcsClusterReplaceExecStream::origClusterTupleData [private]

TupleData used to load column values from the original cluster.

Definition at line 122 of file LcsClusterReplaceExecStream.h.

Referenced by initTupleLoadParams(), and readOrigClusterRow().

bool LcsClusterReplaceExecStream::newData [private]

True if at least one existing row is being replaced with a new value.

Definition at line 127 of file LcsClusterReplaceExecStream.h.

Referenced by getTupleForLoad(), and open().

uint LcsClusterAppendExecStream::blockSize [protected, inherited]

Space available on page blocks for writing cluster data.

Definition at line 58 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::prepare(), and LcsClusterAppendExecStream::startNewBlock().

TupleDescriptor LcsClusterAppendExecStream::tableColsTupleDesc [protected, inherited]

Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of.

Definition at line 64 of file LcsClusterAppendExecStream.h.

Referenced by initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and LcsClusterAppendExecStream::prepare().

TupleData LcsClusterAppendExecStream::clusterColsTupleData [protected, inherited]

Tuple data for the tuple datums representing only this cluster.

Definition at line 69 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and readOrigClusterRow().

TupleDescriptor LcsClusterAppendExecStream::clusterColsTupleDesc [protected, inherited]

Tuple descriptor for the columns that are part of this cluster.

Definition at line 74 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), and LcsClusterAppendExecStream::initTupleLoadParams().

boost::scoped_array<TupleDescriptor> LcsClusterAppendExecStream::colTupleDesc [protected, inherited]

Individual tuple descriptors for each column in the cluster.

Definition at line 79 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and LcsClusterAppendExecStream::startNewBlock().

SegmentAccessor LcsClusterAppendExecStream::scratchAccessor [protected, inherited]

Scratch accessor for allocating large buffer pages.

Reimplemented from BTreeExecStream.

Definition at line 84 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::prepare().

ClusterPageLock LcsClusterAppendExecStream::bufferLock [protected, inherited]

Lock on scratch page.

Definition at line 89 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::prepare().

bool LcsClusterAppendExecStream::overwrite [protected, inherited]

True if overwriting all existing data.

Definition at line 94 of file LcsClusterAppendExecStream.h.

bool LcsClusterAppendExecStream::isDone [protected, inherited]

Whether row count has been produced.

Definition at line 99 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::open().

TupleData LcsClusterAppendExecStream::outputTuple [protected, inherited]

Output tuple containing count of number of rows loaded.

Definition at line 104 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::prepare().

TupleAccessor* LcsClusterAppendExecStream::outputTupleAccessor [protected, inherited]

A reference to the output accessor contained in SingleOutputExecStream::pOutAccessor.

Definition at line 110 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::open(), and LcsClusterAppendExecStream::prepare().

boost::scoped_array<FixedBuffer> LcsClusterAppendExecStream::outputTupleBuffer [protected, inherited]

Buffer holding the outputTuple to provide to the consumers.

Definition at line 115 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::closeImpl(), LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::open().

bool LcsClusterAppendExecStream::compressCalled [protected, inherited]

True if execute has been called at least once.

Definition at line 120 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::init(), and LcsClusterAppendExecStream::initLoad().

boost::scoped_array<LcsHash> LcsClusterAppendExecStream::hash [protected, inherited]

Array of hashes, one per cluster column.

Definition at line 125 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().

uint LcsClusterAppendExecStream::numColumns [protected, inherited]

Number of columns in the cluster.

Definition at line 130 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::getResourceRequirements(), LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().

boost::scoped_array<PBuffer> LcsClusterAppendExecStream::rowBlock [protected, inherited]

Array of temporary blocks for row array.

Definition at line 135 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::writeBatch().

uint LcsClusterAppendExecStream::nRowsMax [protected, inherited]

Maximum number of values that can be stored in rowBlock.

Definition at line 140 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::isRowArrayFull().

boost::scoped_array<PBuffer> LcsClusterAppendExecStream::hashBlock [protected, inherited]

Array of temporary blocks for hash table.

Definition at line 145 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::startNewBlock().

boost::scoped_array<PBuffer> LcsClusterAppendExecStream::builderBlock [protected, inherited]

Array of temporary blocks used by ClusterNodeWriter.

Definition at line 150 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().

uint LcsClusterAppendExecStream::rowCnt [protected, inherited]

Number of rows loaded into the current set of batches.

Definition at line 155 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::isRowArrayFull(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().

bool LcsClusterAppendExecStream::indexBlockDirty [protected, inherited]

True if index blocks need to be written to disk.

Definition at line 160 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBlock().

LcsRid LcsClusterAppendExecStream::firstRow [protected, inherited]

Starting rowid in a cluster page.

Definition at line 165 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::getLastBlock(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().

LcsRid LcsClusterAppendExecStream::lastRow [protected, inherited]

Last rowid in the last batch.

Definition at line 170 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().

LcsRid LcsClusterAppendExecStream::startRow [protected, inherited]

Definition at line 174 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::prepare().

SharedLcsClusterNodeWriter LcsClusterAppendExecStream::lcsBlockBuilder [protected, inherited]

Page builder object.

Definition at line 179 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::getLastBlock(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().

boost::scoped_array<LcsHashValOrd> LcsClusterAppendExecStream::hashValOrd [protected, inherited]

Row value ordinal returned from hash, one per cluster column.

Definition at line 184 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::compress().

boost::scoped_array<boost::scoped_array<FixedBuffer> > LcsClusterAppendExecStream::tempBuf [protected, inherited]

Temporary buffers used by writeBatch().

Definition at line 189 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::writeBatch().

boost::scoped_array<uint> LcsClusterAppendExecStream::maxValueSize [protected, inherited]

Maximum value size for each cluster column, used by writeBatch().

Definition at line 194 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::writeBatch().

bool LcsClusterAppendExecStream::arraysAlloced [protected, inherited]

Indicates whether or not we have already allocated arrays.

Definition at line 199 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::allocArrays(), and LcsClusterAppendExecStream::init().

PLcsClusterNode LcsClusterAppendExecStream::pIndexBlock [protected, inherited]

Buffer pointing to cluster page that will actually be written.

Definition at line 204 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().

RecordNum LcsClusterAppendExecStream::numRowCompressed [protected, inherited]

Total number of rows loaded by this object.

Definition at line 209 of file LcsClusterAppendExecStream.h.

Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::postProcessTuple(), and LcsClusterAppendExecStream::prepare().

BTreeDescriptor BTreeExecStream::treeDescriptor [protected, inherited]

Definition at line 113 of file BTreeExecStream.h.

Referenced by BTreeInsertExecStream::buildTree(), BTreeExecStream::closeImpl(), BTreeExecStream::copyParamsToDescriptor(), getTupleForLoad(), LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), BTreePrefetchSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), open(), BTreeSearchExecStream::open(), BTreePrefetchSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeExecStream::open(), prepare(), LcsClusterAppendExecStream::prepare(), LbmSearchExecStream::prepare(), LbmGeneratorExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), BTreeExecStream::prepare(), and BTreeInsertExecStream::truncateTree().

BTreeOwnerRootMap* BTreeExecStream::pRootMap [protected, inherited]

Definition at line 115 of file BTreeExecStream.h.

Referenced by BTreeExecStream::closeImpl(), BTreeExecStream::open(), and BTreeExecStream::prepare().

SharedBTreeAccessBase BTreeExecStream::pBTreeAccessBase [protected, inherited]

Definition at line 116 of file BTreeExecStream.h.

Referenced by BTreeInsertExecStream::closeImpl(), BTreeExecStream::closeImpl(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), and BTreeExecStream::open().

SharedBTreeReader BTreeExecStream::pBTreeReader [protected, inherited]

Definition at line 117 of file BTreeExecStream.h.

Referenced by BTreeExecStream::endSearch(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), and BTreeExecStream::newWriter().

DynamicParamId BTreeExecStream::rootPageIdParamId [protected, inherited]

Definition at line 118 of file BTreeExecStream.h.

Referenced by BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeInsertExecStream::prepare(), and BTreeExecStream::prepare().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), 
UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().)

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction embracing the stream.

Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. (Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().)

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().

SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited]

Definition at line 51 of file SingleInputExecStream.h.

Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), 
CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:37 2009 for Fennel by  doxygen 1.5.1