#include <LhxAggExecStream.h>
Inheritance diagram for LhxAggExecStream:
Public Member Functions | |
virtual void | prepare (LhxAggExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual void | prepare (ConduitExecStreamParams const &params) |
virtual void | prepare (SingleInputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any allocated resources. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Protected Member Functions | |
ExecStreamResult | precheckConduitBuffers () |
Checks the state of the input and output buffers. | |
Protected Attributes | |
SharedExecStreamBufAccessor | pInAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
SharedExecStreamBufAccessor | pOutAccessor |
Private Types | |
enum | LhxAggState { ForcePartitionBuild, Build, Produce, ProducePending, Partition, CreateChildPlan, GetNextPlan, Done } |
Private Member Functions | |
virtual void | closeImpl () |
Implements ClosableObject. | |
void | setHashInfo (LhxAggExecStreamParams const &params) |
void | setAggComputers (LhxHashInfo &hashInfo, AggInvocationList const &aggInvocations) |
Private Attributes | |
TupleData | inputTuple |
Input tuple. | |
TupleData | outputTuple |
TupleData to assemble the output tuple. | |
uint | numTuplesProduced |
Number of tuples produced within the current quantum. | |
LhxHashInfo | hashInfo |
Hash aggregation info. | |
LhxHashTable | hashTable |
HashTable to use. | |
LhxHashTableReader | hashTableReader |
BlockNum | numBlocksHashTable |
Initial estimate of blocks required. | |
BlockNum | numMiscCacheBlocks |
Number of cache blocks set aside for I/O. | |
bool | isTopPlan |
SharedLhxPlan | rootPlan |
LhxPlan * | curPlan |
uint | buildInputIndex |
Index of build input (should be 0 for agg). | |
LhxPartitionInfo | partInfo |
Partition context used in recursive partitioning. | |
SharedLhxPartition | buildPart |
The build partition (which is also the only partition). | |
LhxPartitionReader | buildReader |
Partition reader. | |
bool | enableSubPartStat |
Whether to use sub partition stats. | |
uint | forcePartitionLevel |
Partition level up to which partitioning is forced; this is set only in tests. | |
LhxAggState | aggState |
State of the AggExecStream. | |
LhxAggState | nextState |
The next state of the AggExecStream. | |
uint | groupByKeyCount |
AggComputerList | aggComputers |
AggComputerList | partialAggComputers |
Performs GROUP BY aggregation using a hash table, recursively partitioning the input when it does not fit in the hash table.
Definition at line 90 of file LhxAggExecStream.h.
enum LhxAggExecStream::LhxAggState [private] |
ForcePartitionBuild | |
Build | |
Produce | |
ProducePending | |
Partition | |
CreateChildPlan | |
GetNextPlan | |
Done |
Definition at line 96 of file LhxAggExecStream.h.
enum LhxAggState
{
    /** Testing-only build state: always partitions up to forcePartitionLevel. */
    ForcePartitionBuild,
    /** Reads build-input tuples and aggregates them into the hash table. */
    Build,
    /** Fetches the next aggregated tuple from the hash table. */
    Produce,
    /** Holds a fetched tuple until the output buffer accepts it. */
    ProducePending,
    /** Generates child partitions after the hash table overflowed. */
    Partition,
    /** Links the newly generated partitions into the plan tree. */
    CreateChildPlan,
    /** Moves on to the next leaf plan once the current one is exhausted. */
    GetNextPlan,
    /** Every plan has been processed; end-of-stream is signalled. */
    Done
};
void LhxAggExecStream::closeImpl | ( | ) | [private, virtual] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from ExecStream.
Definition at line 469 of file LhxAggExecStream.cpp.
References ExecStream::closeImpl(), hashTable, LhxHashTable::releaseResources(), and rootPlan.
00470 { 00471 hashTable.releaseResources(); 00472 if (rootPlan) { 00473 rootPlan->close(); 00474 rootPlan.reset(); 00475 } 00476 // REVIEW jvs 25-Aug-2006: Are there other resources that ought to be 00477 // released here? Anything in hashTableReader, partInfo, buildPart, 00478 // buildReader? Or does that all get cleaned up implicitly? 00479 ConduitExecStream::closeImpl(); 00480 }
void LhxAggExecStream::setHashInfo | ( | LhxAggExecStreamParams const & | params | ) | [private] |
Definition at line 557 of file LhxAggExecStream.cpp.
References AGG_FUNC_COUNT, AGG_FUNC_MAX, AGG_FUNC_MIN, AGG_FUNC_SINGLE_VALUE, AGG_FUNC_SUM, SortedAggExecStreamParams::aggInvocations, LhxHashInfo::aggsProj, LhxAggExecStreamParams::cndGroupByKeys, LhxHashInfo::cndKeys, LhxHashInfo::dataProj, LhxHashInfo::externalSegmentAccessor, LhxHashInfo::filterNull, LhxHashInfo::filterNullKeyProj, SortedAggExecStreamParams::groupByKeyCount, HASH_TRIM_NONE, HASH_TRIM_UNICODE_VARCHAR, HASH_TRIM_VARCHAR, hashInfo, LhxHashInfo::inputDesc, LhxHashInfo::isKeyColVarChar, LhxHashInfo::keyProj, LhxHashInfo::memSegmentAccessor, StandardTypeDescriptorFactory::newDataType(), LhxAggExecStreamParams::numRows, LhxHashInfo::numRows, SegmentAccessor::pCacheAccessor, SingleInputExecStream::pInAccessor, TupleDescriptor::projectFrom(), SegmentAccessor::pSegment, LhxAggExecStreamParams::pTempSegment, LhxHashInfo::removeDuplicate, STANDARD_TYPE_INT_64, STANDARD_TYPE_UNICODE_VARCHAR, STANDARD_TYPE_VARCHAR, LhxHashInfo::streamBufAccessor, and LhxHashInfo::useJoinFilter.
Referenced by prepare().
00559 { 00560 TupleDescriptor inputDesc = pInAccessor->getTupleDesc(); 00561 00562 hashInfo.streamBufAccessor.push_back(pInAccessor); 00563 00564 hashInfo.cndKeys.push_back(params.cndGroupByKeys); 00565 00566 hashInfo.numRows.push_back(params.numRows); 00567 00568 hashInfo.filterNull.push_back(false); 00569 00570 // empty projection : do not filter nulls 00571 TupleProjection filterNullKeyProj; 00572 hashInfo.filterNullKeyProj.push_back(filterNullKeyProj); 00573 00574 hashInfo.removeDuplicate.push_back(false); 00575 hashInfo.useJoinFilter.push_back(false); 00576 00577 hashInfo.memSegmentAccessor = params.scratchAccessor; 00578 hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor; 00579 hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment; 00580 00581 TupleProjection keyProj; 00582 vector<LhxHashTrim> isKeyColVarChar; 00583 00584 for (int i = 0; i < params.groupByKeyCount; i ++) { 00585 keyProj.push_back(i); 00586 /* 00587 * Hashing is special for varchar types (the trailing blanks are 00588 * insignificant). 00589 */ 00590 StoredTypeDescriptor::Ordinal ordinal = 00591 inputDesc[i].pTypeDescriptor->getOrdinal(); 00592 if (ordinal == STANDARD_TYPE_VARCHAR) { 00593 isKeyColVarChar.push_back(HASH_TRIM_VARCHAR); 00594 } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) { 00595 isKeyColVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR); 00596 } else { 00597 isKeyColVarChar.push_back(HASH_TRIM_NONE); 00598 } 00599 } 00600 hashInfo.keyProj.push_back(keyProj); 00601 hashInfo.isKeyColVarChar.push_back(isKeyColVarChar); 00602 00603 /* 00604 * Empty data projection. 
00605 */ 00606 TupleProjection dataProj; 00607 hashInfo.dataProj.push_back(dataProj); 00608 00609 /* 00610 * Set up keyDesc 00611 */ 00612 TupleDescriptor keyDesc; 00613 keyDesc.projectFrom(inputDesc, keyProj); 00614 00615 /* 00616 * Attribute descriptor for COUNT output 00617 */ 00618 StandardTypeDescriptorFactory stdTypeFactory; 00619 TupleAttributeDescriptor countDesc( 00620 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00621 00622 // REVIEW jvs 25-Aug-2006: It's possible to get rid of this nullability 00623 // type transformation (but it requires matching changes at the Farrago 00624 // level). The reason is that LhxAggExecStream is only used for GROUP BY. 00625 // Since empty groups are only possible with full-table agg, they are not 00626 // an issue with GROUP BY. So, the output can only be null if the input 00627 // admits nulls. However, the validator currently applies the 00628 // transformation in all cases (e.g. SqlSumAggFunction uses 00629 // rtiFirstArgTypeForceNullable). To do it right, it would need to be 00630 // context-sensitive (and SortedAggExecStream would need to be changed to 00631 // match, discriminating on whether any group keys were specified). 00632 // Probably not worth it. 00633 00634 // REVIEW jvs 25-Aug-2006: What is the prevTupleDesc mentioned here? 00635 /* 00636 Compute the accumulator result portion of prevTupleDesc based on 00637 requested aggregate function invocations, and instantiate polymorphic 00638 AggComputers bound to correct inputs. 
00639 */ 00640 int i = params.groupByKeyCount; 00641 for (AggInvocationConstIter pInvocation(params.aggInvocations.begin()); 00642 pInvocation != params.aggInvocations.end(); 00643 ++pInvocation) 00644 { 00645 switch (pInvocation->aggFunction) { 00646 case AGG_FUNC_COUNT: 00647 keyDesc.push_back(countDesc); 00648 break; 00649 case AGG_FUNC_SUM: 00650 case AGG_FUNC_MIN: 00651 case AGG_FUNC_MAX: 00652 case AGG_FUNC_SINGLE_VALUE: 00653 // Key type is same as input type, but nullable 00654 keyDesc.push_back(inputDesc[pInvocation->iInputAttr]); 00655 keyDesc.back().isNullable = true; 00656 break; 00657 } 00658 hashInfo.aggsProj.push_back(i++); 00659 } 00660 00661 hashInfo.inputDesc.push_back(keyDesc); 00662 }
void LhxAggExecStream::setAggComputers | ( | LhxHashInfo & | hashInfo, | |
AggInvocationList const & | aggInvocations | |||
) | [private] |
Definition at line 482 of file LhxAggExecStream.cpp.
References AGG_FUNC_COUNT, AGG_FUNC_MAX, AGG_FUNC_MIN, AGG_FUNC_SINGLE_VALUE, AGG_FUNC_SUM, aggComputers, LhxHashInfo::aggsProj, hashInfo, LhxHashInfo::inputDesc, AggComputer::newAggComputer(), partialAggComputers, and SingleInputExecStream::pInAccessor.
Referenced by prepare().
00485 { 00486 /* 00487 * InputDesc from underlying producer. 00488 */ 00489 TupleDescriptor inputDesc = pInAccessor->getTupleDesc(); 00490 00491 /* 00492 * TupleDescriptor used by the hash table, of the format: 00493 * [ group-by keys, aggregates ] 00494 */ 00495 TupleDescriptor &hashDesc = hashInfo.inputDesc.back(); 00496 00497 /* 00498 * Fields corresponding to the aggregates in hashDesc 00499 */ 00500 TupleProjection &aggsProj = hashInfo.aggsProj; 00501 00506 AggFunction partialAggFunction; 00507 00508 uint i = 0; 00509 00510 assert (aggInvocations.size() == aggsProj.size()); 00511 00512 for (AggInvocationConstIter pInvocation(aggInvocations.begin()); 00513 pInvocation != aggInvocations.end(); 00514 ++pInvocation) 00515 { 00516 switch (pInvocation->aggFunction) { 00517 case AGG_FUNC_COUNT: 00518 partialAggFunction = AGG_FUNC_SUM; 00519 break; 00520 case AGG_FUNC_SUM: 00521 case AGG_FUNC_MIN: 00522 case AGG_FUNC_MAX: 00523 case AGG_FUNC_SINGLE_VALUE: 00524 partialAggFunction = pInvocation->aggFunction; 00525 break; 00526 default: 00527 permFail("unknown aggregation function: " 00528 << pInvocation->aggFunction); 00529 break; 00530 } 00531 00532 /* 00533 * Add to aggregate computer list. 00534 */ 00535 TupleAttributeDescriptor const *pInputAttr = NULL; 00536 if (pInvocation->iInputAttr != -1) { 00537 pInputAttr = &(inputDesc[pInvocation->iInputAttr]); 00538 } 00539 aggComputers.push_back( 00540 AggComputer::newAggComputer( 00541 pInvocation->aggFunction, pInputAttr)); 00542 aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr); 00543 00544 /* 00545 * Add to partial aggregate computer list. 00546 */ 00547 TupleAttributeDescriptor const *pInputAttrPartialAgg = 00548 &(hashDesc[aggsProj[i]]); 00549 partialAggComputers.push_back( 00550 AggComputer::newAggComputer( 00551 partialAggFunction, pInputAttrPartialAgg)); 00552 partialAggComputers.back().setInputAttrIndex(aggsProj[i]); 00553 i ++; 00554 } 00555 }
void LhxAggExecStream::prepare | ( | LhxAggExecStreamParams const & | params | ) | [virtual] |
Definition at line 33 of file LhxAggExecStream.cpp.
References SortedAggExecStreamParams::aggInvocations, buildInputIndex, LhxHashTable::calculateSize(), TupleData::compute(), LhxAggExecStreamParams::enableSubPartStat, enableSubPartStat, LhxAggExecStreamParams::forcePartitionLevel, forcePartitionLevel, hashInfo, hashTable, LhxHashInfo::inputDesc, LhxPlan::LhxChildPartCount, numBlocksHashTable, numMiscCacheBlocks, outputTuple, SingleOutputExecStream::pOutAccessor, ConduitExecStream::prepare(), setAggComputers(), and setHashInfo().
00035 { 00036 ConduitExecStream::prepare(params); 00037 00038 setHashInfo(params); 00039 setAggComputers(hashInfo, params.aggInvocations); 00040 00041 /* 00042 * Force partitioning level. Only set in tests. 00043 */ 00044 forcePartitionLevel = params.forcePartitionLevel; 00045 enableSubPartStat = params.enableSubPartStat; 00046 00047 buildInputIndex = hashInfo.inputDesc.size() - 1; 00048 00049 /* 00050 * number of block and slots required to perform the aggregation in memory, 00051 * using estimates from the optimizer. 00052 */ 00053 hashTable.calculateSize(hashInfo, buildInputIndex, numBlocksHashTable); 00054 00055 TupleDescriptor outputDesc; 00056 00057 outputDesc = hashInfo.inputDesc[buildInputIndex]; 00058 00059 if (!params.outputTupleDesc.empty()) { 00060 assert (outputDesc == params.outputTupleDesc); 00061 } 00062 00063 outputTuple.compute(outputDesc); 00064 pOutAccessor->setTupleShape(outputDesc); 00065 00066 /* 00067 * Set aside one cache block per child partition writer for I/O 00068 */ 00069 uint numInputs = 1; 00070 numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs; 00071 }
void LhxAggExecStream::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from ConduitExecStream.
Definition at line 103 of file LhxAggExecStream.cpp.
References aggComputers, aggState, LhxHashTable::allocateResources(), Build, buildInputIndex, buildPart, buildReader, curPlan, enableSubPartStat, ForcePartitionBuild, forcePartitionLevel, LhxPlan::getPartition(), hashInfo, hashTable, hashTableReader, LhxPartitionInfo::init(), LhxHashTableReader::init(), LhxHashTable::init(), isTopPlan, LhxPartitionReader::open(), ConduitExecStream::open(), partInfo, LhxHashTable::releaseResources(), and rootPlan.
00104 { 00105 ConduitExecStream::open(restart); 00106 00107 if (restart) { 00108 hashTable.releaseResources(); 00109 } 00110 00111 uint partitionLevel = 0; 00112 hashTable.init(partitionLevel, hashInfo, &aggComputers, buildInputIndex); 00113 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00114 00115 bool status = hashTable.allocateResources(); 00116 00117 assert(status); 00118 00119 // REVIEW jvs 25-Aug-2006: Fennel coding convention is pParentPlan, 00120 // pBuildPart, etc. Same comment applies everywhere. Also, consider using 00121 // boost::ptr_vector<LhxPartition> rather than 00122 // std::vector<SharedLhxPartition> (unless shared pointers are really 00123 // required). 00124 00125 /* 00126 * Create the root plan. 00127 * 00128 * The execute state machine operates at the plan level. 00129 */ 00130 vector<SharedLhxPartition> partitionList; 00131 00132 buildPart = SharedLhxPartition(new LhxPartition(this)); 00133 // REVIEW jvs 25-Aug-2006: Why does buildPart->segStream need to be reset 00134 // immediately after construction? 00135 buildPart->segStream.reset(); 00136 buildPart->inputIndex = 0; 00137 partitionList.push_back(buildPart); 00138 00139 rootPlan = SharedLhxPlan(new LhxPlan()); 00140 rootPlan->init( 00141 WeakLhxPlan(), 00142 partitionLevel, 00143 partitionList, 00144 enableSubPartStat); 00145 00146 /* 00147 * initialize recursive partitioning context. 00148 */ 00149 partInfo.init(&hashInfo); 00150 00151 /* 00152 * Now starts at the first (root) plan. 00153 */ 00154 curPlan = rootPlan.get(); 00155 isTopPlan = true; 00156 00157 buildReader.open(curPlan->getPartition(buildInputIndex), hashInfo); 00158 00159 aggState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00160 }
ExecStreamResult LhxAggExecStream::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 162 of file LhxAggExecStream.cpp.
References LhxHashTable::addTuple(), aggComputers, aggState, LhxHashTable::allocateResources(), Build, buildInputIndex, buildReader, ExecStream::checkAbort(), LhxPartitionInfo::close(), LhxPartitionReader::close(), TupleData::compute(), LhxPartitionReader::consumeTuple(), CreateChildPlan, LhxPlan::createChildren(), curPlan, LhxPartitionReader::demandData(), Done, EXECBUF_EOS, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ForcePartitionBuild, forcePartitionLevel, LhxPlan::generatePartitions(), LhxPlan::getFirstChild(), LhxHashTableReader::getNext(), LhxPlan::getNextLeaf(), GetNextPlan, LhxPlan::getPartition(), LhxPlan::getPartitionLevel(), LhxPartitionReader::getState(), LhxPartitionReader::getTupleDesc(), hashInfo, hashTable, hashTableReader, LhxHashTableReader::init(), LhxHashTable::init(), inputTuple, isTopPlan, LhxPartitionReader::isTupleConsumptionPending(), nextState, ExecStreamQuantum::nTuplesMax, numTuplesProduced, LhxPartitionReader::open(), LhxPartitionInfo::open(), outputTuple, partialAggComputers, partInfo, Partition, PartitionUnderflow, SingleOutputExecStream::pOutAccessor, Produce, ProducePending, LhxHashTable::releaseResources(), LhxPlan::toString(), TRACE_FINE, and LhxPartitionReader::unmarshalTuple().
00163 { 00164 while (true) { 00165 // REVIEW jvs 25-Aug-2006: Some compilers do better if you 00166 // put the most commonly used cases first in a switch. Definitely 00167 // from a "follow-the-logic" standpoint, a testing-only state 00168 // like ForcePartitionBuild belongs last. 00169 switch (aggState) { 00170 // REVIEW jvs 25-Aug-2006: I'm not sure that repeating all 00171 // of this code between the ForcePartitionBuild and Build 00172 // states is worth it just to remove one test from the 00173 // inner loop. 00174 case ForcePartitionBuild: 00175 { 00176 /* 00177 * Build 00178 */ 00179 // REVIEW jvs 25-Aug-2006: Is it really necessary to compute 00180 // the tuple every time through here? 00181 inputTuple.compute(buildReader.getTupleDesc()); 00182 for (;;) { 00183 if (!buildReader.isTupleConsumptionPending()) { 00184 if (buildReader.getState() == EXECBUF_EOS) { 00185 numTuplesProduced = 0; 00186 /* 00187 * break out of this loop, and start returning. 00188 */ 00189 aggState = Produce; 00190 break; 00191 } 00192 00193 if (!buildReader.demandData()) { 00194 if (isTopPlan) { 00195 /* 00196 * Top level: request more data from stream 00197 * producer. 00198 */ 00199 return EXECRC_BUF_UNDERFLOW; 00200 } else { 00201 /* 00202 * Recursive level: no more data in partition. 00203 * Come back to the top of the same state to 00204 * detect EOS. 00205 */ 00206 break; 00207 } 00208 } 00209 buildReader.unmarshalTuple(inputTuple); 00210 } 00211 00212 /* 00213 * Add tuple to hash table. 00214 * 00215 * NOTE: This is a testing state. Always partition up to 00216 * forcePartitionLevel. 
00217 */ 00218 if (curPlan->getPartitionLevel() < forcePartitionLevel || 00219 !hashTable.addTuple(inputTuple)) { 00220 if (isTopPlan) { 00221 partInfo.open( 00222 &hashTableReader, 00223 &buildReader, 00224 inputTuple, 00225 &aggComputers); 00226 } else { 00227 partInfo.open( 00228 &hashTableReader, 00229 &buildReader, 00230 inputTuple, 00231 &partialAggComputers); 00232 } 00233 aggState = Partition; 00234 break; 00235 } 00236 buildReader.consumeTuple(); 00237 } 00238 break; 00239 } 00240 case Build: 00241 { 00242 /* 00243 * Build 00244 */ 00245 inputTuple.compute(buildReader.getTupleDesc()); 00246 for (;;) { 00247 if (!buildReader.isTupleConsumptionPending()) { 00248 if (buildReader.getState() == EXECBUF_EOS) { 00249 buildReader.close(); 00250 numTuplesProduced = 0; 00251 /* 00252 * break out of this loop, and start returning. 00253 */ 00254 aggState = Produce; 00255 break; 00256 } 00257 00258 if (!buildReader.demandData()) { 00259 if (isTopPlan) { 00260 /* 00261 * Top level: request more data from stream 00262 * producer. 00263 */ 00264 return EXECRC_BUF_UNDERFLOW; 00265 } else { 00266 /* 00267 * Recursive level: no more data in partition. 00268 * Come back to the top of the same state to 00269 * detect EOS. 00270 */ 00271 break; 00272 } 00273 } 00274 buildReader.unmarshalTuple(inputTuple); 00275 } 00276 00277 /* 00278 * Add tuple to hash table. 00279 */ 00280 if (!hashTable.addTuple(inputTuple)) { 00281 if (isTopPlan) { 00282 partInfo.open( 00283 &hashTableReader, 00284 &buildReader, 00285 inputTuple, 00286 &aggComputers); 00287 } else { 00288 partInfo.open( 00289 &hashTableReader, 00290 &buildReader, 00291 inputTuple, 00292 &partialAggComputers); 00293 } 00294 aggState = Partition; 00295 break; 00296 } 00297 buildReader.consumeTuple(); 00298 } 00299 break; 00300 } 00301 case Partition: 00302 { 00303 for (;;) { 00304 if (curPlan->generatePartitions(hashInfo, partInfo) 00305 == PartitionUnderflow) { 00306 /* 00307 * Request more data from producer. 
00308 */ 00309 return EXECRC_BUF_UNDERFLOW; 00310 } else { 00311 // REVIEW jvs 25-Aug-2006: only one input for agg.. 00312 /* 00313 * Finished building the partitions for both 00314 * inputs. 00315 */ 00316 break; 00317 } 00318 } 00319 partInfo.close(); 00320 aggState = CreateChildPlan; 00321 break; 00322 } 00323 case CreateChildPlan: 00324 { 00325 /* 00326 * Link the newly created partitioned in the plan tree. 00327 */ 00328 curPlan->createChildren(partInfo, false, false); 00329 00330 FENNEL_TRACE(TRACE_FINE, curPlan->toString()); 00331 00332 // REVIEW jvs 25-Aug-2006: This comment makes it sound 00333 // like it's walking multiple levels in the plan tree 00334 // right here, but really it's just walking down to the 00335 // first leaf it just created (i.e. one step in 00336 // recursion if curPlan was already non-root). 00337 /* 00338 * now recurse down the plan tree to get the first leaf plan. 00339 */ 00340 curPlan = curPlan->getFirstChild().get(); 00341 isTopPlan = false; 00342 00343 hashTable.releaseResources(); 00344 00345 /* 00346 * At recursive level, the input tuple shape is 00347 * different. Inputs are all partial aggregates now. 00348 * Hash table needs to aggregate partial results using the 00349 * correct agg computer interface. 00350 */ 00351 hashTable.init( 00352 curPlan->getPartitionLevel(), 00353 hashInfo, 00354 &partialAggComputers, 00355 buildInputIndex); 00356 hashTableReader.init( 00357 &hashTable, 00358 hashInfo, 00359 buildInputIndex); 00360 00361 bool status = hashTable.allocateResources(); 00362 assert(status); 00363 00364 buildReader.open( 00365 curPlan->getPartition(buildInputIndex), 00366 hashInfo); 00367 00368 aggState = 00369 (forcePartitionLevel > 0) ? 
ForcePartitionBuild : Build; 00370 break; 00371 } 00372 case GetNextPlan: 00373 { 00374 hashTable.releaseResources(); 00375 00376 checkAbort(); 00377 00378 curPlan = curPlan->getNextLeaf(); 00379 00380 if (curPlan) { 00381 /* 00382 * At recursive level, the input tuple shape is 00383 * different. Inputs are all partial aggregates now. 00384 * Hash table needs to aggregate partial results using the 00385 * correct agg computer interface. 00386 */ 00387 hashTable.init( 00388 curPlan->getPartitionLevel(), 00389 hashInfo, 00390 &partialAggComputers, 00391 buildInputIndex); 00392 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00393 bool status = hashTable.allocateResources(); 00394 assert(status); 00395 00396 buildReader.open( 00397 curPlan->getPartition(buildInputIndex), 00398 hashInfo); 00399 00400 aggState = 00401 (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00402 } else { 00403 aggState = Done; 00404 } 00405 break; 00406 } 00407 case Produce: 00408 { 00409 // REVIEW jvs 25-Aug-2006: Is there a reason tuples can't be 00410 // pumped out in a loop right here? Popping in and out of 00411 // the state machine for every tuple is a bit of overhead. 00412 // It's only a couple of lines of code which would be 00413 // duplicated. (An inline method would contradict 00414 // my earlier comment about numTuplesProduced being 00415 // a local variable.) 00416 /* 00417 * Producing the results. Handle output overflow and quantum 00418 * expiration in ProducePending. 00419 */ 00420 if (hashTableReader.getNext(outputTuple)) { 00421 aggState = ProducePending; 00422 /* 00423 * Come back to this state after producing the output tuple 00424 * successfully. 
00425 */ 00426 nextState = Produce; 00427 } else { 00428 aggState = GetNextPlan; 00429 } 00430 break; 00431 } 00432 case ProducePending: 00433 { 00434 if (pOutAccessor->produceTuple(outputTuple)) { 00435 numTuplesProduced++; 00436 aggState = nextState; 00437 } else { 00438 numTuplesProduced = 0; 00439 return EXECRC_BUF_OVERFLOW; 00440 } 00441 00442 /* 00443 * Successfully produced an output row. Now check if quantum 00444 * has expired. 00445 */ 00446 if (numTuplesProduced >= quantum.nTuplesMax) { 00447 /* 00448 * Reset count. 00449 */ 00450 numTuplesProduced = 0; 00451 return EXECRC_QUANTUM_EXPIRED; 00452 } 00453 break; 00454 } 00455 case Done: 00456 { 00457 pOutAccessor->markEOS(); 00458 return EXECRC_EOS; 00459 } 00460 } 00461 } 00462 00463 /* 00464 * The state machine should never come here. 00465 */ 00466 assert(false); 00467 }
void LhxAggExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual] |
Determines resource requirements for this stream.
The default ExecStream implementation declares zero resource requirements; this override adds cache pages for the hash table and for partition I/O.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented from ExecStream.
Definition at line 73 of file LhxAggExecStream.cpp.
References EXEC_RESOURCE_ESTIMATE, EXEC_RESOURCE_UNBOUNDED, ExecStream::getResourceRequirements(), isMAXU(), LhxPlan::LhxChildPartCount, LhxHashTable::LhxHashTableMinPages, max(), ExecStreamResourceQuantity::nCachePages, numBlocksHashTable, and numMiscCacheBlocks.
00077 { 00078 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00079 00080 uint minPages = 00081 LhxHashTable::LhxHashTableMinPages * LhxPlan::LhxChildPartCount 00082 + numMiscCacheBlocks; 00083 minQuantity.nCachePages += minPages; 00084 // if valid stats weren't passed in, make an unbounded resource request 00085 if (isMAXU(numBlocksHashTable)) { 00086 optType = EXEC_RESOURCE_UNBOUNDED; 00087 } else { 00088 // make sure the opt is bigger than the min; otherwise, the 00089 // resource governor won't try to give it extra 00090 optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable); 00091 optType = EXEC_RESOURCE_ESTIMATE; 00092 } 00093 }
void LhxAggExecStream::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented from ExecStream.
Definition at line 95 of file LhxAggExecStream.cpp.
References hashInfo, ExecStreamResourceQuantity::nCachePages, LhxHashInfo::numCachePages, numMiscCacheBlocks, and ExecStream::setResourceAllocation().
00097 { 00098 ConduitExecStream::setResourceAllocation(quantity); 00099 hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks; 00100 }
ExecStreamResult ConduitExecStream::precheckConduitBuffers | ( | ) | [protected, inherited] |
Checks the state of the input and output buffers.
If input empty, requests production. If input EOS, propagates that to output buffer. If output full, returns EXECRC_BUF_OVERFLOW.
Definition at line 61 of file ConduitExecStream.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.
Referenced by ExternalSortExecStreamImpl::execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().
00062 { 00063 switch (pInAccessor->getState()) { 00064 case EXECBUF_EMPTY: 00065 pInAccessor->requestProduction(); 00066 return EXECRC_BUF_UNDERFLOW; 00067 case EXECBUF_UNDERFLOW: 00068 return EXECRC_BUF_UNDERFLOW; 00069 case EXECBUF_EOS: 00070 pOutAccessor->markEOS(); 00071 return EXECRC_EOS; 00072 case EXECBUF_NONEMPTY: 00073 case EXECBUF_OVERFLOW: 00074 break; 00075 default: 00076 permAssert(false); 00077 } 00078 if (pOutAccessor->getState() == EXECBUF_OVERFLOW) { 00079 return EXECRC_BUF_OVERFLOW; 00080 } 00081 return EXECRC_YIELD; 00082 }
void ConduitExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Reimplemented from SingleInputExecStream.
Definition at line 36 of file ConduitExecStream.cpp.
References SingleOutputExecStream::setOutputBufAccessors().
00038 { 00039 SingleOutputExecStream::setOutputBufAccessors(outAccessors); 00040 }
void ConduitExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleInputExecStream.
Definition at line 30 of file ConduitExecStream.cpp.
References SingleInputExecStream::setInputBufAccessors().
00032 { 00033 SingleInputExecStream::setInputBufAccessors(inAccessors); 00034 }
void ConduitExecStream::prepare | ( | ConduitExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 42 of file ConduitExecStream.cpp.
References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
Referenced by ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmNormalizerExecStream::prepare(), prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().
00043 { 00044 SingleInputExecStream::prepare(params); 00045 00046 if (params.outputTupleDesc.empty()) { 00047 pOutAccessor->setTupleShape( 00048 pInAccessor->getTupleDesc(), 00049 pInAccessor->getTupleFormat()); 00050 } 00051 00052 SingleOutputExecStream::prepare(params); 00053 }
void SingleInputExecStream::prepare | ( | SingleInputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 44 of file SingleInputExecStream.cpp.
References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().
Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().
00045 { 00046 ExecStream::prepare(params); 00047 00048 assert(pInAccessor); 00049 assert(pInAccessor->getProvision() == getInputBufProvision()); 00050 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
ExecStreamBufProvision SingleInputExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.
Definition at line 62 of file SingleInputExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by SingleInputExecStream::prepare().
00063 { 00064 return BUFPROV_PRODUCER; 00065 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual, inherited] |
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.
Definition at line 102 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.
00105 { 00106 minQuantity.nThreads = 0; 00107 minQuantity.nCachePages = 0; 00108 optQuantity = minQuantity; 00109 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented in BarrierExecStream, DiffluenceExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, SingleOutputExecStream, SplitterExecStream, ValuesExecStream, JavaTransformExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 172 of file ExecStream.cpp.
References BUFPROV_NONE.
00173 { 00174 return BUFPROV_NONE; 00175 }
ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented from ExecStream.
Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 69 of file SingleOutputExecStream.cpp.
References BUFPROV_CONSUMER.
Referenced by SingleOutputExecStream::prepare().
00070 { 00071 return BUFPROV_CONSUMER; 00072 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any allocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 { 00041 if (!needsClose) { 00042 return; 00043 } 00044 needsClose = false; 00045 closeImpl(); 00046 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
TupleData LhxAggExecStream::inputTuple [private] |
TupleData LhxAggExecStream::outputTuple [private] |
uint LhxAggExecStream::numTuplesProduced [private] |
Number of tuples produced within the current quantum.
Definition at line 117 of file LhxAggExecStream.h.
Referenced by execute().
LhxHashInfo LhxAggExecStream::hashInfo [private] |
Hash aggregation info.
Definition at line 122 of file LhxAggExecStream.h.
Referenced by execute(), open(), prepare(), setAggComputers(), setHashInfo(), and setResourceAllocation().
LhxHashTable LhxAggExecStream::hashTable [private] |
HashTable to use.
Definition at line 127 of file LhxAggExecStream.h.
Referenced by closeImpl(), execute(), open(), and prepare().
BlockNum LhxAggExecStream::numBlocksHashTable [private] |
Initial estimate of blocks required.
Definition at line 133 of file LhxAggExecStream.h.
Referenced by getResourceRequirements(), and prepare().
BlockNum LhxAggExecStream::numMiscCacheBlocks [private] |
Number of cache blocks set aside for I/O.
A value of MAXU means no stats were available to compute this value.
Definition at line 139 of file LhxAggExecStream.h.
Referenced by getResourceRequirements(), prepare(), and setResourceAllocation().
bool LhxAggExecStream::isTopPlan [private] |
SharedLhxPlan LhxAggExecStream::rootPlan [private] |
LhxPlan* LhxAggExecStream::curPlan [private] |
uint LhxAggExecStream::buildInputIndex [private] |
LhxPartitionInfo LhxAggExecStream::partInfo [private] |
Partition context used in recursive partitioning.
Definition at line 169 of file LhxAggExecStream.h.
The build partition (which is also the only partition).
Definition at line 174 of file LhxAggExecStream.h.
Referenced by open().
bool LhxAggExecStream::enableSubPartStat [private] |
uint LhxAggExecStream::forcePartitionLevel [private] |
Forced partitioning level.
This is set only in tests.
Definition at line 190 of file LhxAggExecStream.h.
LhxAggState LhxAggExecStream::aggState [private] |
LhxAggState LhxAggExecStream::nextState [private] |
The next state of the AggExecStream.
Definition at line 203 of file LhxAggExecStream.h.
Referenced by execute().
uint LhxAggExecStream::groupByKeyCount [private] |
Definition at line 211 of file LhxAggExecStream.h.
Definition at line 217 of file LhxAggExecStream.h.
Referenced by execute(), open(), and setAggComputers().
SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited] |
Definition at line 51 of file SingleInputExecStream.h.
Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), 
DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), setAggComputers(), setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject::needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), 
SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().