#include <SegBufferWriterExecStream.h>
Inheritance diagram for SegBufferWriterExecStream:
Public Member Functions | |
virtual bool | canEarlyClose () |
| |
virtual void | prepare (SegBufferWriterExecStreamParams const &params) |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | closeImpl () |
Implements ClosableObject. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Indicate to the consumer if the buffer is provided by this exec stream which is the producer. | |
virtual void | prepare (DiffluenceExecStreamParams const &params) |
virtual void | prepare (SingleInputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any unallocated resources. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Protected Attributes | |
std::vector< SharedExecStreamBufAccessor > | outAccessors |
List of output buffer accessors. | |
TupleDescriptor | outputTupleDesc |
Output tuple descriptor. | |
SharedExecStreamBufAccessor | pInAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
Private Member Functions | |
int64_t | readReaderRefCount () |
Reads and returns the current value of the reader reference count stored in a dynamic parameter. | |
Private Attributes | |
SegmentAccessor | bufferSegmentAccessor |
SharedSegBufferWriter | pSegBufferWriter |
DynamicParamId | readerRefCountParamId |
PageId | firstBufferPageId |
TupleData | outputTuple |
boost::scoped_array< FixedBuffer > | outputTupleBuffer |
uint | outputBufSize |
std::vector< bool > | outputWritten |
uint | nOutputsWritten |
bool | paramCreated |
SegBufferWriterExecStream does not write the buffered data to its output stream. Once it has finished buffering its input, it passes the first pageId of the buffered data through its output stream to any reader streams that are ready to start processing data.
The writer and reader stream instances all share a dynamic parameter. The dynamic parameter is created by this stream and read by the reader streams. The parameter is a reference counter used to keep track of the number of active readers, which this stream uses to determine if it can destroy the buffered data.
Definition at line 65 of file SegBufferWriterExecStream.h.
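The reference-counting handshake described above can be illustrated with a small standalone sketch. Everything in it is hypothetical: CounterParamsSketch, WriterSketch and ReaderSketch are invented names, and the sketch assumes that each reader increments the counter when it opens and decrements it when it closes. It does not use the Fennel dynamic parameter API; it only mirrors the idea that the writer may discard its buffer once the count is back to zero.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-in for a dynamic parameter manager: one counter per id.
class CounterParamsSketch {
public:
    void createCounter(int id) { counters[id] = 0; }
    void increment(int id) { ++counters.at(id); }
    void decrement(int id) { --counters.at(id); }
    int64_t read(int id) const { return counters.at(id); }

private:
    std::map<int, int64_t> counters;
};

// Writer side: creates the counter and checks it before discarding the buffer.
class WriterSketch {
public:
    WriterSketch(CounterParamsSketch &paramsInit, int idInit)
        : params(paramsInit), id(idInit), paramCreated(false) {}

    void open() {
        params.createCounter(id);
        paramCreated = true;
    }

    // A writer that was never opened is treated as having zero readers.
    bool canDestroyBuffer() const {
        return !paramCreated || params.read(id) == 0;
    }

private:
    CounterParamsSketch &params;
    int id;
    bool paramCreated;
};

// Reader side (assumed behavior): register on open, deregister on close.
class ReaderSketch {
public:
    ReaderSketch(CounterParamsSketch &paramsInit, int idInit)
        : params(paramsInit), id(idInit) {}

    void open() { params.increment(id); }

    void close() {
        assert(params.read(id) > 0);  // must not close more often than opened
        params.decrement(id);
    }

private:
    CounterParamsSketch &params;
    int id;
};
```

With two readers open, canDestroyBuffer() stays false until both have closed, which is the condition the writer checks before releasing the buffered pages.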
int64_t SegBufferWriterExecStream::readReaderRefCount() [private]
Reads and returns the current value of the reader reference count stored in a dynamic parameter.
Definition at line 98 of file SegBufferWriterExecStream.cpp.
References TupleDatum::cbData, paramCreated, TupleDatum::pData, ExecStream::pDynamicParamManager, and readerRefCountParamId.
Referenced by canEarlyClose(), and closeImpl().
{
    if (!paramCreated) {
        // If the stream was never opened, then the parameter will not have
        // been created.
        return 0;
    }

    int64_t refCount;
    TupleDatum refCountDatum;
    refCountDatum.pData = (PConstBuffer) &refCount;
    refCountDatum.cbData = 8;
    pDynamicParamManager->readParam(readerRefCountParamId, refCountDatum);
    return refCount;
}
bool SegBufferWriterExecStream::canEarlyClose() [virtual]
Reimplemented from ExecStream.
Definition at line 93 of file SegBufferWriterExecStream.cpp.
References readReaderRefCount().
{
    return (readReaderRefCount() == 0);
}
void SegBufferWriterExecStream::prepare(SegBufferWriterExecStreamParams const &params) [virtual]
Definition at line 34 of file SegBufferWriterExecStream.cpp.
References bufferSegmentAccessor, TupleData::compute(), firstBufferPageId, DiffluenceExecStream::outAccessors, outputBufSize, outputTuple, outputWritten, paramCreated, DiffluenceExecStream::prepare(), SegBufferWriterExecStreamParams::readerRefCountParamId, and readerRefCountParamId.
{
    DiffluenceExecStream::prepare(params);
    bufferSegmentAccessor = params.scratchAccessor;
    readerRefCountParamId = params.readerRefCountParamId;
    paramCreated = false;

    for (uint i = 0; i < outAccessors.size(); i++) {
        assert(outAccessors[i]->getTupleDesc().size() == 1);
    }
    outputTuple.compute(outAccessors[0]->getTupleDesc());
    outputTuple[0].pData = (PConstBuffer) &firstBufferPageId;
    outputBufSize =
        outAccessors[0]->getScratchTupleAccessor().getMaxByteCount();
    outputWritten.resize(outAccessors.size());
}
void SegBufferWriterExecStream::getResourceRequirements(ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) [virtual]
Reimplemented from ExecStream.
Definition at line 52 of file SegBufferWriterExecStream.cpp.
References ExecStream::getResourceRequirements(), and ExecStreamResourceQuantity::nCachePages.
{
    DiffluenceExecStream::getResourceRequirements(minQuantity, optQuantity);

    // set aside 1 page for I/O
    minQuantity.nCachePages += 1;
    optQuantity = minQuantity;
}
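The pattern above (take the base class requirements, add one page for I/O, then make the optimum equal to the minimum) can be tried in isolation with the sketch below. ResourceQuantitySketch, BaseStreamSketch and WriterStreamSketch are hypothetical stand-ins, not Fennel classes, and the base class is assumed to declare zero requirements.

```cpp
#include <cassert>

// Hypothetical, simplified resource quantity: cache pages only.
struct ResourceQuantitySketch {
    unsigned nCachePages = 0;
};

class BaseStreamSketch {
public:
    virtual ~BaseStreamSketch() {}

    // Assumed default: no resources required.
    virtual void getResourceRequirements(
        ResourceQuantitySketch &minQuantity,
        ResourceQuantitySketch &optQuantity)
    {
        minQuantity.nCachePages = 0;
        optQuantity = minQuantity;
    }
};

class WriterStreamSketch : public BaseStreamSketch {
public:
    void getResourceRequirements(
        ResourceQuantitySketch &minQuantity,
        ResourceQuantitySketch &optQuantity) override
    {
        BaseStreamSketch::getResourceRequirements(minQuantity, optQuantity);
        // Set aside one page for I/O; more pages would not help, so the
        // optimum is the same as the minimum.
        minQuantity.nCachePages += 1;
        optQuantity = minQuantity;
    }
};
```

Setting the optimum equal to the minimum tells a resource governor that extra pages bring no benefit to this stream.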
void SegBufferWriterExecStream::open(bool restart) [virtual]
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from DiffluenceExecStream.
Definition at line 63 of file SegBufferWriterExecStream.cpp.
References BUFPROV_CONSUMER, firstBufferPageId, FixedBuffer, StandardTypeDescriptorFactory::newDataType(), nOutputsWritten, NULL_PAGE_ID, DiffluenceExecStream::open(), outputBufSize, outputTupleBuffer, outputWritten, paramCreated, ExecStream::pDynamicParamManager, SingleInputExecStream::pInAccessor, readerRefCountParamId, and STANDARD_TYPE_UINT_64.
{
    assert(pInAccessor);
    assert(pInAccessor->getProvision() == BUFPROV_CONSUMER);

    std::fill(outputWritten.begin(), outputWritten.end(), false);
    nOutputsWritten = 0;

    if (!restart) {
        StandardTypeDescriptorFactory stdTypeFactory;
        TupleAttributeDescriptor attrDesc =
            TupleAttributeDescriptor(
                stdTypeFactory.newDataType(STANDARD_TYPE_UINT_64));
        pDynamicParamManager->createCounterParam(readerRefCountParamId);
        paramCreated = true;
        outputTupleBuffer.reset(new FixedBuffer[outputBufSize]);
        firstBufferPageId = NULL_PAGE_ID;
    }
    DiffluenceExecStream::open(restart);
}
ExecStreamResult SegBufferWriterExecStream::execute(ExecStreamQuantum const &quantum) [virtual]
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 114 of file SegBufferWriterExecStream.cpp.
References bufferSegmentAccessor, ExecStreamGraphImpl::closeProducers(), EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_EOS, firstBufferPageId, ExecStream::getGraph(), ExecStream::getStreamId(), TupleAccessor::marshal(), SegBufferWriter::newSegBufferWriter(), nOutputsWritten, NULL_PAGE_ID, DiffluenceExecStream::outAccessors, outputBufSize, outputTuple, outputTupleBuffer, outputWritten, SingleInputExecStream::pInAccessor, and pSegBufferWriter.
{
    if (nOutputsWritten == outAccessors.size()) {
        for (uint i = 0; i < outAccessors.size(); i++) {
            outAccessors[i]->markEOS();
        }
        return EXECRC_EOS;
    }

    // Buffer the input
    if (firstBufferPageId == NULL_PAGE_ID) {
        if (!pSegBufferWriter) {
            pSegBufferWriter =
                SegBufferWriter::newSegBufferWriter(
                    pInAccessor,
                    bufferSegmentAccessor,
                    true);
        }
        ExecStreamResult rc = pSegBufferWriter->write();
        if (rc != EXECRC_EOS) {
            return rc;
        }
        // Close the upstream producers
        ExecStreamGraphImpl &graphImpl =
            dynamic_cast<ExecStreamGraphImpl&>(getGraph());
        graphImpl.closeProducers(getStreamId());
        firstBufferPageId = pSegBufferWriter->getFirstPageId();
    }

    // Once the input has been buffered, then pass along the first buffer
    // pageId to only those consumers that have explicitly requested data
    bool newOutput = false;
    for (uint i = 0; i < outAccessors.size(); i++) {
        switch (outAccessors[i]->getState()) {
        case EXECBUF_NONEMPTY:
        case EXECBUF_OVERFLOW:
        case EXECBUF_EMPTY:
        case EXECBUF_EOS:
            break;

        case EXECBUF_UNDERFLOW:
            // Underflow means the consumer has explicitly requested data
            {
                assert(!outputWritten[i]);
                TupleAccessor *outputTupleAccessor =
                    &outAccessors[i]->getScratchTupleAccessor();
                outputTupleAccessor->marshal(
                    outputTuple,
                    outputTupleBuffer.get());
                outAccessors[i]->provideBufferForConsumption(
                    outputTupleBuffer.get(),
                    outputTupleBuffer.get() + outputBufSize);
                outputWritten[i] = true;
                nOutputsWritten++;
                newOutput = true;
                break;
            }

        default:
            permAssert(false);
        }
    }

    // Verify that at least one output stream was written
    assert(newOutput);
    return EXECRC_BUF_OVERFLOW;
}
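The fan-out step of execute() (hand the first page id only to consumers that are in the underflow state, and report end-of-stream once every consumer has been served) can be sketched independently of Fennel. The types below (BufStateSketch, ResultSketch, ConsumerSketch, FanOutSketch) are hypothetical, the input is assumed to be fully buffered already, and the sketch omits the original's assertion that at least one output was written on each call.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class BufStateSketch { Empty, NonEmpty, Underflow, Overflow, Eos };
enum class ResultSketch { BufOverflow, Eos };

// Hypothetical consumer-side buffer: a state plus the one value it receives.
struct ConsumerSketch {
    BufStateSketch state = BufStateSketch::Empty;
    uint64_t pageId = 0;
};

class FanOutSketch {
public:
    FanOutSketch(uint64_t firstPageIdInit, std::size_t nConsumers)
        : consumers(nConsumers),
          firstPageId(firstPageIdInit),
          written(nConsumers, false),
          nWritten(0) {}

    ResultSketch execute() {
        // Every consumer has received the page id: signal end of stream.
        if (nWritten == consumers.size()) {
            for (ConsumerSketch &c : consumers) {
                c.state = BufStateSketch::Eos;
            }
            return ResultSketch::Eos;
        }
        for (std::size_t i = 0; i < consumers.size(); ++i) {
            // Only underflow means the consumer explicitly asked for data.
            if (consumers[i].state != BufStateSketch::Underflow) {
                continue;
            }
            assert(!written[i]);
            consumers[i].pageId = firstPageId;
            consumers[i].state = BufStateSketch::NonEmpty;
            written[i] = true;
            ++nWritten;
        }
        return ResultSketch::BufOverflow;
    }

    std::vector<ConsumerSketch> consumers;

private:
    uint64_t firstPageId;
    std::vector<bool> written;
    std::size_t nWritten;
};
```

A consumer that never requests data is never written to, so in this sketch end-of-stream is reported only after each consumer has asked once.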
void SegBufferWriterExecStream::closeImpl() [virtual]
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from ExecStream.
Definition at line 84 of file SegBufferWriterExecStream.cpp.
References ExecStream::closeImpl(), outputTupleBuffer, paramCreated, pSegBufferWriter, and readReaderRefCount().
{
    assert(readReaderRefCount() == 0);
    paramCreated = false;
    pSegBufferWriter.reset();
    outputTupleBuffer.reset();
    DiffluenceExecStream::closeImpl();
}
ExecStreamBufProvision SegBufferWriterExecStream::getInputBufProvision() const [virtual]
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from SingleInputExecStream.
Definition at line 182 of file SegBufferWriterExecStream.cpp.
References BUFPROV_CONSUMER.
{
    return BUFPROV_CONSUMER;
}
ExecStreamBufProvision SegBufferWriterExecStream::getOutputBufProvision() const [virtual]
Indicate to the consumer if the buffer is provided by this exec stream which is the producer.
Reimplemented from DiffluenceExecStream.
Definition at line 187 of file SegBufferWriterExecStream.cpp.
References BUFPROV_PRODUCER.
{
    return BUFPROV_PRODUCER;
}
void DiffluenceExecStream::prepare(DiffluenceExecStreamParams const &params) [virtual, inherited]
Definition at line 42 of file DiffluenceExecStream.cpp.
References DiffluenceExecStream::getOutputBufProvision(), DiffluenceExecStream::outAccessors, DiffluenceExecStreamParams::outputTupleDesc, DiffluenceExecStreamParams::outputTupleFormat, SingleInputExecStream::pInAccessor, and SingleInputExecStream::prepare().
Referenced by LbmSplicerExecStream::prepare(), and prepare().
{
    SingleInputExecStream::prepare(params);

    // By default, shape for all outputs is the same as the input if the
    // outputTupleDesc wasn't explicitly set.
    TupleDescriptor tupleDesc;
    TupleFormat tupleFormat;
    if (params.outputTupleDesc.empty()) {
        tupleDesc = pInAccessor->getTupleDesc();
        tupleFormat = pInAccessor->getTupleFormat();
    } else {
        tupleDesc = params.outputTupleDesc;
        tupleFormat = params.outputTupleFormat;
    }
    for (uint i = 0; i < outAccessors.size(); ++i) {
        assert(outAccessors[i]->getProvision() == getOutputBufProvision());
        outAccessors[i]->setTupleShape(tupleDesc, tupleFormat);
    }
}
void SingleInputExecStream::prepare(SingleInputExecStreamParams const &params) [virtual, inherited]
Definition at line 44 of file SingleInputExecStream.cpp.
References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().
Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().
{
    ExecStream::prepare(params);

    assert(pInAccessor);
    assert(pInAccessor->getProvision() == getInputBufProvision());
}
void ExecStream::prepare(ExecStreamParams const &params) [virtual, inherited]
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
{
    if (pGraph) {
        pDynamicParamManager = pGraph->getDynamicParamManager();
    }
    pQuotaAccessor = params.pCacheAccessor;
    pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
}
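The remark that derived classes may define an overloaded prepare() taking a more specialized parameter class, rather than overriding the base version, is easier to follow with a small example. The sketch below uses hypothetical types (BaseParamsSketch, DerivedParamsSketch, BaseStreamSketch, DerivedStreamSketch) and only demonstrates the overload-and-chain idiom.

```cpp
#include <cassert>

struct BaseParamsSketch {
    int cachePages = 0;
};

// More specialized parameters extend the base parameters.
struct DerivedParamsSketch : BaseParamsSketch {
    int readerRefCountParamId = 0;
};

class BaseStreamSketch {
public:
    virtual ~BaseStreamSketch() {}

    virtual void prepare(BaseParamsSketch const &params) {
        cachePages = params.cachePages;
    }

    int cachePages = 0;
};

class DerivedStreamSketch : public BaseStreamSketch {
public:
    // Keep the base overload visible next to the new one.
    using BaseStreamSketch::prepare;

    // An overload (not an override): takes the specialized parameter class
    // and chains to the base class first, as the Fennel streams above do.
    virtual void prepare(DerivedParamsSketch const &params) {
        BaseStreamSketch::prepare(params);
        paramId = params.readerRefCountParamId;
    }

    int paramId = -1;
};
```

Calling prepare() with a DerivedParamsSketch selects the specialized overload because it is an exact match, and the chained call fills in the base class state.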
void DiffluenceExecStream::setOutputBufAccessors(std::vector< SharedExecStreamBufAccessor > const &outAccessors) [virtual, inherited]
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Reimplemented from SingleInputExecStream.
Definition at line 36 of file DiffluenceExecStream.cpp.
References DiffluenceExecStream::outAccessors.
{
    outAccessors = outAccessorsInit;
}
void SingleInputExecStream::setInputBufAccessors(std::vector< SharedExecStreamBufAccessor > const &inAccessors) [virtual, inherited]
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Implements ExecStream.
Reimplemented in ConduitExecStream.
Definition at line 37 of file SingleInputExecStream.cpp.
References SingleInputExecStream::pInAccessor.
Referenced by ConduitExecStream::setInputBufAccessors().
{
    assert(inAccessors.size() == 1);
    pInAccessor = inAccessors[0];
}
ExecStreamGraph & ExecStream::getGraph() const [inline, inherited]
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId() const [inline, inherited]
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by ExternalSortExecStreamImpl::execute(), execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
{
    return id;
}
void ExecStream::getResourceRequirements(ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) [virtual, inherited]
Determines resource requirements for this stream.
Default implementation declares zero resource requirements.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 93 of file ExecStream.cpp.
References EXEC_RESOURCE_ACCURATE.
Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().
{
    getResourceRequirements(minQuantity, optQuantity);
    optType = EXEC_RESOURCE_ACCURATE;
}
void ExecStream::setResourceAllocation(ExecStreamResourceQuantity &quantity) [virtual, inherited]
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.
Definition at line 111 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.
Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().
{
    resourceAllocation = quantity;
    if (pQuotaAccessor) {
        pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
    }
    if (pScratchQuotaAccessor) {
        pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
    }
}
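The description notes that a stream unable to honor a reduced allocation should write the amounts still in use back into quantity. The default implementation shown here does not need to do that, so the following is only a sketch of how a hypothetical override might behave; QuantitySketch and PinnedStreamSketch are invented for illustration.

```cpp
#include <cassert>

struct QuantitySketch {
    unsigned nCachePages;
};

// Hypothetical stream that cannot give back pages it is currently using.
class PinnedStreamSketch {
public:
    explicit PinnedStreamSketch(unsigned pagesInUseInit)
        : pagesInUse(pagesInUseInit)
    {
        allocation.nCachePages = pagesInUseInit;
    }

    void setResourceAllocation(QuantitySketch &quantity) {
        if (quantity.nCachePages < pagesInUse) {
            // Cannot shrink below what is in use: report the actual amount
            // back to the caller through the in/out parameter.
            quantity.nCachePages = pagesInUse;
        }
        allocation = quantity;
    }

    unsigned allocatedPages() const { return allocation.nCachePages; }

private:
    unsigned pagesInUse;
    QuantitySketch allocation;
};
```

Because quantity is passed by non-const reference, the caller can see after the call whether its request was honored in full.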
void ExecStream::setName(std::string const &) [virtual, inherited]
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
{
    name = nameInit;
}
std::string const & ExecStream::getName() const [virtual, inherited]
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
{
    return name;
}
bool ExecStream::mayBlock() const [virtual, inherited]
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort() const [virtual, inherited]
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
{
    if (!pGraph) {
        return;
    }
    ExecStreamScheduler *pScheduler = pGraph->getScheduler();
    if (!pScheduler) {
        return;
    }
    pScheduler->checkAbort();
}
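The advice that long-running loops should poll for aborts themselves can be sketched as follows. SchedulerSketch, AbortSketch and sumWithAbortChecks are hypothetical names, and the polling interval of 1024 iterations is an arbitrary choice for the example; only the shape (a cheap periodic check that throws when an abort is pending) follows the description above.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical exception type standing in for an abort exception.
struct AbortSketch : std::runtime_error {
    AbortSketch() : std::runtime_error("execution aborted") {}
};

class SchedulerSketch {
public:
    SchedulerSketch() : abortRequested(false) {}

    void requestAbort() { abortRequested = true; }

    // Throws if an abort has been requested; otherwise does nothing.
    void checkAbort() const {
        if (abortRequested) {
            throw AbortSketch();
        }
    }

private:
    std::atomic<bool> abortRequested;
};

// A long-running loop that polls the scheduler every 1024 iterations.
// A null scheduler means there is nothing to check, as in checkAbort() above.
long sumWithAbortChecks(SchedulerSketch const *pScheduler, long n) {
    long total = 0;
    for (long i = 0; i < n; ++i) {
        if (pScheduler && (i % 1024 == 0)) {
            pScheduler->checkAbort();
        }
        total += i;
    }
    return total;
}
```

Polling at a coarse interval keeps the per-iteration cost low while still bounding how long an abort request can go unnoticed.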
ExecStreamBufProvision ExecStream::getOutputBufConversion() const [virtual, inherited]
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
{
    return BUFPROV_NONE;
}
bool ClosableObject::isClosed() const [inline, inherited]
Definition at line 58 of file ClosableObject.h.
{
    return !needsClose;
}
void ClosableObject::close() [inherited]
Closes this object, releasing any unallocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
{
    if (!needsClose) {
        return;
    }
    needsClose = false;
    closeImpl();
}
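The close() listing above makes closing idempotent: closeImpl() runs at most once per open. A minimal standalone sketch of the same guard is shown below; ClosableSketch and CountingResourceSketch are hypothetical, and the open() method that sets needsClose is an assumption made so the example can be exercised.

```cpp
#include <cassert>

class ClosableSketch {
public:
    ClosableSketch() : needsClose(false) {}
    virtual ~ClosableSketch() {}

    bool isClosed() const { return !needsClose; }

    // Safe to call repeatedly: the flag is cleared before closeImpl() runs.
    void close() {
        if (!needsClose) {
            return;
        }
        needsClose = false;
        closeImpl();
    }

protected:
    bool needsClose;
    virtual void closeImpl() = 0;
};

// Hypothetical subclass that counts how often its resources are released.
class CountingResourceSketch : public ClosableSketch {
public:
    CountingResourceSketch() : closeCount(0) {}

    void open() { needsClose = true; }

    int closeCount;

protected:
    void closeImpl() override { ++closeCount; }
};
```

Clearing the flag before calling closeImpl() also prevents re-entry if closeImpl() itself ends up calling close().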
void TraceSource::initTraceSource(SharedTraceTarget pTraceTarget, std::string name) [virtual, inherited]
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
{
    assert(!pTraceTarget.get());

    pTraceTarget = pTraceTargetInit;
    name = nameInit;
    if (isTracing()) {
        minimumLevel = pTraceTarget->getSourceTraceLevel(name);
    } else {
        minimumLevel = TRACE_OFF;
    }
}
void TraceSource::trace(TraceLevel level, std::string message) const [inherited]
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
{
    if (isTracing()) {
        getTraceTarget().notifyTrace(name, level, message);
    }
}
bool TraceSource::isTracing() const [inline, inherited]
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
{
    return pTraceTarget.get() ? true : false;
}
bool TraceSource::isTracingLevel(TraceLevel level) const [inline, inherited]
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
{
    return level >= minimumLevel;
}
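The comparison level >= minimumLevel implies that trace levels are ordered numerically, with more severe levels having larger values and the "off" level sitting above all of them. The sketch below illustrates that ordering; the enumerator names and numeric values are assumptions chosen for the example and are not taken from the Fennel headers.

```cpp
#include <cassert>

// Assumed ordering: larger value means more severe; OFF is above everything.
enum LevelSketch {
    LEVEL_FINE_SKETCH = 500,
    LEVEL_INFO_SKETCH = 800,
    LEVEL_SEVERE_SKETCH = 1000,
    LEVEL_OFF_SKETCH = 10001
};

class TraceSourceSketch {
public:
    explicit TraceSourceSketch(LevelSketch minimumLevelInit)
        : minimumLevel(minimumLevelInit) {}

    // A level is traced when it is at least as severe as the minimum.
    bool isTracingLevel(LevelSketch level) const {
        return level >= minimumLevel;
    }

private:
    LevelSketch minimumLevel;
};
```

With the minimum set to the "off" value, no ordinary level passes the test, which matches how the initTraceSource() and disableTracing() listings on this page use TRACE_OFF.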
TraceTarget& TraceSource::getTraceTarget() const [inline, inherited]
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
{
    assert(isTracing());
    return *(pTraceTarget.get());
}
SharedTraceTarget TraceSource::getSharedTraceTarget() const [inline, inherited]
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
{
    return pTraceTarget;
}
std::string TraceSource::getTraceSourceName() const [inline, inherited]
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
{
    return name;
}
void TraceSource::setTraceSourceName(std::string const &n) [inline, inherited]
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
{
    name = n;
}
TraceLevel TraceSource::getMinimumTraceLevel() const [inline, inherited]
void TraceSource::disableTracing() [inherited]
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
{
    pTraceTarget.reset();
    minimumLevel = TRACE_OFF;
}
void ErrorSource::initErrorSource(SharedErrorTarget pErrorTarget, const std::string &name) [virtual, inherited]
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
{
    pErrorTarget = pErrorTargetInit;
    name = nameInit;
}
void ErrorSource::postError(ErrorLevel level, const std::string &message, void *address, long capacity, int index) [inherited]
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
{
    if (hasTarget()) {
        getErrorTarget().notifyError(
            name, level, message, address, capacity, index);
    }
}
void ErrorSource::postError(ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) [inherited]
Posts an exception, such as a row exception.
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
{
    if (!hasTarget()) {
        return;
    }

    if (!pErrorBuf) {
        errorAccessor.compute(errorDesc);
        uint cbMax = errorAccessor.getMaxByteCount();
        pErrorBuf.reset(new FixedBuffer[cbMax]);
    }

    uint cbTuple = errorAccessor.getByteCount(errorTuple);
    errorAccessor.marshal(errorTuple, pErrorBuf.get());
    postError(level, message, pErrorBuf.get(), cbTuple, index);
}
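The listing above allocates a scratch buffer on first use, marshals the tuple into it, and then delegates to the address-based overload. A minimal sketch of that pattern, assuming a std::string as a stand-in for the tuple and a hypothetical DemoPoster class (neither is part of the library):

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>

// Hypothetical poster; the class and the string "tuple" are illustrative only.
class DemoPoster {
    std::unique_ptr<char[]> pErrorBuf;  // allocated lazily, reused afterwards
    std::size_t cbMax;                  // largest record the buffer must hold
public:
    int allocations = 0;     // counts scratch-buffer allocations
    std::string lastPosted;  // bytes most recently seen by the raw overload

    explicit DemoPoster(std::size_t maxBytes) : cbMax(maxBytes) {}

    // Raw overload: receives an address and a byte count.
    void postError(const void *address, std::size_t capacity) {
        lastPosted.assign(static_cast<const char *>(address), capacity);
    }

    // Structured overload: marshals into the scratch buffer, then delegates.
    void postError(const std::string &tuple) {
        assert(tuple.size() <= cbMax);
        if (!pErrorBuf) {
            pErrorBuf.reset(new char[cbMax]);  // first call only
            ++allocations;
        }
        std::memcpy(pErrorBuf.get(), tuple.data(), tuple.size());
        postError(pErrorBuf.get(), tuple.size());
    }
};
```

Sizing the buffer for the maximum record up front means later calls reuse it without reallocating, at the cost of holding that memory for the lifetime of the object.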
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
{
    return pErrorTarget.get() ? true : false;
}
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
{
    assert(hasTarget());
    return *(pErrorTarget.get());
}
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
{
    return pErrorTarget;
}
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
{
    return name;
}
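One way the nested naming mentioned above might look is sketched here. DemoSource and nameSubcomponent are hypothetical, and the '.' separator is an assumption; the library does not prescribe a naming scheme in this section.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in exposing the same two accessors as ErrorSource.
class DemoSource {
    std::string name;
public:
    explicit DemoSource(const std::string &n) : name(n) {}
    std::string getErrorSourceName() const { return name; }
    void setErrorSourceName(const std::string &n) { name = n; }
};

// Hypothetical helper: name a subcomponent after its parent.
// The '.' separator is an assumption made for this sketch.
void nameSubcomponent(const DemoSource &parent, DemoSource &child,
                      const std::string &suffix) {
    assert(!suffix.empty());
    child.setErrorSourceName(parent.getErrorSourceName() + "." + suffix);
}
```

A hierarchical name of this kind lets an error target tell which subcomponent of which stream posted a given error.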
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
{
    name = n;
}
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
{
    pErrorTarget.reset();
}
Definition at line 69 of file SegBufferWriterExecStream.h.
Referenced by closeImpl(), and execute().
DynamicParamId SegBufferWriterExecStream::readerRefCountParamId [private] |
Definition at line 70 of file SegBufferWriterExecStream.h.
Referenced by open(), prepare(), and readReaderRefCount().
PageId SegBufferWriterExecStream::firstBufferPageId [private] |
Definition at line 71 of file SegBufferWriterExecStream.h.
boost::scoped_array<FixedBuffer> SegBufferWriterExecStream::outputTupleBuffer [private] |
Definition at line 73 of file SegBufferWriterExecStream.h.
Referenced by closeImpl(), execute(), and open().
uint SegBufferWriterExecStream::outputBufSize [private] |
Definition at line 74 of file SegBufferWriterExecStream.h.
std::vector<bool> SegBufferWriterExecStream::outputWritten [private] |
Definition at line 75 of file SegBufferWriterExecStream.h.
bool SegBufferWriterExecStream::paramCreated [private] |
Definition at line 77 of file SegBufferWriterExecStream.h.
Referenced by closeImpl(), open(), prepare(), and readReaderRefCount().
std::vector<SharedExecStreamBufAccessor> DiffluenceExecStream::outAccessors [protected, inherited] |
List of output buffer accessors.
Definition at line 63 of file DiffluenceExecStream.h.
Referenced by LbmSplicerExecStream::execute(), SplitterExecStream::execute(), execute(), LbmSplicerExecStream::open(), DiffluenceExecStream::open(), LbmSplicerExecStream::prepare(), prepare(), DiffluenceExecStream::prepare(), and DiffluenceExecStream::setOutputBufAccessors().
TupleDescriptor DiffluenceExecStream::outputTupleDesc [protected, inherited] |
Output tuple descriptor.
Currently, all outputs must have the same descriptor.
Definition at line 69 of file DiffluenceExecStream.h.
SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited] |
Definition at line 51 of file SingleInputExecStream.h.
Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), 
ConduitExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
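The relationship between the two flags can be modeled with a small state sketch. DemoStream is hypothetical, and where each flag is set is an assumption chosen to match the note above (prepared but never opened still needs a close); it is not taken from the ExecStream source.

```cpp
#include <cassert>

// Hypothetical model of the two flags; not the Fennel classes.
class DemoStream {
    bool needsClose = false;  // something must be released before destruction
    bool isOpen = false;      // stream is currently able to fetch data
public:
    // Assumption: preparing is what first obliges a later close.
    void prepare() { needsClose = true; }

    // Opening requires a prior prepare in this sketch.
    void open() {
        assert(needsClose);
        isOpen = true;
    }

    // Closing clears both flags.
    void close() {
        isOpen = false;
        needsClose = false;
    }

    bool getNeedsClose() const { return needsClose; }
    bool getIsOpen() const { return isOpen; }
};
```

The state after prepare() and before open() is the case the note singles out: needsClose is true while isOpen is still false, so the flags are not simple opposites.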
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
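The trade-off described above can be illustrated with a short comparison. DemoGraph, RawStream, and WeakStream are hypothetical types, and std::weak_ptr stands in for whichever weak pointer the library would otherwise have used.

```cpp
#include <cassert>
#include <memory>

// Hypothetical graph with one field to read.
struct DemoGraph {
    int quantum = 7;
};

// Raw back-pointer: a plain dereference. The graph must outlive the stream.
struct RawStream {
    DemoGraph *pGraph;
    int readQuantum() const { return pGraph->quantum; }
};

// weak_ptr back-pointer: every access goes through lock(), which creates
// a temporary shared_ptr and updates the shared reference count.
struct WeakStream {
    std::weak_ptr<DemoGraph> pGraph;
    int readQuantum() const {
        std::shared_ptr<DemoGraph> p = pGraph.lock();
        assert(p);  // the graph must still be alive
        return p->quantum;
    }
};
```

Both variants read the same value; the raw pointer avoids the per-access lock() call but relies on the containing graph outliving its streams.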
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
Obtained at prepare() time. A shared pointer is kept in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. A shared pointer is kept in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().