#include <LcsClusterAppendExecStream.h>
Inheritance diagram for LcsClusterAppendExecStream:
Public Member Functions | |
virtual void | prepare (LcsClusterAppendExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | closeImpl () |
Implements ClosableObject. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual void | prepare (BTreeExecStreamParams const &params) |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
bool | isClosed () const |
| |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual void | prepare (ConduitExecStreamParams const &params) |
virtual void | prepare (SingleInputExecStreamParams const &params) |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
Static Public Member Functions | |
static SharedBTreeWriter | newWriter (BTreeExecStreamParams const &params) |
static void | copyParamsToDescriptor (BTreeDescriptor &, BTreeParams const &, SharedCacheAccessor const &) |
Protected Member Functions | |
void | allocArrays () |
Allocate memory for arrays. | |
void | initLoad () |
Initializes the load. | |
void | loadExistingBlock () |
Populates row and hash arrays from existing index block. | |
void | startNewBlock () |
Prepare to write a fresh block. | |
void | convertTuplesToCols () |
Given a TupleData representing all columns in a cluster, converts each column into its own TupleData. | |
void | addValueOrdinal (uint column, uint16_t vOrd) |
Adds value ordinal to row array for new row. | |
bool | isRowArrayFull () |
True if row array is full. | |
void | writeBatch (bool lastBatch) |
Writes a batch (run) to index block. | |
void | writeBlock () |
Writes block to index when the block is full or this is the last block in the load. | |
bool | getLastBlock (PLcsClusterNode &pBlock) |
Gets last block written to disk so we can append to it, reading in the first rid value stored on the page. | |
void | init () |
Initializes and sets up object with content specific to the load that will be carried out. | |
ExecStreamResult | compress (ExecStreamQuantum const &quantum) |
Processes rows for loading. | |
virtual void | close () |
Writes out the last pending batches and btree pages. | |
virtual void | initTupleLoadParams (const TupleProjection &inputProj) |
Initializes member fields corresponding to the data to be loaded. | |
virtual ExecStreamResult | getTupleForLoad () |
Retrieves the tuple that will be loaded into the cluster. | |
virtual void | postProcessTuple () |
Performs post-processing after a tuple has been loaded. | |
virtual SharedBTreeReader | newReader () |
SharedBTreeWriter | newWriter (bool monotonic=false) |
virtual void | endSearch () |
Forgets the current reader or writer's search, releasing any page locks. | |
ExecStreamResult | precheckConduitBuffers () |
Checks the state of the input and output buffers. | |
Protected Attributes | |
uint | blockSize |
Space available on page blocks for writing cluster data. | |
TupleDescriptor | tableColsTupleDesc |
Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of. | |
TupleData | clusterColsTupleData |
Tuple data for the tuple datums representing only this cluster. | |
TupleDescriptor | clusterColsTupleDesc |
Tuple descriptors for the columns that are part of this cluster. | |
boost::scoped_array< TupleDescriptor > | colTupleDesc |
Individual tuple descriptors for each column in the cluster. | |
SegmentAccessor | scratchAccessor |
Scratch accessor for allocating large buffer pages. | |
ClusterPageLock | bufferLock |
Lock on scratch page. | |
bool | overwrite |
True if overwriting all existing data. | |
bool | isDone |
Whether row count has been produced. | |
TupleData | outputTuple |
Output tuple containing count of number of rows loaded. | |
TupleAccessor * | outputTupleAccessor |
A reference to the output accessor contained in SingleOutputExecStream::pOutAccessor. | |
boost::scoped_array< FixedBuffer > | outputTupleBuffer |
Buffer holding the outputTuple to provide to the consumers. | |
bool | compressCalled |
True if execute has been called at least once. | |
boost::scoped_array< LcsHash > | hash |
Array of hashes, one per cluster column. | |
uint | numColumns |
Number of columns in the cluster. | |
boost::scoped_array< PBuffer > | rowBlock |
Array of temporary blocks for row array. | |
uint | nRowsMax |
Maximum number of values that can be stored in rowBlock. | |
boost::scoped_array< PBuffer > | hashBlock |
Array of temporary blocks for hash table. | |
boost::scoped_array< PBuffer > | builderBlock |
Array of temporary blocks used by ClusterNodeWriter. | |
uint | rowCnt |
Number of rows loaded into the current set of batches. | |
bool | indexBlockDirty |
True if index blocks need to be written to disk. | |
LcsRid | firstRow |
Starting rowid in a cluster page. | |
LcsRid | lastRow |
Last rowid in the last batch. | |
LcsRid | startRow |
SharedLcsClusterNodeWriter | lcsBlockBuilder |
Page builder object. | |
boost::scoped_array< LcsHashValOrd > | hashValOrd |
Row value ordinal returned from hash, one per cluster column. | |
boost::scoped_array< boost::scoped_array< FixedBuffer > > | tempBuf |
Temporary buffers used by WriteBatch. | |
boost::scoped_array< uint > | maxValueSize |
Max size for each column cluster used by WriteBatch. | |
bool | arraysAlloced |
Indicates whether or not we have already allocated arrays. | |
PLcsClusterNode | pIndexBlock |
Buffer pointing to cluster page that will actually be written. | |
RecordNum | numRowCompressed |
Total number of rows loaded by this object. | |
BTreeDescriptor | treeDescriptor |
BTreeOwnerRootMap * | pRootMap |
SharedBTreeAccessBase | pBTreeAccessBase |
SharedBTreeReader | pBTreeReader |
DynamicParamId | rootPageIdParamId |
SharedExecStreamBufAccessor | pOutAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
SharedExecStreamBufAccessor | pInAccessor |
Definition at line 50 of file LcsClusterAppendExecStream.h.
void LcsClusterAppendExecStream::allocArrays | ( | ) | [protected] |
Allocate memory for arrays.
Definition at line 714 of file LcsClusterAppendExecStream.cpp.
References arraysAlloced, builderBlock, hash, hashBlock, hashValOrd, maxValueSize, numColumns, rowBlock, and tempBuf.
Referenced by initLoad().
00715 { 00716 // allocate arrays only if they have not been allocated already 00717 if (arraysAlloced) { 00718 return; 00719 } 00720 arraysAlloced = true; 00721 00722 // instantiate hashes 00723 hash.reset(new LcsHash[numColumns]); 00724 00725 // allocate pointers for row, hash blocks, other arrays 00726 rowBlock.reset(new PBuffer[numColumns]); 00727 hashBlock.reset(new PBuffer[numColumns]); 00728 00729 builderBlock.reset(new PBuffer[numColumns]); 00730 00731 hashValOrd.reset(new LcsHashValOrd[numColumns]); 00732 tempBuf.reset(new boost::scoped_array<FixedBuffer>[numColumns]); 00733 maxValueSize.reset(new uint[numColumns]); 00734 }
void LcsClusterAppendExecStream::initLoad | ( | ) | [protected] |
Initializes the load.
This method should only be called when the input stream has data available to read.
Definition at line 285 of file LcsClusterAppendExecStream.cpp.
References allocArrays(), SegNodeLock< Node >::allocatePage(), blockSize, bufferLock, builderBlock, clusterColsTupleDesc, colTupleDesc, compressCalled, getLastBlock(), SegPageLock::getPage(), TraceSource::getSharedTraceTarget(), TraceSource::getTraceSourceName(), CachePage::getWritableData(), hash, hashBlock, lcsBlockBuilder, loadExistingBlock(), nRowsMax, numColumns, pIndexBlock, rowBlock, scratchAccessor, startNewBlock(), startRow, BTreeExecStream::treeDescriptor, and SegPageLock::unlock().
Referenced by LcsClusterReplaceExecStream::getTupleForLoad(), and getTupleForLoad().
00286 { 00287 // If this is the first time this method is called, then 00288 // start a new block (for the new table), or load the last block 00289 // (of the existing table). We do this here rather than in 00290 // init() because for INSERT into T as SELECT * from T, 00291 // we need to make sure that we extract all the data from 00292 // T before modifying the blocks there; hence that's why this 00293 // method should not be called until there is input available. 00294 // Use the boolean to ensure that initialization of cluster page 00295 // is only done once. 00296 00297 if (!compressCalled) { 00298 compressCalled = true; 00299 00300 // The dynamic allocated memory in lcsBlockBuilder is allocated for 00301 // every LcsClusterAppendExecStream.open() and deallocated for every 00302 // LcsClusterAppendExecStream.closeImpl(). The dynamic memory is not 00303 // reused across calls(e.g. when issueing the same statement twice). 00304 lcsBlockBuilder = SharedLcsClusterNodeWriter( 00305 new LcsClusterNodeWriter( 00306 treeDescriptor, 00307 scratchAccessor, 00308 clusterColsTupleDesc, 00309 getSharedTraceTarget(), 00310 getTraceSourceName())); 00311 00312 allocArrays(); 00313 00314 // get blocks from cache to use as temporary space and initialize arrays 00315 for (uint i = 0; i < numColumns; i++) { 00316 bufferLock.allocatePage(); 00317 rowBlock[i] = bufferLock.getPage().getWritableData(); 00318 bufferLock.unlock(); 00319 00320 bufferLock.allocatePage(); 00321 hashBlock[i] = bufferLock.getPage().getWritableData(); 00322 bufferLock.unlock(); 00323 00324 bufferLock.allocatePage(); 00325 builderBlock[i] = bufferLock.getPage().getWritableData(); 00326 bufferLock.unlock(); 00327 00328 hash[i].init( 00329 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00330 } 00331 00332 nRowsMax = blockSize / sizeof(uint16_t); 00333 00334 // if the index exists, get last block written 00335 00336 PLcsClusterNode pExistingIndexBlock; 00337 00338 bool found = 
getLastBlock(pExistingIndexBlock); 00339 if (found) { 00340 // indicate we are updating a leaf 00341 pIndexBlock = pExistingIndexBlock; 00342 00343 // extract rows and values from last batch so we can 00344 // add to it. 00345 loadExistingBlock(); 00346 } else { 00347 // Start writing a new block 00348 startNewBlock(); 00349 startRow = LcsRid(0); 00350 } 00351 } 00352 }
void LcsClusterAppendExecStream::loadExistingBlock | ( | ) | [protected] |
Populates row and hash arrays from existing index block.
Definition at line 418 of file LcsClusterAppendExecStream.cpp.
References addValueOrdinal(), blockSize, builderBlock, firstRow, FixedBuffer, LcsHashValOrd::getValOrd(), hash, indexBlockDirty, lastRow, lcsBlockBuilder, numColumns, pIndexBlock, rowCnt, startNewBlock(), startRow, and writeBlock().
Referenced by initLoad().
00419 { 00420 boost::scoped_array<uint> numVals; // number of values in block 00421 boost::scoped_array<uint16_t> lastValOff; 00422 boost::scoped_array<boost::scoped_array<FixedBuffer> > aLeftOverBufs; 00423 // array of buffers to hold 00424 // rolled back data for each 00425 // column 00426 uint anLeftOvers; // number of leftover rows for 00427 // each col; since the value is 00428 // same for every column, no need 00429 // for this to be an array 00430 boost::scoped_array<uint> aiFixedSize; // how much space was used for 00431 // each column; should be 00432 // equal for each value in a column 00433 LcsHashValOrd vOrd; 00434 00435 uint i, j; 00436 RecordNum startRowCnt; 00437 RecordNum nrows; 00438 00439 lcsBlockBuilder->init( 00440 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00441 builderBlock.get(), blockSize); 00442 00443 lastValOff.reset(new uint16_t[numColumns]); 00444 numVals.reset(new uint[numColumns]); 00445 00446 // REVIEW jvs 28-Nov-2005: A simpler approach to this whole problem 00447 // might be to pretend we were starting an entirely new block, 00448 // use an LcsClusterReader to read the old one logically and append 00449 // the old rows into the new block, and then carry on from there with 00450 // the new rows. 00451 00452 // Append to an existing cluster page. Set the last rowid based on 00453 // the first rowid and the number of rows currently on the page. 
00454 // As rows are "rolled back", lastRow is decremented accordingly 00455 00456 bool bStartNewBlock = 00457 lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows); 00458 lastRow = firstRow + nrows; 00459 startRow = lastRow; 00460 00461 // Setup structures to hold rolled back information 00462 aiFixedSize.reset(new uint[numColumns]); 00463 aLeftOverBufs.reset(new boost::scoped_array<FixedBuffer>[numColumns]); 00464 00465 startRowCnt = rowCnt; 00466 00467 // Rollback the final batch for each column 00468 // We need to rollback all 00469 // the batches before we can start the new batches because 00470 // 1) in openAppend() we adjust m_szLeft to not include space 00471 // for numColumns * sizeof(RIBatch). So if the 00472 // block was full, then m_szLeft would be negative, 00473 // since we decreased it by numColumns * sizeof(LcsBatch) 00474 // 2) the rollback code will add sizeof(LcsBatch) to szLeft 00475 // for each batch it rolls back 00476 // 3) the code to add values to a batch gets upset if 00477 // szLeft < 0 00478 for (i = 0; i < numColumns; i++) { 00479 //reset everytime through loop 00480 rowCnt = startRowCnt; 00481 lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]); 00482 00483 // if we have left overs from the last batch (ie. batch did not end on 00484 // an 8 boundary), rollback and store in temporary mem 00485 // aLeftOverBufs[i] 00486 if (anLeftOvers > 0) { 00487 aLeftOverBufs[i].reset( 00488 new FixedBuffer[anLeftOvers * aiFixedSize[i]]); 00489 lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get()); 00490 indexBlockDirty = true; 00491 } 00492 } 00493 00494 // Decrement lastRow if there was a rollback of the last batch 00495 lastRow -= anLeftOvers; 00496 00497 // If the last page is already full, then write it out and start a new one 00498 if (bStartNewBlock) { 00499 writeBlock(); 00500 startNewBlock(); 00501 } 00502 00503 // Start a new batch for each column. 
00504 for (i = 0; i < numColumns; i++) { 00505 //reset everytime through loop 00506 rowCnt = startRowCnt; 00507 00508 if (!bStartNewBlock) { 00509 // Repopulate the hash table with the values already in the 00510 // data segment at the bottom of the block (because we didn't 00511 // roll back these values, we only roll back the pointers to 00512 // these values). But only do this if we haven't started a new 00513 // block. 00514 hash[i].restore(numVals[i], lastValOff[i]); 00515 } 00516 00517 // if we had left overs from the last batch, start a new batch 00518 // NOTE: we are guaranteed to be able to add these values back 00519 // to the current block 00520 if (anLeftOvers > 0) { 00521 uint8_t *val; 00522 bool undoInsert = false; 00523 00524 // There is a very small probability that when the hash is 00525 // restored, using the existing values in the block, that the 00526 // hash will be full and some of the left over values 00527 // can not be stored in the hash. If this is true then clear 00528 // the hash. 00529 if (hash[i].isHashFull(anLeftOvers)) { 00530 hash[i].startNewBatch(anLeftOvers); 00531 } 00532 00533 for (j = 0, val = aLeftOverBufs[i].get(); 00534 j < anLeftOvers; 00535 j++, val += aiFixedSize[i]) 00536 { 00537 hash[i].insert(val, &vOrd, &undoInsert); 00538 00539 //If we have left overs they should fit in the block 00540 assert(!undoInsert); 00541 addValueOrdinal(i, vOrd.getValOrd()); 00542 rowCnt++; 00543 } 00544 } 00545 } 00546 }
void LcsClusterAppendExecStream::startNewBlock | ( | ) | [protected] |
Prepare to write a fresh block.
Definition at line 378 of file LcsClusterAppendExecStream.cpp.
References blockSize, builderBlock, colTupleDesc, firstRow, hash, hashBlock, indexBlockDirty, init(), lastRow, lcsBlockBuilder, numColumns, pIndexBlock, and rowCnt.
Referenced by initLoad(), loadExistingBlock(), and writeBatch().
00379 { 00380 firstRow = lastRow; 00381 00382 // Get a new cluster page from the btree segment 00383 pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow); 00384 00385 // Reset index block and block builder. 00386 lcsBlockBuilder->init( 00387 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00388 builderBlock.get(), blockSize); 00389 00390 // reset Hashes 00391 for (uint i = 0; i < numColumns; i++) { 00392 hash[i].init( 00393 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00394 } 00395 00396 // reset row count 00397 // NOTE: if the rowCnt is less than eight then we know we are carrying 00398 // over rows from previous block because the count did not end 00399 // on a boundary of 8 00400 if (rowCnt >= 8) { 00401 rowCnt = 0; 00402 } 00403 indexBlockDirty = false; 00404 00405 // Start writing a new block. 00406 lcsBlockBuilder->openNew(firstRow); 00407 }
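void LcsClusterAppendExecStream::convertTuplesToCols | ( | ) | [protected] |
Given a TupleData representing all columns in a cluster, converts each column into its own TupleData.
void LcsClusterAppendExecStream::addValueOrdinal | ( | uint | column, | uint16_t | vOrd | ) | [protected] |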
Adds value ordinal to row array for new row.
Definition at line 548 of file LcsClusterAppendExecStream.cpp.
References indexBlockDirty, rowBlock, and rowCnt.
Referenced by compress(), loadExistingBlock(), and writeBatch().
00549 { 00550 uint16_t *rowWordArray = (uint16_t *) rowBlock[column]; 00551 rowWordArray[rowCnt] = vOrd; 00552 00553 // since we added a row mark block as dirty 00554 indexBlockDirty = true; 00555 }
bool LcsClusterAppendExecStream::isRowArrayFull | ( | ) | [protected] |
True if row array is full.
Definition at line 557 of file LcsClusterAppendExecStream.cpp.
References nRowsMax, and rowCnt.
Referenced by compress().
00558 { 00559 if (rowCnt >= nRowsMax) { 00560 return true; 00561 } else { 00562 return false; 00563 } 00564 }
void LcsClusterAppendExecStream::writeBatch | ( | bool | lastBatch | ) | [protected] |
Writes a batch (run) to index block.
Batches have a multiple of 8 rows.
lastBatch | true if last batch |
Definition at line 566 of file LcsClusterAppendExecStream.cpp.
References addValueOrdinal(), count(), FixedBuffer, hash, lastRow, LCS_FIXED, LCS_VARIABLE, lcsBlockBuilder, maxValueSize, numColumns, rowBlock, rowCnt, startNewBlock(), tempBuf, and writeBlock().
Referenced by compress(), and writeBlock().
00567 { 00568 uint16_t *oVals; 00569 uint leftOvers; 00570 PBuffer val; 00571 LcsBatchMode mode; 00572 uint i, j; 00573 uint origRowCnt, count = 0; 00574 00575 lastRow += rowCnt; 00576 00577 for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) { 00578 rowCnt = origRowCnt; 00579 00580 // save max value size so we can read leftovers 00581 maxValueSize[i] = hash[i].getMaxValueSize(); 00582 00583 // Pick which compression mode to use (fixed, variable, or compressed) 00584 lcsBlockBuilder->pickCompressionMode( 00585 i, maxValueSize[i], rowCnt, &oVals, mode); 00586 leftOvers = rowCnt > 8 ? rowCnt % 8 : 0; 00587 00588 // all batches must end on an eight boundary so we move 00589 // values over eight boundary to the next batch. 00590 // if there are leftOvers or if the there are less than 00591 // eight values in this batch allocate buffer to store 00592 // values to be written to next batch 00593 if (leftOvers) { 00594 tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]); 00595 count = leftOvers; 00596 00597 } else if (origRowCnt < 8) { 00598 tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]); 00599 count = origRowCnt; 00600 } else { 00601 // no values to write to next batch (ie on boundary of 8) 00602 tempBuf[i].reset(); 00603 } 00604 00605 // Write out the batch and collect the leftover rows in tempBuf 00606 if (LCS_FIXED == mode || LCS_VARIABLE == mode) { 00607 hash[i].prepareFixedOrVariableBatch( 00608 (PBuffer) rowBlock[i], rowCnt); 00609 lcsBlockBuilder->putFixedVarBatch( 00610 i, (uint16_t *) rowBlock[i], tempBuf[i].get()); 00611 if (mode == LCS_FIXED) { 00612 hash[i].clearFixedEntries(); 00613 } 00614 00615 } else { 00616 uint16_t numVals; 00617 00618 // write orderVals to oVals and remap val ords in row array 00619 hash[i].prepareCompressedBatch( 00620 (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals); 00621 lcsBlockBuilder->putCompressedBatch( 00622 i, (PBuffer) rowBlock[i], tempBuf[i].get()); 00623 } 00624 00625 // setup next 
batch 00626 rowCnt = 0; 00627 hash[i].startNewBatch(!lastBatch ? count : 0); 00628 } 00629 00630 //compensate for left over and rolled back rows 00631 if (!lastBatch) { 00632 lastRow -= count; 00633 } 00634 bool bStartNewBlock; 00635 bStartNewBlock = false; 00636 00637 // If we couldn't even fit 8 values into the batch (and this is not the 00638 // final batch), then the block must be full. putCompressedBatch()/ 00639 // putFixedVarBatch() assumed that this was the last batch, so they wrote 00640 // out these rows in a small batch. Roll back the entire batch (putting 00641 // rolled back results in tempBuf) and move to next block 00642 if (!lastBatch && origRowCnt < 8) { 00643 // rollback each batch 00644 for (i = 0; i < numColumns; i++) { 00645 lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get()); 00646 } 00647 bStartNewBlock = true; 00648 } 00649 00650 // Should we move to a new block? Move if 00651 // (a) bStartNewBlock (we need to move just to write the current batch) 00652 // or (b) lcsBlockBuilder->isEndOfBlock() (there isn't room to even start 00653 // the next batch) 00654 if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) { 00655 writeBlock(); 00656 startNewBlock(); 00657 } 00658 00659 // Add leftOvers or rolled back values to new batch 00660 if (!lastBatch) { 00661 for (i = 0; i < numColumns; i++) { 00662 rowCnt = 0; 00663 for (j = 0, val = tempBuf[i].get(); j < count; j++) { 00664 LcsHashValOrd vOrd; 00665 bool undoInsert = false; 00666 00667 hash[i].insert(val, &vOrd, &undoInsert); 00668 00669 // If we have leftovers they should fit in the current block 00670 // (because we moved to a new block above, if it was necessary) 00671 assert(!undoInsert); 00672 addValueOrdinal(i, vOrd.getValOrd()); 00673 rowCnt++; 00674 val += maxValueSize[i]; 00675 } 00676 } 00677 } 00678 00679 for (i = 0; i < numColumns; i++) { 00680 if (tempBuf[i].get()) { 00681 tempBuf[i].reset(); 00682 } 00683 } 00684 }
void LcsClusterAppendExecStream::writeBlock | ( | ) | [protected] |
Writes block to index when the block is full or this is the last block in the load.
Definition at line 686 of file LcsClusterAppendExecStream.cpp.
References indexBlockDirty, lcsBlockBuilder, rowCnt, and writeBatch().
Referenced by compress(), loadExistingBlock(), and writeBatch().
00687 { 00688 if (indexBlockDirty) { 00689 // If the rowCnt is not zero, then the last batch was not on 00690 // a boundary of 8 so we need to write the last batch 00691 if (rowCnt) { 00692 writeBatch(true); 00693 00694 // REVIEW jvs 28-Nov-2005: it must be possible to eliminate 00695 // this circularity between writeBlock and writeBatch. 00696 00697 // Handle corner case. writeBatch may have written this block 00698 // to the btree 00699 if (!indexBlockDirty) { 00700 return; 00701 } 00702 } 00703 00704 // Tell block builder we are done so it can wrap up writing to the 00705 // index block 00706 lcsBlockBuilder->endBlock(); 00707 00708 // Dump out the page contents to trace if appropriate 00709 00710 indexBlockDirty = false; 00711 } 00712 }
bool LcsClusterAppendExecStream::getLastBlock | ( | PLcsClusterNode & | pBlock | ) | [protected] |
Gets last block written to disk so we can append to it, reading in the first rid value stored on the page.
pBlock | returns pointer to last cluster block |
Definition at line 409 of file LcsClusterAppendExecStream.cpp.
References firstRow, and lcsBlockBuilder.
Referenced by initLoad().
00410 { 00411 if (!lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) { 00412 return false; 00413 } else { 00414 return true; 00415 } 00416 }
void LcsClusterAppendExecStream::init | ( | ) | [protected] |
Initializes and sets up object with content specific to the load that will be carried out.
Definition at line 131 of file LcsClusterAppendExecStream.cpp.
References arraysAlloced, compressCalled, firstRow, indexBlockDirty, lastRow, numRowCompressed, pIndexBlock, rowCnt, and startRow.
Referenced by open(), and startNewBlock().
00132 { 00133 pIndexBlock = 0; 00134 firstRow = LcsRid(0); 00135 lastRow = LcsRid(0); 00136 startRow = LcsRid(0); 00137 rowCnt = 0; 00138 indexBlockDirty = false; 00139 arraysAlloced = false; 00140 compressCalled = false; 00141 numRowCompressed = 0; 00142 }
ExecStreamResult LcsClusterAppendExecStream::compress | ( | ExecStreamQuantum const & | quantum | ) | [protected] |
Processes rows for loading.
Calls writeBatch() once values can no longer fit into a page.
quantum | ExecStream quantum |
Definition at line 144 of file LcsClusterAppendExecStream.cpp.
References addValueOrdinal(), close(), clusterColsTupleData, EXECRC_BUF_OVERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, EXECRC_YIELD, TupleAccessor::getCurrentByteCount(), getTupleForLoad(), hash, hashValOrd, isDone, isRowArrayFull(), lcsBlockBuilder, TupleAccessor::marshal(), ExecStreamQuantum::nTuplesMax, numColumns, outputTuple, outputTupleAccessor, outputTupleBuffer, postProcessTuple(), SingleOutputExecStream::pOutAccessor, rowCnt, writeBatch(), and writeBlock().
Referenced by execute().
00146 { 00147 uint i, j, k; 00148 bool canFit = false; 00149 bool undoInsert = false; 00150 00151 if (isDone) { 00152 // already returned final result 00153 pOutAccessor->markEOS(); 00154 return EXECRC_EOS; 00155 } 00156 00157 for (i = 0; i < quantum.nTuplesMax; i++) { 00158 // if we have finished processing the previous row, retrieve 00159 // the next cluster tuple and then convert the columns in the 00160 // cluster into individual tuples, one per cluster column 00161 ExecStreamResult rc = getTupleForLoad(); 00162 00163 // no more input; produce final row count 00164 if (rc == EXECRC_EOS) { 00165 // since we're done adding rows to the index, write the last batch 00166 // and block 00167 if (rowCnt) { 00168 // if rowCnt < 8 or a multiple of 8, force writeBatch to 00169 // treat this as the last batch 00170 if (rowCnt < 8 || (rowCnt % 8) == 0) { 00171 writeBatch(true); 00172 } else { 00173 writeBatch(false); 00174 } 00175 } 00176 00177 // Write out the last block and then free up resources 00178 // rather than waiting until stream close. This will keep 00179 // resource usage window smaller and avoid interference with 00180 // downstream processing such as writing to unclustered indexes. 00181 writeBlock(); 00182 if (lcsBlockBuilder) { 00183 lcsBlockBuilder->close(); 00184 } 00185 close(); 00186 00187 // outputTuple was already initialized to point to numRowCompressed/ 00188 // startRow in prepare() 00189 // Write a single outputTuple(numRowCompressed, [startRow]) 00190 // and indicate OVERFLOW. 00191 00192 outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get()); 00193 pOutAccessor->provideBufferForConsumption( 00194 outputTupleBuffer.get(), 00195 outputTupleBuffer.get() + 00196 outputTupleAccessor->getCurrentByteCount()); 00197 00198 isDone = true; 00199 return EXECRC_BUF_OVERFLOW; 00200 } else if (rc != EXECRC_YIELD) { 00201 return rc; 00202 } 00203 00204 // Go through each column value for current row and insert it. 
00205 // If we run out of space then rollback all the columns that 00206 // I already inserted. 00207 undoInsert = false; 00208 00209 for (j = 0; j < numColumns; j++) { 00210 hash[j].insert( 00211 clusterColsTupleData[j], &hashValOrd[j], &undoInsert); 00212 00213 if (undoInsert) { 00214 // rollback cluster columns already inserted 00215 // j has not been incremented yet, so the condition should be 00216 // k <= j 00217 for (k = 0; k <= j; k++) { 00218 hash[k].undoInsert(clusterColsTupleData[k]); 00219 } 00220 break; 00221 } 00222 } 00223 00224 // Was there enough space to add this row? Note that the Insert() 00225 // calls above accounted for the space needed by addValueOrdinal() 00226 // below, so we don't have to worry about addValueOrdinal() running 00227 // out of space 00228 if (!undoInsert) { 00229 canFit = true; 00230 } else { 00231 canFit = false; 00232 } 00233 00234 if (canFit) { 00235 // Add the pointers from the batch to the data values 00236 for (j = 0; j < numColumns; j++) { 00237 addValueOrdinal(j, hashValOrd[j].getValOrd()); 00238 } 00239 00240 rowCnt++; 00241 00242 // if reach max rows that can fit in row array then write batch 00243 if (isRowArrayFull()) { 00244 writeBatch(false); 00245 } 00246 } else { 00247 // since we can't fit anymore values write out current batch 00248 writeBatch(false); 00249 00250 // restart using last value retrieved from stream because it 00251 // could not fit in the batch; by continuing we can avoid 00252 // a goto to jump back to the top of this for loop at the 00253 // expense of a harmless increment of the quantum 00254 continue; 00255 } 00256 00257 // only consume the tuple after we know the row can fit 00258 // on the current page 00259 postProcessTuple(); 00260 } 00261 00262 return EXECRC_QUANTUM_EXPIRED; 00263 }
void LcsClusterAppendExecStream::close | ( | ) | [protected, virtual] |
Writes out the last pending batches and btree pages.
Deallocates temporary memory and buffer pages. Allows resources to be freed before the execution stream is actually closed.
Reimplemented from ClosableObject.
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 360 of file LcsClusterAppendExecStream.cpp.
References builderBlock, hash, hashBlock, hashValOrd, lcsBlockBuilder, maxValueSize, NULL_PAGE_ID, SegmentAccessor::pSegment, rowBlock, scratchAccessor, and tempBuf.
Referenced by LcsClusterReplaceExecStream::close(), closeImpl(), and compress().
00361 { 00362 if (scratchAccessor.pSegment) { 00363 scratchAccessor.pSegment->deallocatePageRange( 00364 NULL_PAGE_ID, NULL_PAGE_ID); 00365 } 00366 rowBlock.reset(); 00367 hashBlock.reset(); 00368 builderBlock.reset(); 00369 00370 hash.reset(); 00371 hashValOrd.reset(); 00372 tempBuf.reset(); 00373 maxValueSize.reset(); 00374 00375 lcsBlockBuilder.reset(); 00376 }
void LcsClusterAppendExecStream::initTupleLoadParams | ( | const TupleProjection & | inputProj | ) | [protected, virtual] |
Initializes member fields corresponding to the data to be loaded.
inputProj | projection of the input tuple that's relevant to this cluster append |
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 70 of file LcsClusterAppendExecStream.cpp.
References clusterColsTupleData, clusterColsTupleDesc, colTupleDesc, TupleData::compute(), numColumns, TupleDescriptor::projectFrom(), and tableColsTupleDesc.
Referenced by prepare().
00072 { 00073 numColumns = inputProj.size(); 00074 clusterColsTupleDesc.projectFrom(tableColsTupleDesc, inputProj); 00075 clusterColsTupleData.compute(clusterColsTupleDesc); 00076 00077 // setup one tuple descriptor per cluster column 00078 colTupleDesc.reset(new TupleDescriptor[numColumns]); 00079 for (int i = 0; i < numColumns; i++) { 00080 colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i]]); 00081 } 00082 }
ExecStreamResult LcsClusterAppendExecStream::getTupleForLoad | ( | ) | [protected, virtual] |
Retrieves the tuple that will be loaded into the cluster.
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 265 of file LcsClusterAppendExecStream.cpp.
References clusterColsTupleData, EXECBUF_EOS, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, initLoad(), and SingleInputExecStream::pInAccessor.
Referenced by compress().
00266 { 00267 if (pInAccessor->getState() == EXECBUF_EOS) { 00268 return EXECRC_EOS; 00269 } 00270 00271 if (!pInAccessor->demandData()) { 00272 return EXECRC_BUF_UNDERFLOW; 00273 } 00274 00275 // Initialize the load, only after we know we have input available 00276 initLoad(); 00277 00278 if (!pInAccessor->isTupleConsumptionPending()) { 00279 pInAccessor->unmarshalProjectedTuple(clusterColsTupleData); 00280 } 00281 00282 return EXECRC_YIELD; 00283 }
void LcsClusterAppendExecStream::postProcessTuple | ( | ) | [protected, virtual] |
Performs post-processing after a tuple has been loaded.
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 354 of file LcsClusterAppendExecStream.cpp.
References numRowCompressed, and SingleInputExecStream::pInAccessor.
Referenced by compress(), and LcsClusterReplaceExecStream::postProcessTuple().
00355 { 00356 pInAccessor->consumeTuple(); 00357 numRowCompressed++; 00358 }
void LcsClusterAppendExecStream::prepare | ( | LcsClusterAppendExecStreamParams const & | params | ) | [virtual] |
Definition at line 31 of file LcsClusterAppendExecStream.cpp.
References SegPageLock::accessSegment(), blockSize, bufferLock, TupleData::compute(), initTupleLoadParams(), LcsClusterAppendExecStreamParams::inputProj, numRowCompressed, outputTuple, outputTupleAccessor, SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, ConduitExecStream::prepare(), BTreeExecStream::prepare(), SegmentAccessor::pSegment, ExecStreamParams::scratchAccessor, scratchAccessor, BTreeDescriptor::segmentAccessor, startRow, tableColsTupleDesc, and BTreeExecStream::treeDescriptor.
Referenced by LcsClusterReplaceExecStream::prepare().
00033 { 00034 BTreeExecStream::prepare(params); 00035 ConduitExecStream::prepare(params); 00036 00037 tableColsTupleDesc = pInAccessor->getTupleDesc(); 00038 initTupleLoadParams(params.inputProj); 00039 00040 // setup descriptors, accessors and data to access only the columns 00041 // for this cluster, based on the input projection 00042 00043 pInAccessor->bindProjection(params.inputProj); 00044 00045 // setup bufferLock to access temporary large page blocks 00046 00047 scratchAccessor = params.scratchAccessor; 00048 bufferLock.accessSegment(scratchAccessor); 00049 00050 // The output stream from the loader is either a single column representing 00051 // the number of rows loaded or two columns -- number of rows loaded and 00052 // starting rid value. The latter applies when there are 00053 // downstream create indexes 00054 00055 TupleDescriptor outputTupleDesc; 00056 00057 outputTupleDesc = pOutAccessor->getTupleDesc(); 00058 outputTuple.compute(outputTupleDesc); 00059 outputTuple[0].pData = (PConstBuffer) &numRowCompressed; 00060 if (outputTupleDesc.size() > 1) { 00061 outputTuple[1].pData = (PConstBuffer) &startRow; 00062 } 00063 00064 outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor(); 00065 00066 blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(); 00067 00068 }
void LcsClusterAppendExecStream::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from ConduitExecStream.
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 103 of file LcsClusterAppendExecStream.cpp.
References FixedBuffer, TupleAccessor::getMaxByteCount(), init(), isDone, ConduitExecStream::open(), BTreeExecStream::open(), outputTupleAccessor, and outputTupleBuffer.
Referenced by LcsClusterReplaceExecStream::open().
00104 { 00105 BTreeExecStream::open(restart); 00106 ConduitExecStream::open(restart); 00107 00108 if (!restart) { 00109 outputTupleBuffer.reset( 00110 new FixedBuffer[outputTupleAccessor->getMaxByteCount()]); 00111 } 00112 00113 init(); 00114 isDone = false; 00115 }
ExecStreamResult LcsClusterAppendExecStream::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 117 of file LcsClusterAppendExecStream.cpp.
References compress().
00119 { 00120 return compress(quantum); 00121 }
void LcsClusterAppendExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual] |
Reimplemented from ExecStream.
Reimplemented in LcsClusterReplaceExecStream.
Definition at line 84 of file LcsClusterAppendExecStream.cpp.
References ExecStream::getResourceRequirements(), ExecStreamResourceQuantity::nCachePages, and numColumns.
Referenced by LcsClusterReplaceExecStream::getResourceRequirements().
00087 { 00088 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00089 00090 // REVIEW -- 00091 // 4 pages per cluster column 00092 // - 1 for indexBlock 00093 // - 1 for rowBlock 00094 // - 1 for hash, 00095 // - 1 for value bank 00096 // 5 pages for btree and then 1 for cluster page 00097 minQuantity.nCachePages += (numColumns * 4) + 6; 00098 00099 // TODO 00100 optQuantity = minQuantity; 00101 }
void LcsClusterAppendExecStream::closeImpl | ( | ) | [virtual] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from BTreeExecStream.
Definition at line 123 of file LcsClusterAppendExecStream.cpp.
References close(), ExecStream::closeImpl(), BTreeExecStream::closeImpl(), and outputTupleBuffer.
00124 { 00125 BTreeExecStream::closeImpl(); 00126 ConduitExecStream::closeImpl(); 00127 outputTupleBuffer.reset(); 00128 close(); 00129 }
ExecStreamBufProvision LcsClusterAppendExecStream::getOutputBufProvision | ( | ) | const [virtual] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented from SingleOutputExecStream.
Definition at line 737 of file LcsClusterAppendExecStream.cpp.
References BUFPROV_PRODUCER.
00738 { 00739 return BUFPROV_PRODUCER; 00740 }
SharedBTreeReader BTreeExecStream::newReader | ( | ) | [protected, virtual, inherited] |
Reimplemented in BTreePrefetchSearchExecStream.
Definition at line 67 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, and BTreeExecStream::treeDescriptor.
Referenced by BTreeReadExecStream::open().
00068 { 00069 SharedBTreeReader pReader = SharedBTreeReader( 00070 new BTreeReader(treeDescriptor)); 00071 pBTreeAccessBase = pBTreeReader = pReader; 00072 return pReader; 00073 }
SharedBTreeWriter BTreeExecStream::newWriter | ( | bool | monotonic = false |
) | [protected, inherited] |
Definition at line 75 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeAccessBase, BTreeExecStream::pBTreeReader, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.
Referenced by FtrsTableWriter::createIndexWriter(), and BTreeInsertExecStream::open().
00076 { 00077 SharedBTreeWriter pWriter = SharedBTreeWriter( 00078 new BTreeWriter(treeDescriptor,scratchAccessor,monotonic)); 00079 pBTreeAccessBase = pBTreeReader = pWriter; 00080 return pWriter; 00081 }
SharedBTreeWriter BTreeExecStream::newWriter | ( | BTreeExecStreamParams const & | params | ) | [static, inherited] |
Definition at line 83 of file BTreeExecStream.cpp.
References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, ExecStreamParams::scratchAccessor, and BTreeExecStream::treeDescriptor.
00085 { 00086 BTreeDescriptor treeDescriptor; 00087 copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor); 00088 return SharedBTreeWriter( 00089 new BTreeWriter( 00090 treeDescriptor,params.scratchAccessor)); 00091 }
void BTreeExecStream::endSearch | ( | ) | [protected, virtual, inherited] |
Forgets the current reader or writer's search, releasing any page locks.
Definition at line 107 of file BTreeExecStream.cpp.
References BTreeExecStream::pBTreeReader.
Referenced by BTreeExecStream::closeImpl(), and BTreeExecStream::open().
00108 { 00109 if (pBTreeReader && pBTreeReader->isSingular() == false) { 00110 pBTreeReader->endSearch(); 00111 } 00112 }
void BTreeExecStream::prepare | ( | BTreeExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 30 of file BTreeExecStream.cpp.
References BTreeExecStream::copyParamsToDescriptor(), ExecStreamParams::pCacheAccessor, SingleOutputExecStream::prepare(), BTreeParams::pRootMap, BTreeExecStream::pRootMap, BTreeParams::rootPageIdParamId, BTreeExecStream::rootPageIdParamId, ExecStreamParams::scratchAccessor, BTreeExecStream::scratchAccessor, and BTreeExecStream::treeDescriptor.
Referenced by prepare(), LbmGeneratorExecStream::prepare(), BTreeReadExecStream::prepare(), and BTreeInsertExecStream::prepare().
00031 { 00032 SingleOutputExecStream::prepare(params); 00033 00034 copyParamsToDescriptor(treeDescriptor,params,params.pCacheAccessor); 00035 scratchAccessor = params.scratchAccessor; 00036 pRootMap = params.pRootMap; 00037 rootPageIdParamId = params.rootPageIdParamId; 00038 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void BTreeExecStream::copyParamsToDescriptor | ( | BTreeDescriptor & | , | |
BTreeParams const & | , | |||
SharedCacheAccessor const & | ||||
) | [static, inherited] |
Definition at line 93 of file BTreeExecStream.cpp.
References BTreeParams::keyProj, BTreeDescriptor::keyProjection, BTreeParams::pageOwnerId, BTreeDescriptor::pageOwnerId, SegmentAccessor::pCacheAccessor, BTreeParams::pSegment, SegmentAccessor::pSegment, BTreeParams::rootPageId, BTreeDescriptor::rootPageId, BTreeDescriptor::segmentAccessor, BTreeParams::segmentId, BTreeDescriptor::segmentId, BTreeExecStream::treeDescriptor, BTreeParams::tupleDesc, and BTreeDescriptor::tupleDescriptor.
Referenced by BTreeExecStream::newWriter(), LbmSplicerExecStream::prepare(), and BTreeExecStream::prepare().
00097 { 00098 treeDescriptor.segmentAccessor.pSegment = params.pSegment; 00099 treeDescriptor.segmentAccessor.pCacheAccessor = pCacheAccessor; 00100 treeDescriptor.tupleDescriptor = params.tupleDesc; 00101 treeDescriptor.keyProjection = params.keyProj; 00102 treeDescriptor.rootPageId = params.rootPageId; 00103 treeDescriptor.segmentId = params.segmentId; 00104 treeDescriptor.pageOwnerId = params.pageOwnerId; 00105 }
void SingleOutputExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Implements ExecStream.
Reimplemented in ConduitExecStream, and ConfluenceExecStream.
Definition at line 35 of file SingleOutputExecStream.cpp.
void SingleOutputExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Implements ExecStream.
Reimplemented in ConduitExecStream.
Definition at line 41 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::pOutAccessor.
Referenced by ConduitExecStream::setOutputBufAccessors().
00043 { 00044 assert(outAccessors.size() == 1); 00045 pOutAccessor = outAccessors[0]; 00046 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual, inherited] |
Determines resource requirements for this stream.
Default implementation declares zero resource requirements.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 93 of file ExecStream.cpp.
References EXEC_RESOURCE_ACCURATE.
Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().
00097 { 00098 getResourceRequirements(minQuantity, optQuantity); 00099 optType = EXEC_RESOURCE_ACCURATE; 00100 }
void ExecStream::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual, inherited] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 111 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.
Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().
00113 { 00114 resourceAllocation = quantity; 00115 if (pQuotaAccessor) { 00116 pQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00117 } 00118 if (pScratchQuotaAccessor) { 00119 pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages); 00120 } 00121 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
ExecStreamBufProvision ExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented in ConfluenceExecStream, DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferWriterExecStream, SingleInputExecStream, and JavaTransformExecStream.
Definition at line 182 of file ExecStream.cpp.
References BUFPROV_NONE.
00183 { 00184 return BUFPROV_NONE; 00185 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
ExecStreamResult ConduitExecStream::precheckConduitBuffers | ( | ) | [protected, inherited] |
Checks the state of the input and output buffers.
If the input is empty, requests production. If the input is at EOS, propagates that to the output buffer. If the output is full, returns EXECRC_BUF_OVERFLOW.
Definition at line 61 of file ConduitExecStream.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.
Referenced by ExternalSortExecStreamImpl::execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().
00062 { 00063 switch (pInAccessor->getState()) { 00064 case EXECBUF_EMPTY: 00065 pInAccessor->requestProduction(); 00066 return EXECRC_BUF_UNDERFLOW; 00067 case EXECBUF_UNDERFLOW: 00068 return EXECRC_BUF_UNDERFLOW; 00069 case EXECBUF_EOS: 00070 pOutAccessor->markEOS(); 00071 return EXECRC_EOS; 00072 case EXECBUF_NONEMPTY: 00073 case EXECBUF_OVERFLOW: 00074 break; 00075 default: 00076 permAssert(false); 00077 } 00078 if (pOutAccessor->getState() == EXECBUF_OVERFLOW) { 00079 return EXECRC_BUF_OVERFLOW; 00080 } 00081 return EXECRC_YIELD; 00082 }
void ConduitExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Reimplemented from SingleInputExecStream.
Definition at line 36 of file ConduitExecStream.cpp.
References SingleOutputExecStream::setOutputBufAccessors().
00038 { 00039 SingleOutputExecStream::setOutputBufAccessors(outAccessors); 00040 }
void ConduitExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleInputExecStream.
Definition at line 30 of file ConduitExecStream.cpp.
References SingleInputExecStream::setInputBufAccessors().
00032 { 00033 SingleInputExecStream::setInputBufAccessors(inAccessors); 00034 }
void ConduitExecStream::prepare | ( | ConduitExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 42 of file ConduitExecStream.cpp.
References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
Referenced by ExternalSortExecStreamImpl::prepare(), prepare(), LbmNormalizerExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().
00043 { 00044 SingleInputExecStream::prepare(params); 00045 00046 if (params.outputTupleDesc.empty()) { 00047 pOutAccessor->setTupleShape( 00048 pInAccessor->getTupleDesc(), 00049 pInAccessor->getTupleFormat()); 00050 } 00051 00052 SingleOutputExecStream::prepare(params); 00053 }
void SingleInputExecStream::prepare | ( | SingleInputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 44 of file SingleInputExecStream.cpp.
References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().
Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().
00045 { 00046 ExecStream::prepare(params); 00047 00048 assert(pInAccessor); 00049 assert(pInAccessor->getProvision() == getInputBufProvision()); 00050 }
ExecStreamBufProvision SingleInputExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.
Definition at line 62 of file SingleInputExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by SingleInputExecStream::prepare().
00063 { 00064 return BUFPROV_PRODUCER; 00065 }
uint LcsClusterAppendExecStream::blockSize [protected] |
Space available on page blocks for writing cluster data.
Definition at line 58 of file LcsClusterAppendExecStream.h.
Referenced by initLoad(), loadExistingBlock(), prepare(), and startNewBlock().
Tuple descriptor for the tuple representing all cluster columns across the table that this cluster is a part of.
Definition at line 64 of file LcsClusterAppendExecStream.h.
Referenced by LcsClusterReplaceExecStream::initTupleLoadParams(), initTupleLoadParams(), and prepare().
Tuple data for the tuple datums representing only this cluster.
Definition at line 69 of file LcsClusterAppendExecStream.h.
Referenced by compress(), LcsClusterReplaceExecStream::getTupleForLoad(), getTupleForLoad(), LcsClusterReplaceExecStream::initTupleLoadParams(), initTupleLoadParams(), and LcsClusterReplaceExecStream::readOrigClusterRow().
Tuple descriptors for the columns that are part of this cluster.
Definition at line 74 of file LcsClusterAppendExecStream.h.
Referenced by initLoad(), LcsClusterReplaceExecStream::initTupleLoadParams(), and initTupleLoadParams().
boost::scoped_array<TupleDescriptor> LcsClusterAppendExecStream::colTupleDesc [protected] |
Individual tuple descriptors for each column in the cluster.
Definition at line 79 of file LcsClusterAppendExecStream.h.
Referenced by initLoad(), LcsClusterReplaceExecStream::initTupleLoadParams(), initTupleLoadParams(), and startNewBlock().
Scratch accessor for allocating large buffer pages.
Reimplemented from BTreeExecStream.
Definition at line 84 of file LcsClusterAppendExecStream.h.
Referenced by close(), initLoad(), and prepare().
Lock on scratch page.
Definition at line 89 of file LcsClusterAppendExecStream.h.
Referenced by initLoad(), and prepare().
bool LcsClusterAppendExecStream::overwrite [protected] |
bool LcsClusterAppendExecStream::isDone [protected] |
Whether row count has been produced.
Definition at line 99 of file LcsClusterAppendExecStream.h.
Referenced by compress(), and open().
TupleData LcsClusterAppendExecStream::outputTuple [protected] |
Output tuple containing count of number of rows loaded.
Definition at line 104 of file LcsClusterAppendExecStream.h.
Referenced by compress(), and prepare().
A reference to the output accessor contained in SingleOutputExecStream::pOutAccessor.
Definition at line 110 of file LcsClusterAppendExecStream.h.
Referenced by compress(), open(), and prepare().
boost::scoped_array<FixedBuffer> LcsClusterAppendExecStream::outputTupleBuffer [protected] |
Buffer holding the outputTuple to provide to the consumers.
Definition at line 115 of file LcsClusterAppendExecStream.h.
Referenced by closeImpl(), compress(), and open().
bool LcsClusterAppendExecStream::compressCalled [protected] |
True if execute has been called at least once.
Definition at line 120 of file LcsClusterAppendExecStream.h.
Referenced by init(), and initLoad().
boost::scoped_array<LcsHash> LcsClusterAppendExecStream::hash [protected] |
Array of hashes, one per cluster column.
Definition at line 125 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), compress(), initLoad(), loadExistingBlock(), startNewBlock(), and writeBatch().
uint LcsClusterAppendExecStream::numColumns [protected] |
Number of columns in the cluster.
Definition at line 130 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), compress(), getResourceRequirements(), initLoad(), LcsClusterReplaceExecStream::initTupleLoadParams(), initTupleLoadParams(), loadExistingBlock(), startNewBlock(), and writeBatch().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::rowBlock [protected] |
Array of temporary blocks for row array.
Definition at line 135 of file LcsClusterAppendExecStream.h.
Referenced by addValueOrdinal(), allocArrays(), close(), initLoad(), and writeBatch().
uint LcsClusterAppendExecStream::nRowsMax [protected] |
Maximum number of values that can be stored in rowBlock.
Definition at line 140 of file LcsClusterAppendExecStream.h.
Referenced by initLoad(), and isRowArrayFull().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::hashBlock [protected] |
Array of temporary blocks for hash table.
Definition at line 145 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), initLoad(), and startNewBlock().
boost::scoped_array<PBuffer> LcsClusterAppendExecStream::builderBlock [protected] |
Array of temporary blocks used by ClusterNodeWriter.
Definition at line 150 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), initLoad(), loadExistingBlock(), and startNewBlock().
uint LcsClusterAppendExecStream::rowCnt [protected] |
Number of rows loaded into the current set of batches.
Definition at line 155 of file LcsClusterAppendExecStream.h.
Referenced by addValueOrdinal(), compress(), init(), isRowArrayFull(), loadExistingBlock(), startNewBlock(), writeBatch(), and writeBlock().
bool LcsClusterAppendExecStream::indexBlockDirty [protected] |
True if index blocks need to be written to disk.
Definition at line 160 of file LcsClusterAppendExecStream.h.
Referenced by addValueOrdinal(), init(), loadExistingBlock(), startNewBlock(), and writeBlock().
LcsRid LcsClusterAppendExecStream::firstRow [protected] |
Starting rowid in a cluster page.
Definition at line 165 of file LcsClusterAppendExecStream.h.
Referenced by getLastBlock(), init(), loadExistingBlock(), and startNewBlock().
LcsRid LcsClusterAppendExecStream::lastRow [protected] |
Last rowid in the last batch.
Definition at line 170 of file LcsClusterAppendExecStream.h.
Referenced by init(), loadExistingBlock(), startNewBlock(), and writeBatch().
LcsRid LcsClusterAppendExecStream::startRow [protected] |
Definition at line 174 of file LcsClusterAppendExecStream.h.
Referenced by init(), initLoad(), loadExistingBlock(), and prepare().
Page builder object.
Definition at line 179 of file LcsClusterAppendExecStream.h.
Referenced by close(), compress(), getLastBlock(), initLoad(), loadExistingBlock(), startNewBlock(), writeBatch(), and writeBlock().
boost::scoped_array<LcsHashValOrd> LcsClusterAppendExecStream::hashValOrd [protected] |
Row value ordinal returned from hash, one per cluster column.
Definition at line 184 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), and compress().
boost::scoped_array<boost::scoped_array<FixedBuffer> > LcsClusterAppendExecStream::tempBuf [protected] |
Temporary buffers used by writeBatch().
Definition at line 189 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), and writeBatch().
boost::scoped_array<uint> LcsClusterAppendExecStream::maxValueSize [protected] |
Max size for each column cluster used by writeBatch().
Definition at line 194 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), close(), and writeBatch().
bool LcsClusterAppendExecStream::arraysAlloced [protected] |
Indicates whether or not we have already allocated arrays.
Definition at line 199 of file LcsClusterAppendExecStream.h.
Referenced by allocArrays(), and init().
Buffer pointing to cluster page that will actually be written.
Definition at line 204 of file LcsClusterAppendExecStream.h.
Referenced by init(), initLoad(), loadExistingBlock(), and startNewBlock().
Total number of rows loaded by this object.
Definition at line 209 of file LcsClusterAppendExecStream.h.
Referenced by init(), postProcessTuple(), and prepare().
BTreeDescriptor BTreeExecStream::treeDescriptor [protected, inherited] |
Definition at line 113 of file BTreeExecStream.h.
Referenced by BTreeInsertExecStream::buildTree(), BTreeExecStream::closeImpl(), BTreeExecStream::copyParamsToDescriptor(), LcsClusterReplaceExecStream::getTupleForLoad(), initLoad(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreePrefetchSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), LcsClusterReplaceExecStream::open(), BTreeSearchExecStream::open(), BTreePrefetchSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeExecStream::open(), LcsClusterReplaceExecStream::prepare(), prepare(), LbmSearchExecStream::prepare(), LbmGeneratorExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), BTreeExecStream::prepare(), and BTreeInsertExecStream::truncateTree().
BTreeOwnerRootMap* BTreeExecStream::pRootMap [protected, inherited] |
Definition at line 115 of file BTreeExecStream.h.
Referenced by BTreeExecStream::closeImpl(), BTreeExecStream::open(), and BTreeExecStream::prepare().
SharedBTreeAccessBase BTreeExecStream::pBTreeAccessBase [protected, inherited] |
Definition at line 116 of file BTreeExecStream.h.
Referenced by BTreeInsertExecStream::closeImpl(), BTreeExecStream::closeImpl(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), BTreeExecStream::newWriter(), and BTreeExecStream::open().
SharedBTreeReader BTreeExecStream::pBTreeReader [protected, inherited] |
Definition at line 117 of file BTreeExecStream.h.
Referenced by BTreeExecStream::endSearch(), BTreePrefetchSearchExecStream::newReader(), BTreeExecStream::newReader(), and BTreeExecStream::newWriter().
DynamicParamId BTreeExecStream::rootPageIdParamId [protected, inherited] |
Definition at line 118 of file BTreeExecStream.h.
Referenced by BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), BTreeInsertExecStream::prepare(), and BTreeExecStream::prepare().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), 
SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().
SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited] |
Definition at line 51 of file SingleInputExecStream.h.
Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), 
CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().