#include <LcsClusterReplaceExecStream.h>
Inheritance diagram for LcsClusterReplaceExecStream:
Public Member Functions | |
virtual void | prepare (LcsClusterReplaceExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | prepare (LcsClusterAppendExecStreamParams const &params) |
virtual void | prepare (BTreeExecStreamParams const &params) |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | prepare (ConduitExecStreamParams const &params) |
virtual void | prepare (SingleInputExecStreamParams const &params) |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | closeImpl () |
Implements ClosableObject. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
bool | isClosed () const |
| |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Static Public Member Functions | |
static SharedBTreeWriter | newWriter (BTreeExecStreamParams const &params) |
static void | copyParamsToDescriptor (BTreeDescriptor &, BTreeParams const &, SharedCacheAccessor const &) |
Protected Member Functions | |
void | allocArrays () |
Allocate memory for arrays. | |
void | initLoad () |
Initializes the load. | |
void | loadExistingBlock () |
Populates row and hash arrays from existing index block. | |
void | startNewBlock () |
Prepare to write a fresh block. | |
void | convertTuplesToCols () |
Given a TupleData representing all columns in a cluster, converts each column into its own TupleData. | |
void | addValueOrdinal (uint column, uint16_t vOrd) |
Adds value ordinal to row array for new row. | |
bool | isRowArrayFull () |
True if row array is full. | |
void | writeBatch (bool lastBatch) |
Writes a batch (run) to the index block. | |
void | writeBlock () |
Writes block to index when the block is full or this is the last block in the load. | |
bool | getLastBlock (PLcsClusterNode &pBlock) |
Gets last block written to disk so we can append to it, reading in the first rid value stored on the page. | |
void | init () |
Initializes and sets up object with content specific to the load that will be carried out. | |
ExecStreamResult | compress (ExecStreamQuantum const &quantum) |
Processes rows for loading. | |
virtual SharedBTreeReader | newReader () |
SharedBTreeWriter | newWriter (bool monotonic=false) |
virtual void | endSearch () |
Forgets the current reader or writer's search, releasing any page locks. | |
ExecStreamResult | precheckConduitBuffers () |
Checks the state of the input and output buffers. | |
Protected Attributes | |
uint | blockSize |
Space available on page blocks for writing cluster data. | |
TupleDescriptor | tableColsTupleDesc |
Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of. | |
TupleData | clusterColsTupleData |
Tuple data for the tuple datums representing only this cluster. | |
TupleDescriptor | clusterColsTupleDesc |
Tuple descriptors for the columns that are part of this cluster. | |
boost::scoped_array< TupleDescriptor > | colTupleDesc |
Individual tuple descriptors for each column in the cluster. | |
SegmentAccessor | scratchAccessor |
Scratch accessor for allocating large buffer pages. | |
ClusterPageLock | bufferLock |
Lock on scratch page. | |
bool | overwrite |
True if overwriting all existing data. | |
bool | isDone |
Whether row count has been produced. | |
TupleData | outputTuple |
Output tuple containing count of number of rows loaded. | |
TupleAccessor * | outputTupleAccessor |
A reference to the output accessor contained in SingleOutputExecStream::pOutAccessor. | |
boost::scoped_array< FixedBuffer > | outputTupleBuffer |
Buffer holding the outputTuple to provide to the consumers. | |
bool | compressCalled |
True if execute has been called at least once. | |
boost::scoped_array< LcsHash > | hash |
Array of hashes, one per cluster column. | |
uint | numColumns |
Number of columns in the cluster. | |
boost::scoped_array< PBuffer > | rowBlock |
Array of temporary blocks for row array. | |
uint | nRowsMax |
Maximum number of values that can be stored in rowBlock. | |
boost::scoped_array< PBuffer > | hashBlock |
Array of temporary blocks for hash table. | |
boost::scoped_array< PBuffer > | builderBlock |
Array of temporary blocks used by ClusterNodeWriter. | |
uint | rowCnt |
Number of rows loaded into the current set of batches. | |
bool | indexBlockDirty |
True if index blocks need to be written to disk. | |
LcsRid | firstRow |
Starting rowid in a cluster page. | |
LcsRid | lastRow |
Last rowid in the last batch. | |
LcsRid | startRow |
SharedLcsClusterNodeWriter | lcsBlockBuilder |
Page builder object. | |
boost::scoped_array< LcsHashValOrd > | hashValOrd |
Row value ordinal returned from hash, one per cluster column. | |
boost::scoped_array< boost::scoped_array< FixedBuffer > > | tempBuf |
Temporary buffers used by writeBatch(). | |
boost::scoped_array< uint > | maxValueSize |
Maximum value size for each cluster column, used by writeBatch(). | |
bool | arraysAlloced |
Indicates whether or not the arrays have already been allocated. | |
PLcsClusterNode | pIndexBlock |
Buffer pointing to cluster page that will actually be written. | |
RecordNum | numRowCompressed |
Total number of rows loaded by this object. | |
BTreeDescriptor | treeDescriptor |
BTreeOwnerRootMap * | pRootMap |
SharedBTreeAccessBase | pBTreeAccessBase |
SharedBTreeReader | pBTreeReader |
DynamicParamId | rootPageIdParamId |
SharedExecStreamBufAccessor | pOutAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
SharedExecStreamBufAccessor | pInAccessor |
Private Member Functions | |
virtual void | initTupleLoadParams (const TupleProjection &inputProj) |
Initializes member fields corresponding to the data to be loaded, taking into account the extra rid column that identifies each input tuple. | |
virtual ExecStreamResult | getTupleForLoad () |
Retrieves the tuple that will be loaded into the cluster. | |
virtual void | postProcessTuple () |
Performs post-processing after a tuple has been loaded. | |
void | readOrigClusterRow () |
Reads the cluster columns for the current row being loaded from the original cluster. | |
virtual void | close () |
Writes out the last pending batches and btree pages. | |
Private Attributes | |
DynamicParamId | newClusterRootParamId |
Dynamic parameter id corresponding to the root pageId of the new cluster, if the pageId is required downstream. | |
TupleDescriptor | projInputTupleDesc |
Tuple descriptor representing the rid column plus the cluster columns to be loaded. | |
TupleData | projInputTupleData |
Tuple data for the projected input tuple. | |
std::vector< UnalignedAttributeAccessor > | attrAccessors |
Accessors for loading column values from the original cluster. | |
SnapshotRandomAllocationSegment * | pSnapshotSegment |
The underlying snapshot segment for the cluster. | |
SharedLcsClusterReader | pOrigClusterReader |
Reader for the original cluster. | |
RecordNum | origNumRows |
Number of rows in the original cluster. | |
PageId | origRootPageId |
The rootPageId of the original rid to pageId btree map. | |
LcsRid | currLoadRid |
The current rid being loaded. | |
bool | needTuple |
True if a new tuple needs to be provided for the load. | |
LcsRid | currInputRid |
The rid value of the last input row read. | |
TupleProjectionAccessor | clusterColsTupleAccessor |
Accessor for projecting cluster tuple data from the input row. | |
TupleDataWithBuffer | origClusterTupleData |
TupleData used to load column values from the original cluster. | |
bool | newData |
True if at least one existing row is being replaced with a new value. |
Each tuple contains in its first column a rid value that identifies which row will be replaced. If there are gaps in the rid sequence, then the row corresponding to that gap will be replaced with a tuple that has the same values as the existing tuple in the original cluster at that same rid position.
After processing all input, the rid to cluster pageId btree map corresponding to the cluster is versioned off of the original btree's rootPageId. So, this execution stream requires the underlying segment corresponding to the cluster to be a snapshot segment.
Definition at line 53 of file LcsClusterReplaceExecStream.h.
void LcsClusterReplaceExecStream::initTupleLoadParams | ( | const TupleProjection & | inputProj | ) | [private, virtual] |
Initializes member fields corresponding to the data to be loaded, taking into account the extra rid column that identifies each input tuple.
inputProj | projection of the input tuple that's relevant to this cluster append |
Reimplemented from LcsClusterAppendExecStream.
Definition at line 42 of file LcsClusterReplaceExecStream.cpp.
References attrAccessors, TupleProjectionAccessor::bind(), clusterColsTupleAccessor, LcsClusterAppendExecStream::clusterColsTupleData, LcsClusterAppendExecStream::clusterColsTupleDesc, LcsClusterAppendExecStream::colTupleDesc, TupleData::compute(), TupleDataWithBuffer::computeAndAllocate(), LcsClusterAppendExecStream::numColumns, origClusterTupleData, SingleInputExecStream::pInAccessor, pOrigClusterReader, TupleDescriptor::projectFrom(), projInputTupleData, projInputTupleDesc, LcsClusterAppendExecStream::tableColsTupleDesc, and BTreeExecStream::treeDescriptor.
00044 { 00045 numColumns = inputProj.size() - 1; 00046 00047 projInputTupleDesc.projectFrom(tableColsTupleDesc, inputProj); 00048 projInputTupleData.compute(projInputTupleDesc); 00049 00050 // Setup the cluster reader to read all columns from the original cluster 00051 // without any pre-fetch 00052 // 00053 // TODO - Extend this class to use pre-fetches when reading from the 00054 // original cluster. This will require reading ahead from the input 00055 // stream to detect gaps in the rid values and then setting up rid runs 00056 // for each block of missing rids. 00057 pOrigClusterReader = 00058 SharedLcsClusterReader(new LcsClusterReader(treeDescriptor)); 00059 TupleProjection proj; 00060 proj.resize(numColumns); 00061 for (uint i = 0; i < numColumns; i++) { 00062 proj[i] = i; 00063 } 00064 pOrigClusterReader->initColumnReaders(numColumns, proj); 00065 00066 // Setup the objects for accessing just the cluster columns by excluding 00067 // the rid column 00068 std::copy(inputProj.begin() + 1, inputProj.end(), proj.begin()); 00069 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor(); 00070 clusterColsTupleAccessor.bind(inputAccessor, proj); 00071 clusterColsTupleDesc.projectFrom(pInAccessor->getTupleDesc(), proj); 00072 clusterColsTupleData.compute(clusterColsTupleDesc); 00073 00074 attrAccessors.resize(clusterColsTupleDesc.size()); 00075 for (uint i = 0; i < clusterColsTupleDesc.size(); i++) { 00076 attrAccessors[i].compute(clusterColsTupleDesc[i]); 00077 } 00078 00079 origClusterTupleData.computeAndAllocate(clusterColsTupleDesc); 00080 00081 // setup one tuple descriptor per cluster column 00082 colTupleDesc.reset(new TupleDescriptor[numColumns]); 00083 for (int i = 0; i < numColumns; i++) { 00084 // +1 to skip over the rid column 00085 colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i + 1]]); 00086 } 00087 }
ExecStreamResult LcsClusterReplaceExecStream::getTupleForLoad | ( | ) | [private, virtual] |
Retrieves the tuple that will be loaded into the cluster.
The tuple either originates from the input stream or contains the original values at the current rid position being loaded.
Reimplemented from LcsClusterAppendExecStream.
Definition at line 148 of file LcsClusterReplaceExecStream.cpp.
References clusterColsTupleAccessor, LcsClusterAppendExecStream::clusterColsTupleData, BTreeBuilder::createEmptyRoot(), currInputRid, currLoadRid, EXECBUF_EOS, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, BTreeAccessBase::getRootPageId(), LcsClusterAppendExecStream::initLoad(), needTuple, newData, NULL_PAGE_ID, opaqueToInt(), origNumRows, origRootPageId, SingleInputExecStream::pInAccessor, projInputTupleData, SegmentAccessor::pSegment, pSnapshotSegment, readOrigClusterRow(), BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeExecStream::treeDescriptor, TupleProjectionAccessor::unmarshal(), and SnapshotRandomAllocationSegment::versionPage().
00149 { 00150 // If the last tuple provided has not been processed yet, then there's no 00151 // work to be done 00152 if (!needTuple) { 00153 return EXECRC_YIELD; 00154 } 00155 00156 if (pInAccessor->getState() == EXECBUF_EOS) { 00157 // No more input rows, but that doesn't mean we're finished because 00158 // we have to match the number of rows in the original cluster. 00159 // Therefore, if there's a gap at the end of the cluster, read the 00160 // original rows until we read the rid corresponding to the last tuple 00161 // tuple in the original cluster, at which point, we can finally 00162 // say that we're done. However, if there wasn't at least one new 00163 // row, then there's no need to replace the column. We can simply 00164 // keep the original. 00165 if (!newData) { 00166 return EXECRC_EOS; 00167 } 00168 if (opaqueToInt(currLoadRid) < origNumRows) { 00169 readOrigClusterRow(); 00170 needTuple = false; 00171 // in case this wasn't already called 00172 initLoad(); 00173 return EXECRC_YIELD; 00174 } else { 00175 pSnapshotSegment->versionPage( 00176 origRootPageId, 00177 treeDescriptor.rootPageId); 00178 return EXECRC_EOS; 00179 } 00180 } 00181 00182 if (!pInAccessor->demandData()) { 00183 return EXECRC_BUF_UNDERFLOW; 00184 } 00185 00186 // Create a new rid to pageId btree map for this cluster, once we know 00187 // at least one row is being updated 00188 if (!newData) { 00189 treeDescriptor.rootPageId = NULL_PAGE_ID; 00190 BTreeBuilder builder( 00191 treeDescriptor, 00192 treeDescriptor.segmentAccessor.pSegment); 00193 builder.createEmptyRoot(); 00194 treeDescriptor.rootPageId = builder.getRootPageId(); 00195 newData = true; 00196 } 00197 00198 initLoad(); 00199 00200 if (currLoadRid == LcsRid(0) || currLoadRid > currInputRid) { 00201 assert(!pInAccessor->isTupleConsumptionPending()); 00202 pInAccessor->unmarshalProjectedTuple(projInputTupleData); 00203 currInputRid = 00204 *reinterpret_cast<LcsRid const *> (projInputTupleData[0].pData); 00205 } 00206 
00207 // If there's a gap between the last input tuple read and the 00208 // current row that needs to be loaded, then read the original 00209 // cluster data; otherwise, unmarshal the last input row read. 00210 if (currInputRid > currLoadRid) { 00211 readOrigClusterRow(); 00212 } else { 00213 assert(currInputRid == currLoadRid); 00214 clusterColsTupleAccessor.unmarshal(clusterColsTupleData); 00215 } 00216 00217 needTuple = false; 00218 return EXECRC_YIELD; 00219 }
void LcsClusterReplaceExecStream::postProcessTuple | ( | ) | [private, virtual] |
Performs post-processing after a tuple has been loaded.
Reimplemented from LcsClusterAppendExecStream.
Definition at line 247 of file LcsClusterReplaceExecStream.cpp.
References currInputRid, currLoadRid, needTuple, and LcsClusterAppendExecStream::postProcessTuple().
00248 { 00249 // Consume the current input tuple if we've completed processing that 00250 // input. 00251 if (currInputRid == currLoadRid) { 00252 LcsClusterAppendExecStream::postProcessTuple(); 00253 } 00254 currLoadRid++; 00255 needTuple = true; 00256 }
void LcsClusterReplaceExecStream::readOrigClusterRow | ( | ) | [private] |
Reads the cluster columns for the current row being loaded from the original cluster.
Definition at line 221 of file LcsClusterReplaceExecStream.cpp.
References attrAccessors, LcsClusterAppendExecStream::clusterColsTupleData, currLoadRid, origClusterTupleData, pOrigClusterReader, and TupleDataWithBuffer::resetBuffer().
Referenced by getTupleForLoad().
00222 { 00223 origClusterTupleData.resetBuffer(); 00224 00225 // Position to the current rid we want to load. Then read each of the 00226 // column values, load them into the TupleDataWithBuffer, and then copy 00227 // those TupleDatum's into the TupleData that's used to load the 00228 // cluster. 00229 bool needSync = true; 00230 if (pOrigClusterReader->isPositioned() && 00231 currLoadRid < pOrigClusterReader->getRangeEndRid()) 00232 { 00233 needSync = false; 00234 } 00235 bool rc = pOrigClusterReader->position(currLoadRid); 00236 assert(rc); 00237 for (uint i = 0; i < pOrigClusterReader->nColsToRead; i++) { 00238 if (needSync) { 00239 pOrigClusterReader->clusterCols[i].sync(); 00240 } 00241 PBuffer colValue = pOrigClusterReader->clusterCols[i].getCurrentValue(); 00242 attrAccessors[i].loadValue(origClusterTupleData[i], colValue); 00243 clusterColsTupleData[i] = origClusterTupleData[i]; 00244 } 00245 }
void LcsClusterReplaceExecStream::close | ( | ) | [private, virtual] |
Writes out the last pending batches and btree pages.
Deallocates temporary memory and buffer pages. Allows resources to be freed before the execution stream is actually closed.
Reimplemented from LcsClusterAppendExecStream.
Definition at line 258 of file LcsClusterReplaceExecStream.cpp.
References LcsClusterAppendExecStream::close(), and pOrigClusterReader.
00259 { 00260 LcsClusterAppendExecStream::close(); 00261 pOrigClusterReader->close(); 00262 }
void LcsClusterReplaceExecStream::prepare | ( | LcsClusterReplaceExecStreamParams const & | params | ) | [virtual] |
Definition at line 31 of file LcsClusterReplaceExecStream.cpp.
References newClusterRootParamId, origRootPageId, LcsClusterAppendExecStream::prepare(), BTreeDescriptor::rootPageId, and BTreeExecStream::treeDescriptor.
00033 { 00034 LcsClusterAppendExecStream::prepare(params); 00035 newClusterRootParamId = params.rootPageIdParamId; 00036 00037 // Save the original root pageId at prepare time because the treeDescriptor 00038 // will be reset at open time with the new cluster's rootPageId 00039 origRootPageId = treeDescriptor.rootPageId; 00040 }
void LcsClusterReplaceExecStream::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from LcsClusterAppendExecStream.
Definition at line 105 of file LcsClusterReplaceExecStream.cpp.
References TupleDatum::cbData, currInputRid, currLoadRid, SegmentFactory::getSnapshotSegment(), MAXU, needTuple, newClusterRootParamId, newData, opaqueToInt(), LcsClusterAppendExecStream::open(), origNumRows, TupleDatum::pData, ExecStream::pDynamicParamManager, SingleInputExecStream::pInAccessor, pOrigClusterReader, SegmentAccessor::pSegment, pSnapshotSegment, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, and BTreeExecStream::treeDescriptor.
00106 { 00107 newData = false; 00108 00109 // Need to call this after the setup above because the cluster append 00110 // stream depends on the new cluster being in place 00111 LcsClusterAppendExecStream::open(restart); 00112 00113 // Determine how many rows are in the original cluster 00114 origNumRows = pOrigClusterReader->getNumRows(); 00115 00116 if (!restart) { 00117 // Save the root pageId in a dynamic parameter so it can be read 00118 // downstream, if a parameter is specified 00119 if (opaqueToInt(newClusterRootParamId) > 0) { 00120 pDynamicParamManager->createParam( 00121 newClusterRootParamId, 00122 pInAccessor->getTupleDesc()[0]); 00123 } 00124 00125 // Retrieve the snapshot segment. This needs to be done at open time 00126 // because the segment changes across transaction boundaries. 00127 pSnapshotSegment = 00128 SegmentFactory::getSnapshotSegment( 00129 treeDescriptor.segmentAccessor.pSegment); 00130 assert(pSnapshotSegment != NULL); 00131 } 00132 00133 if (opaqueToInt(newClusterRootParamId) > 0) { 00134 TupleDatum rootPageIdDatum; 00135 rootPageIdDatum.pData = (PConstBuffer) &(treeDescriptor.rootPageId); 00136 rootPageIdDatum.cbData = sizeof(treeDescriptor.rootPageId); 00137 pDynamicParamManager->writeParam( 00138 newClusterRootParamId, 00139 rootPageIdDatum); 00140 } 00141 00142 pOrigClusterReader->open(); 00143 currLoadRid = LcsRid(0); 00144 currInputRid = LcsRid(MAXU); 00145 needTuple = true; 00146 }
void LcsClusterReplaceExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual] |
Reimplemented from LcsClusterAppendExecStream.
Definition at line 89 of file LcsClusterReplaceExecStream.cpp.
References LcsClusterAppendExecStream::getResourceRequirements(), and ExecStreamResourceQuantity::nCachePages.
00092 { 00093 LcsClusterAppendExecStream::getResourceRequirements( 00094 minQuantity, 00095 optQuantity); 00096 00097 // Need to allocate two more pages for the cluster reader that reads 00098 // original cluster values -- one for the rid to pageId btree and another 00099 // for the actual cluster page. 00100 minQuantity.nCachePages += 2; 00101 00102 optQuantity = minQuantity; 00103 }
void LcsClusterAppendExecStream::allocArrays | ( | ) | [protected, inherited] |
Allocate memory for arrays.
Definition at line 714 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::arraysAlloced, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::hashValOrd, LcsClusterAppendExecStream::maxValueSize, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::rowBlock, and LcsClusterAppendExecStream::tempBuf.
Referenced by LcsClusterAppendExecStream::initLoad().
00715 { 00716 // allocate arrays only if they have not been allocated already 00717 if (arraysAlloced) { 00718 return; 00719 } 00720 arraysAlloced = true; 00721 00722 // instantiate hashes 00723 hash.reset(new LcsHash[numColumns]); 00724 00725 // allocate pointers for row, hash blocks, other arrays 00726 rowBlock.reset(new PBuffer[numColumns]); 00727 hashBlock.reset(new PBuffer[numColumns]); 00728 00729 builderBlock.reset(new PBuffer[numColumns]); 00730 00731 hashValOrd.reset(new LcsHashValOrd[numColumns]); 00732 tempBuf.reset(new boost::scoped_array<FixedBuffer>[numColumns]); 00733 maxValueSize.reset(new uint[numColumns]); 00734 }
void LcsClusterAppendExecStream::initLoad | ( | ) | [protected, inherited] |
Initializes the load.
This method should only be called when the input stream has data available to read.
Definition at line 285 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::allocArrays(), SegNodeLock< Node >::allocatePage(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::bufferLock, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::clusterColsTupleDesc, LcsClusterAppendExecStream::colTupleDesc, LcsClusterAppendExecStream::compressCalled, LcsClusterAppendExecStream::getLastBlock(), SegPageLock::getPage(), TraceSource::getSharedTraceTarget(), TraceSource::getTraceSourceName(), CachePage::getWritableData(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::nRowsMax, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowBlock, LcsClusterAppendExecStream::scratchAccessor, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::startRow, BTreeExecStream::treeDescriptor, and SegPageLock::unlock().
Referenced by getTupleForLoad(), and LcsClusterAppendExecStream::getTupleForLoad().
00286 { 00287 // If this is the first time this method is called, then 00288 // start a new block (for the new table), or load the last block 00289 // (of the existing table). We do this here rather than in 00290 // init() because for INSERT into T as SELECT * from T, 00291 // we need to make sure that we extract all the data from 00292 // T before modifying the blocks there; hence that's why this 00293 // method should not be called until there is input available. 00294 // Use the boolean to ensure that initialization of cluster page 00295 // is only done once. 00296 00297 if (!compressCalled) { 00298 compressCalled = true; 00299 00300 // The dynamic allocated memory in lcsBlockBuilder is allocated for 00301 // every LcsClusterAppendExecStream.open() and deallocated for every 00302 // LcsClusterAppendExecStream.closeImpl(). The dynamic memory is not 00303 // reused across calls(e.g. when issueing the same statement twice). 00304 lcsBlockBuilder = SharedLcsClusterNodeWriter( 00305 new LcsClusterNodeWriter( 00306 treeDescriptor, 00307 scratchAccessor, 00308 clusterColsTupleDesc, 00309 getSharedTraceTarget(), 00310 getTraceSourceName())); 00311 00312 allocArrays(); 00313 00314 // get blocks from cache to use as temporary space and initialize arrays 00315 for (uint i = 0; i < numColumns; i++) { 00316 bufferLock.allocatePage(); 00317 rowBlock[i] = bufferLock.getPage().getWritableData(); 00318 bufferLock.unlock(); 00319 00320 bufferLock.allocatePage(); 00321 hashBlock[i] = bufferLock.getPage().getWritableData(); 00322 bufferLock.unlock(); 00323 00324 bufferLock.allocatePage(); 00325 builderBlock[i] = bufferLock.getPage().getWritableData(); 00326 bufferLock.unlock(); 00327 00328 hash[i].init( 00329 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00330 } 00331 00332 nRowsMax = blockSize / sizeof(uint16_t); 00333 00334 // if the index exists, get last block written 00335 00336 PLcsClusterNode pExistingIndexBlock; 00337 00338 bool found = 
getLastBlock(pExistingIndexBlock); 00339 if (found) { 00340 // indicate we are updating a leaf 00341 pIndexBlock = pExistingIndexBlock; 00342 00343 // extract rows and values from last batch so we can 00344 // add to it. 00345 loadExistingBlock(); 00346 } else { 00347 // Start writing a new block 00348 startNewBlock(); 00349 startRow = LcsRid(0); 00350 } 00351 } 00352 }
void LcsClusterAppendExecStream::loadExistingBlock | ( | ) | [protected, inherited] |
Populates row and hash arrays from existing index block.
Definition at line 418 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::firstRow, FixedBuffer, LcsHashValOrd::getValOrd(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::startRow, and LcsClusterAppendExecStream::writeBlock().
Referenced by LcsClusterAppendExecStream::initLoad().
00419 { 00420 boost::scoped_array<uint> numVals; // number of values in block 00421 boost::scoped_array<uint16_t> lastValOff; 00422 boost::scoped_array<boost::scoped_array<FixedBuffer> > aLeftOverBufs; 00423 // array of buffers to hold 00424 // rolled back data for each 00425 // column 00426 uint anLeftOvers; // number of leftover rows for 00427 // each col; since the value is 00428 // same for every column, no need 00429 // for this to be an array 00430 boost::scoped_array<uint> aiFixedSize; // how much space was used for 00431 // each column; should be 00432 // equal for each value in a column 00433 LcsHashValOrd vOrd; 00434 00435 uint i, j; 00436 RecordNum startRowCnt; 00437 RecordNum nrows; 00438 00439 lcsBlockBuilder->init( 00440 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00441 builderBlock.get(), blockSize); 00442 00443 lastValOff.reset(new uint16_t[numColumns]); 00444 numVals.reset(new uint[numColumns]); 00445 00446 // REVIEW jvs 28-Nov-2005: A simpler approach to this whole problem 00447 // might be to pretend we were starting an entirely new block, 00448 // use an LcsClusterReader to read the old one logically and append 00449 // the old rows into the new block, and then carry on from there with 00450 // the new rows. 00451 00452 // Append to an existing cluster page. Set the last rowid based on 00453 // the first rowid and the number of rows currently on the page. 
00454 // As rows are "rolled back", lastRow is decremented accordingly 00455 00456 bool bStartNewBlock = 00457 lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows); 00458 lastRow = firstRow + nrows; 00459 startRow = lastRow; 00460 00461 // Setup structures to hold rolled back information 00462 aiFixedSize.reset(new uint[numColumns]); 00463 aLeftOverBufs.reset(new boost::scoped_array<FixedBuffer>[numColumns]); 00464 00465 startRowCnt = rowCnt; 00466 00467 // Rollback the final batch for each column 00468 // We need to rollback all 00469 // the batches before we can start the new batches because 00470 // 1) in openAppend() we adjust m_szLeft to not include space 00471 // for numColumns * sizeof(RIBatch). So if the 00472 // block was full, then m_szLeft would be negative, 00473 // since we decreased it by numColumns * sizeof(LcsBatch) 00474 // 2) the rollback code will add sizeof(LcsBatch) to szLeft 00475 // for each batch it rolls back 00476 // 3) the code to add values to a batch gets upset if 00477 // szLeft < 0 00478 for (i = 0; i < numColumns; i++) { 00479 //reset everytime through loop 00480 rowCnt = startRowCnt; 00481 lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]); 00482 00483 // if we have left overs from the last batch (ie. batch did not end on 00484 // an 8 boundary), rollback and store in temporary mem 00485 // aLeftOverBufs[i] 00486 if (anLeftOvers > 0) { 00487 aLeftOverBufs[i].reset( 00488 new FixedBuffer[anLeftOvers * aiFixedSize[i]]); 00489 lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get()); 00490 indexBlockDirty = true; 00491 } 00492 } 00493 00494 // Decrement lastRow if there was a rollback of the last batch 00495 lastRow -= anLeftOvers; 00496 00497 // If the last page is already full, then write it out and start a new one 00498 if (bStartNewBlock) { 00499 writeBlock(); 00500 startNewBlock(); 00501 } 00502 00503 // Start a new batch for each column. 
00504 for (i = 0; i < numColumns; i++) { 00505 //reset everytime through loop 00506 rowCnt = startRowCnt; 00507 00508 if (!bStartNewBlock) { 00509 // Repopulate the hash table with the values already in the 00510 // data segment at the bottom of the block (because we didn't 00511 // roll back these values, we only roll back the pointers to 00512 // these values). But only do this if we haven't started a new 00513 // block. 00514 hash[i].restore(numVals[i], lastValOff[i]); 00515 } 00516 00517 // if we had left overs from the last batch, start a new batch 00518 // NOTE: we are guaranteed to be able to add these values back 00519 // to the current block 00520 if (anLeftOvers > 0) { 00521 uint8_t *val; 00522 bool undoInsert = false; 00523 00524 // There is a very small probability that when the hash is 00525 // restored, using the existing values in the block, that the 00526 // hash will be full and some of the left over values 00527 // can not be stored in the hash. If this is true then clear 00528 // the hash. 00529 if (hash[i].isHashFull(anLeftOvers)) { 00530 hash[i].startNewBatch(anLeftOvers); 00531 } 00532 00533 for (j = 0, val = aLeftOverBufs[i].get(); 00534 j < anLeftOvers; 00535 j++, val += aiFixedSize[i]) 00536 { 00537 hash[i].insert(val, &vOrd, &undoInsert); 00538 00539 //If we have left overs they should fit in the block 00540 assert(!undoInsert); 00541 addValueOrdinal(i, vOrd.getValOrd()); 00542 rowCnt++; 00543 } 00544 } 00545 } 00546 }
void LcsClusterAppendExecStream::startNewBlock | ( | ) | [protected, inherited] |
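Prepares to write a fresh block: allocates a new cluster page starting at lastRow, resets the block builder and the per-column hashes, and opens the new block for writing.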
Definition at line 378 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::builderBlock, LcsClusterAppendExecStream::colTupleDesc, LcsClusterAppendExecStream::firstRow, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashBlock, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::pIndexBlock, and LcsClusterAppendExecStream::rowCnt.
Referenced by LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().
00379 { 00380 firstRow = lastRow; 00381 00382 // Get a new cluster page from the btree segment 00383 pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow); 00384 00385 // Reset index block and block builder. 00386 lcsBlockBuilder->init( 00387 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00388 builderBlock.get(), blockSize); 00389 00390 // reset Hashes 00391 for (uint i = 0; i < numColumns; i++) { 00392 hash[i].init( 00393 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00394 } 00395 00396 // reset row count 00397 // NOTE: if the rowCnt is less than eight then we know we are carrying 00398 // over rows from previous block because the count did not end 00399 // on a boundary of 8 00400 if (rowCnt >= 8) { 00401 rowCnt = 0; 00402 } 00403 indexBlockDirty = false; 00404 00405 // Start writing a new block. 00406 lcsBlockBuilder->openNew(firstRow); 00407 }
void LcsClusterAppendExecStream::convertTuplesToCols | ( | ) | [protected, inherited] |
void LcsClusterAppendExecStream::addValueOrdinal | ( | uint | column, | |
uint16_t | vOrd | |||
) | [protected, inherited] |
Adds a value ordinal to the row array of the given column for the current row (rowCnt) and marks the index block as dirty.
Definition at line 548 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::rowBlock, and LcsClusterAppendExecStream::rowCnt.
Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().
00549 { 00550 uint16_t *rowWordArray = (uint16_t *) rowBlock[column]; 00551 rowWordArray[rowCnt] = vOrd; 00552 00553 // since we added a row mark block as dirty 00554 indexBlockDirty = true; 00555 }
bool LcsClusterAppendExecStream::isRowArrayFull | ( | ) | [protected, inherited] |
Returns true if the row array is full, i.e. rowCnt has reached nRowsMax.
Definition at line 557 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::nRowsMax, and LcsClusterAppendExecStream::rowCnt.
Referenced by LcsClusterAppendExecStream::compress().
00558 { 00559 if (rowCnt >= nRowsMax) { 00560 return true; 00561 } else { 00562 return false; 00563 } 00564 }
void LcsClusterAppendExecStream::writeBatch | ( | bool | lastBatch | ) | [protected, inherited] |
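Writes a batch (run) to the index block.
Except for the last batch, batches must end on a multiple of 8 rows; rows beyond the last 8-row boundary are carried over into the next batch.
lastBatch | true if this is the last batch of the load, in which case leftover rows are not carried over into a new batch |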
Definition at line 566 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::addValueOrdinal(), count(), FixedBuffer, LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::lastRow, LCS_FIXED, LCS_VARIABLE, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::maxValueSize, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::rowBlock, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::tempBuf, and LcsClusterAppendExecStream::writeBlock().
Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::writeBlock().
00567 { 00568 uint16_t *oVals; 00569 uint leftOvers; 00570 PBuffer val; 00571 LcsBatchMode mode; 00572 uint i, j; 00573 uint origRowCnt, count = 0; 00574 00575 lastRow += rowCnt; 00576 00577 for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) { 00578 rowCnt = origRowCnt; 00579 00580 // save max value size so we can read leftovers 00581 maxValueSize[i] = hash[i].getMaxValueSize(); 00582 00583 // Pick which compression mode to use (fixed, variable, or compressed) 00584 lcsBlockBuilder->pickCompressionMode( 00585 i, maxValueSize[i], rowCnt, &oVals, mode); 00586 leftOvers = rowCnt > 8 ? rowCnt % 8 : 0; 00587 00588 // all batches must end on an eight boundary so we move 00589 // values over eight boundary to the next batch. 00590 // if there are leftOvers or if the there are less than 00591 // eight values in this batch allocate buffer to store 00592 // values to be written to next batch 00593 if (leftOvers) { 00594 tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]); 00595 count = leftOvers; 00596 00597 } else if (origRowCnt < 8) { 00598 tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]); 00599 count = origRowCnt; 00600 } else { 00601 // no values to write to next batch (ie on boundary of 8) 00602 tempBuf[i].reset(); 00603 } 00604 00605 // Write out the batch and collect the leftover rows in tempBuf 00606 if (LCS_FIXED == mode || LCS_VARIABLE == mode) { 00607 hash[i].prepareFixedOrVariableBatch( 00608 (PBuffer) rowBlock[i], rowCnt); 00609 lcsBlockBuilder->putFixedVarBatch( 00610 i, (uint16_t *) rowBlock[i], tempBuf[i].get()); 00611 if (mode == LCS_FIXED) { 00612 hash[i].clearFixedEntries(); 00613 } 00614 00615 } else { 00616 uint16_t numVals; 00617 00618 // write orderVals to oVals and remap val ords in row array 00619 hash[i].prepareCompressedBatch( 00620 (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals); 00621 lcsBlockBuilder->putCompressedBatch( 00622 i, (PBuffer) rowBlock[i], tempBuf[i].get()); 00623 } 00624 00625 // setup next 
batch 00626 rowCnt = 0; 00627 hash[i].startNewBatch(!lastBatch ? count : 0); 00628 } 00629 00630 //compensate for left over and rolled back rows 00631 if (!lastBatch) { 00632 lastRow -= count; 00633 } 00634 bool bStartNewBlock; 00635 bStartNewBlock = false; 00636 00637 // If we couldn't even fit 8 values into the batch (and this is not the 00638 // final batch), then the block must be full. putCompressedBatch()/ 00639 // putFixedVarBatch() assumed that this was the last batch, so they wrote 00640 // out these rows in a small batch. Roll back the entire batch (putting 00641 // rolled back results in tempBuf) and move to next block 00642 if (!lastBatch && origRowCnt < 8) { 00643 // rollback each batch 00644 for (i = 0; i < numColumns; i++) { 00645 lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get()); 00646 } 00647 bStartNewBlock = true; 00648 } 00649 00650 // Should we move to a new block? Move if 00651 // (a) bStartNewBlock (we need to move just to write the current batch) 00652 // or (b) lcsBlockBuilder->isEndOfBlock() (there isn't room to even start 00653 // the next batch) 00654 if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) { 00655 writeBlock(); 00656 startNewBlock(); 00657 } 00658 00659 // Add leftOvers or rolled back values to new batch 00660 if (!lastBatch) { 00661 for (i = 0; i < numColumns; i++) { 00662 rowCnt = 0; 00663 for (j = 0, val = tempBuf[i].get(); j < count; j++) { 00664 LcsHashValOrd vOrd; 00665 bool undoInsert = false; 00666 00667 hash[i].insert(val, &vOrd, &undoInsert); 00668 00669 // If we have leftovers they should fit in the current block 00670 // (because we moved to a new block above, if it was necessary) 00671 assert(!undoInsert); 00672 addValueOrdinal(i, vOrd.getValOrd()); 00673 rowCnt++; 00674 val += maxValueSize[i]; 00675 } 00676 } 00677 } 00678 00679 for (i = 0; i < numColumns; i++) { 00680 if (tempBuf[i].get()) { 00681 tempBuf[i].reset(); 00682 } 00683 } 00684 }
void LcsClusterAppendExecStream::writeBlock | ( | ) | [protected, inherited] |
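Writes the current block to the index when the block is full or when it is the last block of the load. Does nothing unless the block is dirty; if a partial batch is pending (rowCnt is nonzero), it is first written with writeBatch(true).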
Definition at line 686 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lcsBlockBuilder, LcsClusterAppendExecStream::rowCnt, and LcsClusterAppendExecStream::writeBatch().
Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::writeBatch().
00687 { 00688 if (indexBlockDirty) { 00689 // If the rowCnt is not zero, then the last batch was not on 00690 // a boundary of 8 so we need to write the last batch 00691 if (rowCnt) { 00692 writeBatch(true); 00693 00694 // REVIEW jvs 28-Nov-2005: it must be possible to eliminate 00695 // this circularity between writeBlock and writeBatch. 00696 00697 // Handle corner case. writeBatch may have written this block 00698 // to the btree 00699 if (!indexBlockDirty) { 00700 return; 00701 } 00702 } 00703 00704 // Tell block builder we are done so it can wrap up writing to the 00705 // index block 00706 lcsBlockBuilder->endBlock(); 00707 00708 // Dump out the page contents to trace if appropriate 00709 00710 indexBlockDirty = false; 00711 } 00712 }
bool LcsClusterAppendExecStream::getLastBlock | ( | PLcsClusterNode & | pBlock | ) | [protected, inherited] |
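Gets the last block written to disk so that rows can be appended to it, reading the first rid value stored on that page into firstRow. Returns false if the block builder cannot supply a last cluster page for writing.
pBlock | receives a pointer to the last cluster block |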
Definition at line 409 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::firstRow, and LcsClusterAppendExecStream::lcsBlockBuilder.
Referenced by LcsClusterAppendExecStream::initLoad().
00410 { 00411 if (!lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) { 00412 return false; 00413 } else { 00414 return true; 00415 } 00416 }
void LcsClusterAppendExecStream::init | ( | ) | [protected, inherited] |
Initializes the per-load state (index block pointer, first/last/start row ids, row count, and status flags) before a load is carried out.
Definition at line 131 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::arraysAlloced, LcsClusterAppendExecStream::compressCalled, LcsClusterAppendExecStream::firstRow, LcsClusterAppendExecStream::indexBlockDirty, LcsClusterAppendExecStream::lastRow, LcsClusterAppendExecStream::numRowCompressed, LcsClusterAppendExecStream::pIndexBlock, LcsClusterAppendExecStream::rowCnt, and LcsClusterAppendExecStream::startRow.
Referenced by LcsClusterAppendExecStream::open(), and LcsClusterAppendExecStream::startNewBlock().
00132 { 00133 pIndexBlock = 0; 00134 firstRow = LcsRid(0); 00135 lastRow = LcsRid(0); 00136 startRow = LcsRid(0); 00137 rowCnt = 0; 00138 indexBlockDirty = false; 00139 arraysAlloced = false; 00140 compressCalled = false; 00141 numRowCompressed = 0; 00142 }
ExecStreamResult LcsClusterAppendExecStream::compress | ( | ExecStreamQuantum const & | quantum | ) | [protected, inherited] |
Processes rows for loading.
Calls writeBatch() once values can no longer fit into the current page or the row array is full.
quantum | governs the maximum number of tuples (nTuplesMax) processed in this call |
Definition at line 144 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::clusterColsTupleData, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, TupleAccessor::getCurrentByteCount(), LcsClusterAppendExecStream::getTupleForLoad(), LcsClusterAppendExecStream::hash, LcsClusterAppendExecStream::hashValOrd, LcsClusterAppendExecStream::isDone, LcsClusterAppendExecStream::isRowArrayFull(), LcsClusterAppendExecStream::lcsBlockBuilder, TupleAccessor::marshal(), ExecStreamQuantum::nTuplesMax, LcsClusterAppendExecStream::numColumns, LcsClusterAppendExecStream::outputTuple, LcsClusterAppendExecStream::outputTupleAccessor, LcsClusterAppendExecStream::outputTupleBuffer, LcsClusterAppendExecStream::postProcessTuple(), SingleOutputExecStream::pOutAccessor, LcsClusterAppendExecStream::rowCnt, LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().
Referenced by LcsClusterAppendExecStream::execute().
00146 { 00147 uint i, j, k; 00148 bool canFit = false; 00149 bool undoInsert = false; 00150 00151 if (isDone) { 00152 // already returned final result 00153 pOutAccessor->markEOS(); 00154 return EXECRC_EOS; 00155 } 00156 00157 for (i = 0; i < quantum.nTuplesMax; i++) { 00158 // if we have finished processing the previous row, retrieve 00159 // the next cluster tuple and then convert the columns in the 00160 // cluster into individual tuples, one per cluster column 00161 ExecStreamResult rc = getTupleForLoad(); 00162 00163 // no more input; produce final row count 00164 if (rc == EXECRC_EOS) { 00165 // since we're done adding rows to the index, write the last batch 00166 // and block 00167 if (rowCnt) { 00168 // if rowCnt < 8 or a multiple of 8, force writeBatch to 00169 // treat this as the last batch 00170 if (rowCnt < 8 || (rowCnt % 8) == 0) { 00171 writeBatch(true); 00172 } else { 00173 writeBatch(false); 00174 } 00175 } 00176 00177 // Write out the last block and then free up resources 00178 // rather than waiting until stream close. This will keep 00179 // resource usage window smaller and avoid interference with 00180 // downstream processing such as writing to unclustered indexes. 00181 writeBlock(); 00182 if (lcsBlockBuilder) { 00183 lcsBlockBuilder->close(); 00184 } 00185 close(); 00186 00187 // outputTuple was already initialized to point to numRowCompressed/ 00188 // startRow in prepare() 00189 // Write a single outputTuple(numRowCompressed, [startRow]) 00190 // and indicate OVERFLOW. 00191 00192 outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get()); 00193 pOutAccessor->provideBufferForConsumption( 00194 outputTupleBuffer.get(), 00195 outputTupleBuffer.get() + 00196 outputTupleAccessor->getCurrentByteCount()); 00197 00198 isDone = true; 00199 return EXECRC_BUF_OVERFLOW; 00200 } else if (rc != EXECRC_YIELD) { 00201 return rc; 00202 } 00203 00204 // Go through each column value for current row and insert it. 
00205 // If we run out of space then rollback all the columns that 00206 // I already inserted. 00207 undoInsert = false; 00208 00209 for (j = 0; j < numColumns; j++) { 00210 hash[j].insert( 00211 clusterColsTupleData[j], &hashValOrd[j], &undoInsert); 00212 00213 if (undoInsert) { 00214 // rollback cluster columns already inserted 00215 // j has not been incremented yet, so the condition should be 00216 // k <= j 00217 for (k = 0; k <= j; k++) { 00218 hash[k].undoInsert(clusterColsTupleData[k]); 00219 } 00220 break; 00221 } 00222 } 00223 00224 // Was there enough space to add this row? Note that the Insert() 00225 // calls above accounted for the space needed by addValueOrdinal() 00226 // below, so we don't have to worry about addValueOrdinal() running 00227 // out of space 00228 if (!undoInsert) { 00229 canFit = true; 00230 } else { 00231 canFit = false; 00232 } 00233 00234 if (canFit) { 00235 // Add the pointers from the batch to the data values 00236 for (j = 0; j < numColumns; j++) { 00237 addValueOrdinal(j, hashValOrd[j].getValOrd()); 00238 } 00239 00240 rowCnt++; 00241 00242 // if reach max rows that can fit in row array then write batch 00243 if (isRowArrayFull()) { 00244 writeBatch(false); 00245 } 00246 } else { 00247 // since we can't fit anymore values write out current batch 00248 writeBatch(false); 00249 00250 // restart using last value retrieved from stream because it 00251 // could not fit in the batch; by continuing we can avoid 00252 // a goto to jump back to the top of this for loop at the 00253 // expense of a harmless increment of the quantum 00254 continue; 00255 } 00256 00257 // only consume the tuple after we know the row can fit 00258 // on the current page 00259 postProcessTuple(); 00260 } 00261 00262 return EXECRC_QUANTUM_EXPIRED; 00263 }
void LcsClusterAppendExecStream::prepare | ( | LcsClusterAppendExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 31 of file LcsClusterAppendExecStream.cpp.
References SegPageLock::accessSegment(), LcsClusterAppendExecStream::blockSize, LcsClusterAppendExecStream::bufferLock, TupleData::compute(), LcsClusterAppendExecStream::initTupleLoadParams(), LcsClusterAppendExecStreamParams::inputProj, LcsClusterAppendExecStream::numRowCompressed, LcsClusterAppendExecStream::outputTuple, LcsClusterAppendExecStream::outputTupleAccessor, SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, ConduitExecStream::prepare(), BTreeExecStream::prepare(), SegmentAccessor::pSegment, ExecStreamParams::scratchAccessor, LcsClusterAppendExecStream::scratchAccessor, BTreeDescriptor::segmentAccessor, LcsClusterAppendExecStream::startRow, LcsClusterAppendExecStream::tableColsTupleDesc, and BTreeExecStream::treeDescriptor.
Referenced by prepare().
00033 { 00034 BTreeExecStream::prepare(params); 00035 ConduitExecStream::prepare(params); 00036 00037 tableColsTupleDesc = pInAccessor->getTupleDesc(); 00038 initTupleLoadParams(params.inputProj); 00039 00040 // setup descriptors, accessors and data to access only the columns 00041 // for this cluster, based on the input projection 00042 00043 pInAccessor->bindProjection(params.inputProj); 00044 00045 // setup bufferLock to access temporary large page blocks 00046 00047 scratchAccessor = params.scratchAccessor; 00048 bufferLock.accessSegment(scratchAccessor); 00049 00050 // The output stream from the loader is either a single column representing 00051 // the number of rows loaded or two columns -- number of rows loaded and 00052 // starting rid value. The latter applies when there are 00053 // downstream create indexes 00054 00055 TupleDescriptor outputTupleDesc; 00056 00057 outputTupleDesc = pOutAccessor->getTupleDesc(); 00058 outputTuple.compute(outputTupleDesc); 00059 outputTuple[0].pData = (PConstBuffer) &numRowCompressed; 00060 if (outputTupleDesc.size() > 1) { 00061 outputTuple[1].pData = (PConstBuffer) &startRow; 00062 } 00063 00064 outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor(); 00065 00066 blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(); 00067 00068 }
void BTreeExecStream::prepare | ( | BTreeExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 30 of file BTreeExecStream.cpp.
References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, SingleOutputExecStream::prepare(), BTreeParams::pRootMap, BTreeExecStream::pRootMap, BTreeParams::rootPageIdParamId, BTreeExecStream::rootPageIdParamId, ExecStreamParams::scratchAccessor, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.
Referenced by LcsClusterAppendExecStream::prepare(), LbmGeneratorExecStream::prepare(), BTreeReadExecStream::prepare(), and BTreeInsertExecStream::prepare().
00031 { 00032 SingleOutputExecStream::prepare(params); 00033 00034 copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor); 00035 scratchAccessor = params.scratchAccessor; 00036 pRootMap = params.pRootMap; 00037 rootPageIdParamId = params.rootPageIdParamId; 00038 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void ConduitExecStream::prepare | ( | ConduitExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 42 of file ConduitExecStream.cpp.
References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
Referenced by ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmNormalizerExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().
00043 { 00044 SingleInputExecStream::prepare(params); 00045 00046 if (params.outputTupleDesc.empty()) { 00047 pOutAccessor->setTupleShape( 00048 pInAccessor->getTupleDesc(), 00049 pInAccessor->getTupleFormat()); 00050 } 00051 00052 SingleOutputExecStream::prepare(params); 00053 }
void SingleInputExecStream::prepare | ( | SingleInputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 44 of file SingleInputExecStream.cpp.
References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().
Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().
00045 { 00046 ExecStream::prepare(params); 00047 00048 assert(pInAccessor); 00049 assert(pInAccessor->getProvision() == getInputBufProvision()); 00050 }
ExecStreamResult LcsClusterAppendExecStream::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual, inherited] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 117 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::compress().
00119 { 00120 return compress(quantum); 00121 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual, inherited] |
Determines resource requirements for this stream.
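Default implementation delegates to the two-argument overload (which, by default, declares zero resource requirements) and sets optType to EXEC_RESOURCE_ACCURATE.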
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 93 of file ExecStream.cpp.
References EXEC_RESOURCE_ACCURATE.
Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().
00097 { 00098 getResourceRequirements(minQuantity, optQuantity); 00099 optType = EXEC_RESOURCE_ACCURATE; 00100 }
void LcsClusterAppendExecStream::closeImpl | ( | ) | [virtual, inherited] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from BTreeExecStream.
Definition at line 123 of file LcsClusterAppendExecStream.cpp.
References LcsClusterAppendExecStream::close(), ExecStream::closeImpl(), BTreeExecStream::closeImpl(), and LcsClusterAppendExecStream::outputTupleBuffer.
00124 { 00125 BTreeExecStream::closeImpl(); 00126 ConduitExecStream::closeImpl(); 00127 outputTupleBuffer.reset(); 00128 close(); 00129 }
ExecStreamBufProvision LcsClusterAppendExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the buffer provision that this stream is capable of when producing tuples; this stream reports BUFPROV_PRODUCER.
Reimplemented from SingleOutputExecStream.
Definition at line 737 of file LcsClusterAppendExecStream.cpp.
References BUFPROV_PRODUCER.
00738 { 00739 return BUFPROV_PRODUCER; 00740 }
SharedBTreeReader BTreeExecStream::newReader | ( | ) | [protected, virtual, inherited] |
Reimplemented in BTreePrefetchSearchExecStream.
Definition at line 67 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, and BTreeExecStream::treeDescriptor.
Referenced by BTreeReadExecStream::open().
00068 { 00069 SharedBTreeReader pReader = SharedBTreeReader( 00070 new BTreeReader(treeDescriptor)); 00071 pBTreeAccessBase = pBTreeReader = pReader; 00072 return pReader; 00073 }
SharedBTreeWriter BTreeExecStream::newWriter | ( | bool | monotonic = false |
) | [protected, inherited] |
Definition at line 75 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.
Referenced by FtrsTableWriter::createIndexWriter(), and BTreeInsertExecStream::open().
00076 { 00077 SharedBTreeWriter pWriter = SharedBTreeWriter( 00078 new BTreeWriter(treeDescriptor,scratchAccessor,monotonic)); 00079 pBTreeAccessBase = pBTreeReader = pWriter; 00080 return pWriter; 00081 }
SharedBTreeWriter BTreeExecStream::newWriter | ( | BTreeExecStreamParams const & | params | ) | [static, inherited] |
Definition at line 83 of file BTreeExecStream.cpp.
References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, ExecStreamParams::scratchAccessor, and BTreeExecStream::treeDescriptor.
00085 { 00086 BTreeDescriptor treeDescriptor; 00087 copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor); 00088 return SharedBTreeWriter( 00089 new BTreeWriter( 00090 treeDescriptor,params.scratchAccessor)); 00091 }
void BTreeExecStream::endSearch | ( | ) | [protected, virtual, inherited] |
Forgets the current reader or writer's search, releasing any page locks.
Definition at line 107 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeReader.
Referenced by BTreeExecStream::closeImpl(), and BTreeExecStream::open().
00108 { 00109 if (pBTreeReader && pBTreeReader->isSingular() == false) { 00110 pBTreeReader->endSearch(); 00111 } 00112 }
void BTreeExecStream::copyParamsToDescriptor | ( | BTreeDescriptor & | , | |
BTreeParams const & | , | |||
SharedCacheAccessor const & | ||||
) | [static, inherited] |
Definition at line 93 of file BTreeExecStream.cpp.
References BTreeParams::keyProj, BTreeDescriptor::keyProjection, BTreeParams::pageOwnerId, BTreeDescriptor::pageOwnerId, SegmentAccessor::pCacheAccessor, BTreeParams::pSegment, SegmentAccessor::pSegment, BTreeParams::rootPageId, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeParams::segmentId, BTreeDescriptor::segmentId, BTreeExecStream::treeDescriptor, BTreeParams::tupleDesc, and BTreeDescriptor::tupleDescriptor.
Referenced by BTreeExecStream::newWriter(), LbmSplicerExecStream::prepare(), and BTreeExecStream::prepare().
00097 { 00098 treeDescriptor.segmentAccessor.pSegment = params.pSegment; 00099 treeDescriptor.segmentAccessor.pCacheAccessor = pCacheAccessor; 00100 treeDescriptor.tupleDescriptor = params.tupleDesc; 00101 treeDescriptor.keyProjection = params.keyProj; 00102 treeDescriptor.rootPageId = params.rootPageId; 00103 treeDescriptor.segmentId = params.segmentId; 00104 treeDescriptor.pageOwnerId = params.pageOwnerId; 00105 }
void SingleOutputExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Implements ExecStream.
Reimplemented in ConduitExecStream, and ConfluenceExecStream.
Definition at line 35 of file SingleOutputExecStream.cpp.
void ConduitExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleInputExecStream.
Definition at line 30 of file ConduitExecStream.cpp.
References SingleInputExecStream::setInputBufAccessors().
00032 { 00033 SingleInputExecStream::setInputBufAccessors(inAccessors); 00034 }
void SingleOutputExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Implements ExecStream.
Reimplemented in ConduitExecStream.
Definition at line 41 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::pOutAccessor.
Referenced by ConduitExecStream::setOutputBufAccessors().
00043 { 00044 assert(outAccessors.size() == 1); 00045 pOutAccessor = outAccessors[0]; 00046 }
void ConduitExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Reimplemented from SingleInputExecStream.
Definition at line 36 of file ConduitExecStream.cpp.
References SingleOutputExecStream::setOutputBufAccessors().
00038 { 00039 SingleOutputExecStream::setOutputBufAccessors(outAccessors); 00040 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual, inherited] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 111 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.
Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().
00113 { 00114 resourceAllocation = quantity; 00115 if (pQuotaAccessor) { 00116 pQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00117 } 00118 if (pScratchQuotaAccessor) { 00119 pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00120 } 00121 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
ExecStreamBufProvision ExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented in ConfluenceExecStream, DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferWriterExecStream, SingleInputExecStream, and JavaTransformExecStream.
Definition at line 182 of file ExecStream.cpp.
References BUFPROV_NONE.
00183 { 00184 return BUFPROV_NONE; 00185 }
ExecStreamBufProvision SingleInputExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.
Definition at line 62 of file SingleInputExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by SingleInputExecStream::prepare().
00063 { 00064 return BUFPROV_PRODUCER; 00065 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
ExecStreamResult ConduitExecStream::precheckConduitBuffers | ( | ) | [protected, inherited] |
Checks the state of the input and output buffers.
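If the input buffer is empty, requests production and returns EXECRC_BUF_UNDERFLOW. If the input is at EOS, propagates EOS to the output buffer and returns EXECRC_EOS. If the output buffer is full, returns EXECRC_BUF_OVERFLOW. Otherwise returns EXECRC_YIELD.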
Definition at line 61 of file ConduitExecStream.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.
Referenced by ExternalSortExecStreamImpl::execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().
00062 { 00063 switch (pInAccessor->getState()) { 00064 case EXECBUF_EMPTY: 00065 pInAccessor->requestProduction(); 00066 return EXECRC_BUF_UNDERFLOW; 00067 case EXECBUF_UNDERFLOW: 00068 return EXECRC_BUF_UNDERFLOW; 00069 case EXECBUF_EOS: 00070 pOutAccessor->markEOS(); 00071 return EXECRC_EOS; 00072 case EXECBUF_NONEMPTY: 00073 case EXECBUF_OVERFLOW: 00074 break; 00075 default: 00076 permAssert(false); 00077 } 00078 if (pOutAccessor->getState() == EXECBUF_OVERFLOW) { 00079 return EXECRC_BUF_OVERFLOW; 00080 } 00081 return EXECRC_YIELD; 00082 }
DynamicParamId LcsClusterReplaceExecStream::newClusterRootParamId [private] |
Dynamic parameter id corresponding to the root pageId of the new cluster, if the pageId is required downstream.
Set to 0 if there's no need for the parameter.
Definition at line 61 of file LcsClusterReplaceExecStream.h.
Tuple descriptor representing the rid column plus the cluster columns to be loaded.
Definition at line 67 of file LcsClusterReplaceExecStream.h.
Referenced by initTupleLoadParams().
Tuple data for the projected input tuple.
Definition at line 72 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and initTupleLoadParams().
std::vector<UnalignedAttributeAccessor> LcsClusterReplaceExecStream::attrAccessors [private] |
Accessors for loading column values from the original cluster.
Definition at line 77 of file LcsClusterReplaceExecStream.h.
Referenced by initTupleLoadParams(), and readOrigClusterRow().
The underlying snapshot segment for the cluster.
Definition at line 82 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and open().
Reader for the original cluster.
Definition at line 87 of file LcsClusterReplaceExecStream.h.
Referenced by close(), initTupleLoadParams(), open(), and readOrigClusterRow().
Number of rows in the original cluster.
Definition at line 92 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and open().
PageId LcsClusterReplaceExecStream::origRootPageId [private] |
The rootPageId of the original rid-to-pageId B-tree map.
Definition at line 97 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and prepare().
LcsRid LcsClusterReplaceExecStream::currLoadRid [private] |
The current rid being loaded.
Definition at line 102 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), open(), postProcessTuple(), and readOrigClusterRow().
bool LcsClusterReplaceExecStream::needTuple [private] |
True if a new tuple needs to be provided for the load.
Definition at line 107 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), open(), and postProcessTuple().
LcsRid LcsClusterReplaceExecStream::currInputRid [private] |
The rid value of the last input row read.
Definition at line 112 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), open(), and postProcessTuple().
Accessor for projecting cluster tuple data from the input row.
Definition at line 117 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and initTupleLoadParams().
TupleData used to load column values from the original cluster.
Definition at line 122 of file LcsClusterReplaceExecStream.h.
Referenced by initTupleLoadParams(), and readOrigClusterRow().
bool LcsClusterReplaceExecStream::newData [private] |
True if at least one existing row is being replaced with a new value.
Definition at line 127 of file LcsClusterReplaceExecStream.h.
Referenced by getTupleForLoad(), and open().
uint LcsClusterAppendExecStream::blockSize [protected, inherited] |
Space available on page blocks for writing cluster data.
Definition at line 58 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::prepare(), and LcsClusterAppendExecStream::startNewBlock().
TupleDescriptor LcsClusterAppendExecStream::tableColsTupleDesc [protected, inherited] |
Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of.
Definition at line 64 of file LcsClusterAppendExecStream.h.
Referenced by initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and LcsClusterAppendExecStream::prepare().
TupleData LcsClusterAppendExecStream::clusterColsTupleData [protected, inherited] |
Tuple data for the tuple datums representing only this cluster.
Definition at line 69 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and readOrigClusterRow().
TupleDescriptor LcsClusterAppendExecStream::clusterColsTupleDesc [protected, inherited] |
Tuple descriptor for the columns that are part of this cluster.
Definition at line 74 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), and LcsClusterAppendExecStream::initTupleLoadParams().
boost::scoped_array<TupleDescriptor> LcsClusterAppendExecStream::colTupleDesc [protected, inherited] |
Individual tuple descriptors for each column in the cluster.
Definition at line 79 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), and LcsClusterAppendExecStream::startNewBlock().
SegmentAccessor LcsClusterAppendExecStream::scratchAccessor [protected, inherited] |
Scratch accessor for allocating large buffer pages.
Reimplemented from BTreeExecStream.
Definition at line 84 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::prepare().
ClusterPageLock LcsClusterAppendExecStream::bufferLock [protected, inherited] |
Lock on scratch page.
Definition at line 89 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::prepare().
bool LcsClusterAppendExecStream::overwrite [protected, inherited] |
bool LcsClusterAppendExecStream::isDone [protected, inherited] |
Whether row count has been produced.
Definition at line 99 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::open().
TupleData LcsClusterAppendExecStream::outputTuple [protected, inherited] |
Output tuple containing count of number of rows loaded.
Definition at line 104 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::prepare().
TupleAccessor* LcsClusterAppendExecStream::outputTupleAccessor [protected, inherited] |
A pointer to the output tuple accessor contained in SingleOutputExecStream::pOutAccessor.
Definition at line 110 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::open(), and LcsClusterAppendExecStream::prepare().
boost::scoped_array<FixedBuffer> LcsClusterAppendExecStream::outputTupleBuffer [protected, inherited] |
Buffer holding the outputTuple to be provided to consumers.
Definition at line 115 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::closeImpl(), LcsClusterAppendExecStream::compress(), and LcsClusterAppendExecStream::open().
bool LcsClusterAppendExecStream::compressCalled [protected, inherited] |
True if execute has been called at least once.
Definition at line 120 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::init(), and LcsClusterAppendExecStream::initLoad().
boost::scoped_array<LcsHash> LcsClusterAppendExecStream::hash [protected, inherited] |
Array of hashes, one per cluster column.
Definition at line 125 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().
uint LcsClusterAppendExecStream::numColumns [protected, inherited] |
Number of columns in the cluster.
Definition at line 130 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::getResourceRequirements(), LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), LcsClusterAppendExecStream::initTupleLoadParams(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::rowBlock [protected, inherited] |
Array of temporary blocks for row array.
Definition at line 135 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::writeBatch().
uint LcsClusterAppendExecStream::nRowsMax [protected, inherited] |
Maximum number of values that can be stored in rowBlock.
Definition at line 140 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::isRowArrayFull().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::hashBlock [protected, inherited] |
Array of temporary blocks for hash table.
Definition at line 145 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), and LcsClusterAppendExecStream::startNewBlock().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::builderBlock [protected, inherited] |
Array of temporary blocks used by LcsClusterNodeWriter.
Definition at line 150 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().
uint LcsClusterAppendExecStream::rowCnt [protected, inherited] |
Number of rows loaded into the current set of batches.
Definition at line 155 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::isRowArrayFull(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().
bool LcsClusterAppendExecStream::indexBlockDirty [protected, inherited] |
True if index blocks need to be written to disk.
Definition at line 160 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::addValueOrdinal(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBlock().
LcsRid LcsClusterAppendExecStream::firstRow [protected, inherited] |
Starting rowid in a cluster page.
Definition at line 165 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::getLastBlock(), LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().
LcsRid LcsClusterAppendExecStream::lastRow [protected, inherited] |
Last rowid in the last batch.
Definition at line 170 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), and LcsClusterAppendExecStream::writeBatch().
LcsRid LcsClusterAppendExecStream::startRow [protected, inherited] |
Definition at line 174 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::prepare().
SharedLcsClusterNodeWriter LcsClusterAppendExecStream::lcsBlockBuilder [protected, inherited] |
Page builder object.
Definition at line 179 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::close(), LcsClusterAppendExecStream::compress(), LcsClusterAppendExecStream::getLastBlock(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), LcsClusterAppendExecStream::startNewBlock(), LcsClusterAppendExecStream::writeBatch(), and LcsClusterAppendExecStream::writeBlock().
boost::scoped_array<LcsHashValOrd> LcsClusterAppendExecStream::hashValOrd [protected, inherited] |
Row value ordinal returned from hash, one per cluster column.
Definition at line 184 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::compress().
boost::scoped_array<boost::scoped_array<FixedBuffer> > LcsClusterAppendExecStream::tempBuf [protected, inherited] |
Temporary buffers used by writeBatch().
Definition at line 189 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::writeBatch().
boost::scoped_array<uint> LcsClusterAppendExecStream::maxValueSize [protected, inherited] |
Maximum size for each cluster column, used by writeBatch().
Definition at line 194 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), LcsClusterAppendExecStream::close(), and LcsClusterAppendExecStream::writeBatch().
bool LcsClusterAppendExecStream::arraysAlloced [protected, inherited] |
Indicates whether or not we have already allocated arrays.
Definition at line 199 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::allocArrays(), and LcsClusterAppendExecStream::init().
PLcsClusterNode LcsClusterAppendExecStream::pIndexBlock [protected, inherited] |
Buffer pointing to cluster page that will actually be written.
Definition at line 204 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::initLoad(), LcsClusterAppendExecStream::loadExistingBlock(), and LcsClusterAppendExecStream::startNewBlock().
RecordNum LcsClusterAppendExecStream::numRowCompressed [protected, inherited] |
Total number of rows loaded by this object.
Definition at line 209 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterAppendExecStream::init(), LcsClusterAppendExecStream::postProcessTuple(), and LcsClusterAppendExecStream::prepare().
BTreeDescriptor BTreeExecStream::treeDescriptor [protected, inherited] |
Definition at line 113 of file BTreeExecStream.h.
Referenced by BTreeInsertExecStream::buildTree(), BTreeExecStream::closeImpl(), BTreeExecStream::copyParamsToDescriptor(), getTupleForLoad(), LcsClusterAppendExecStream::initLoad(), initTupleLoadParams(), BTreePrefetchSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), open(), BTreeSearchExecStream::open(), BTreePrefetchSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeExecStream::open(), prepare(), LcsClusterAppendExecStream::prepare(), LbmSearchExecStream::prepare(), LbmGeneratorExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), BTreeExecStream::prepare(), and BTreeInsertExecStream::truncateTree().
BTreeOwnerRootMap* BTreeExecStream::pRootMap [protected, inherited] |
Definition at line 115 of file BTreeExecStream.h.
Referenced by BTreeExecStream::closeImpl(), BTreeExecStream::open(), and BTreeExecStream::prepare().
SharedBTreeAccessBase BTreeExecStream::pBTreeAccessBase [protected, inherited] |
Definition at line 116 of file BTreeExecStream.h.
Referenced by BTreeInsertExecStream::closeImpl(), BTreeExecStream::closeImpl(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), and BTreeExecStream::open().
SharedBTreeReader BTreeExecStream::pBTreeReader [protected, inherited] |
Definition at line 117 of file BTreeExecStream.h.
Referenced by BTreeExecStream::endSearch(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), and BTreeExecStream::newWriter().
DynamicParamId BTreeExecStream::rootPageIdParamId [protected, inherited] |
Definition at line 118 of file BTreeExecStream.h.
Referenced by BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeInsertExecStream::prepare(), and BTreeExecStream::prepare().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), 
UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().)
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().
SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited] |
Definition at line 51 of file SingleInputExecStream.h.
Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), 
CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().