SegBufferWriterExecStream Class Reference

SegBufferWriterExecStream reads its input stream and writes the data to a buffer so it can be read by one or more SegBufferReaderExecStreams.

#include <SegBufferWriterExecStream.h>

Inherits (directly or indirectly) DiffluenceExecStream, SingleInputExecStream, ExecStream, ClosableObject, TraceSource, and ErrorSource.

Public Member Functions

virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

virtual void prepare (SegBufferWriterExecStreamParams const &params)
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void closeImpl ()
 Implements ClosableObject.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Indicates to the consumer whether the buffer is provided by this exec stream, which is the producer.
virtual void prepare (DiffluenceExecStreamParams const &params)
virtual void prepare (SingleInputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any allocated resources.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Protected Attributes

std::vector< SharedExecStreamBufAccessor > outAccessors
 List of output buffer accessors.
TupleDescriptor outputTupleDesc
 Output tuple descriptor.
SharedExecStreamBufAccessor pInAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose

Private Member Functions

int64_t readReaderRefCount ()
 Reads and returns the current value of the reader reference count stored in a dynamic parameter.

Private Attributes

SegmentAccessor bufferSegmentAccessor
SharedSegBufferWriter pSegBufferWriter
DynamicParamId readerRefCountParamId
PageId firstBufferPageId
TupleData outputTuple
boost::scoped_array< FixedBuffer > outputTupleBuffer
uint outputBufSize
std::vector< bool > outputWritten
uint nOutputsWritten
bool paramCreated

Detailed Description

SegBufferWriterExecStream reads its input stream and writes the data to a buffer so it can be read by one or more SegBufferReaderExecStreams.

It does not write the buffered data to its output stream. When this stream has completed buffering its input, it will pass the first pageId of the buffered data through its output stream to any reader streams that are ready to start processing data.

The writer and reader stream instances all share a dynamic parameter. The dynamic parameter is created by this stream and read by the reader streams. The parameter is a reference counter used to keep track of the number of active readers, which this stream uses to determine if it can destroy the buffered data.
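The reference-counting protocol described above can be illustrated in isolation. The following is a minimal sketch, not Fennel code: `SharedCounter`, `WriterSketch`, and `ReaderSketch` are hypothetical names, a plain shared integer stands in for the dynamic parameter, and the assumption that a reader registers on construction and deregisters on destruction is made only for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>

// Hypothetical stand-in for the shared dynamic parameter: a plain counter.
struct SharedCounter {
    int64_t value = 0;
};

// Hypothetical writer: creates the counter and consults it before cleanup.
class WriterSketch {
public:
    WriterSketch() : counter(std::make_shared<SharedCounter>()) {}
    std::shared_ptr<SharedCounter> getCounter() const { return counter; }
    // Same idea as canEarlyClose(): safe only when no reader is active.
    bool canDestroyBuffer() const { return counter->value == 0; }
private:
    std::shared_ptr<SharedCounter> counter;
};

// Hypothetical reader: increments the count while alive (an assumption).
class ReaderSketch {
public:
    explicit ReaderSketch(std::shared_ptr<SharedCounter> c)
        : counter(std::move(c))
    {
        ++counter->value;
    }
    ~ReaderSketch() { --counter->value; }
    ReaderSketch(ReaderSketch const &) = delete;
    ReaderSketch &operator=(ReaderSketch const &) = delete;
private:
    std::shared_ptr<SharedCounter> counter;
};

// Runs a small scenario; returns true if the counter behaves as expected.
bool refCountScenario()
{
    WriterSketch writer;
    bool ok = writer.canDestroyBuffer();        // no readers yet
    {
        ReaderSketch r1(writer.getCounter());
        ReaderSketch r2(writer.getCounter());
        ok = ok && !writer.canDestroyBuffer();  // two active readers
    }
    return ok && writer.canDestroyBuffer();     // readers gone again
}
```

The sketch is single-threaded; a real implementation shared across scheduler threads would need synchronized access to the counter.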

Author:
Zelaine Fong
Version:
Id
//open/dev/fennel/exec/SegBufferWriterExecStream.h#3

Definition at line 65 of file SegBufferWriterExecStream.h.


Member Function Documentation

int64_t SegBufferWriterExecStream::readReaderRefCount (  )  [private]

Reads and returns the current value of the reader reference count stored in a dynamic parameter.

Returns:
the reader reference count

Definition at line 98 of file SegBufferWriterExecStream.cpp.

References TupleDatum::cbData, paramCreated, TupleDatum::pData, ExecStream::pDynamicParamManager, and readerRefCountParamId.

Referenced by canEarlyClose(), and closeImpl().

00099 {
00100     if (!paramCreated) {
00101         // If the stream was never opened, then the parameter will not have
00102         // been created.
00103         return 0;
00104     }
00105 
00106     int64_t refCount;
00107     TupleDatum refCountDatum;
00108     refCountDatum.pData = (PConstBuffer) &refCount;
00109     refCountDatum.cbData = 8;
00110     pDynamicParamManager->readParam(readerRefCountParamId, refCountDatum);
00111     return refCount;
00112 }

bool SegBufferWriterExecStream::canEarlyClose (  )  [virtual]

Returns:
true if the stream can be closed early

Reimplemented from ExecStream.

Definition at line 93 of file SegBufferWriterExecStream.cpp.

References readReaderRefCount().

00094 {
00095     return (readReaderRefCount() == 0);
00096 }

void SegBufferWriterExecStream::prepare ( SegBufferWriterExecStreamParams const &  params  )  [virtual]

Definition at line 34 of file SegBufferWriterExecStream.cpp.

References bufferSegmentAccessor, TupleData::compute(), firstBufferPageId, DiffluenceExecStream::outAccessors, outputBufSize, outputTuple, outputWritten, paramCreated, DiffluenceExecStream::prepare(), SegBufferWriterExecStreamParams::readerRefCountParamId, and readerRefCountParamId.

00036 {
00037     DiffluenceExecStream::prepare(params);
00038     bufferSegmentAccessor = params.scratchAccessor;
00039     readerRefCountParamId = params.readerRefCountParamId;
00040     paramCreated = false;
00041 
00042     for (uint i = 0; i < outAccessors.size(); i++) {
00043         assert(outAccessors[i]->getTupleDesc().size() == 1);
00044     }
00045     outputTuple.compute(outAccessors[0]->getTupleDesc());
00046     outputTuple[0].pData = (PConstBuffer) &firstBufferPageId;
00047     outputBufSize =
00048         outAccessors[0]->getScratchTupleAccessor().getMaxByteCount();
00049     outputWritten.resize(outAccessors.size());
00050 }

void SegBufferWriterExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity 
) [virtual]

Reimplemented from ExecStream.

Definition at line 52 of file SegBufferWriterExecStream.cpp.

References ExecStream::getResourceRequirements(), and ExecStreamResourceQuantity::nCachePages.

00055 {
00056     DiffluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00057 
00058     // set aside 1 page for I/O
00059     minQuantity.nCachePages += 1;
00060     optQuantity = minQuantity;
00061 }
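The effect of this override can be shown with a small stand-alone sketch. `ResourceQuantitySketch`, `baseRequirements`, and `writerRequirements` are hypothetical names, and the base requirement of zero pages is an assumption made for the example; the actual base class may report something else.

```cpp
#include <cassert>

// Hypothetical stand-in for ExecStreamResourceQuantity.
struct ResourceQuantitySketch {
    unsigned nCachePages = 0;
};

// Hypothetical base-class requirement (assumed to be zero pages here).
void baseRequirements(
    ResourceQuantitySketch &minQ, ResourceQuantitySketch &optQ)
{
    minQ.nCachePages = 0;
    optQ.nCachePages = 0;
}

// Same shape as the listing above: one extra page, optimum == minimum.
void writerRequirements(
    ResourceQuantitySketch &minQ, ResourceQuantitySketch &optQ)
{
    baseRequirements(minQ, optQ);
    minQ.nCachePages += 1;  // set aside 1 page for I/O
    optQ = minQ;            // more pages would not help this stream
}

// Convenience helper returning the minimum page count.
unsigned minPagesForWriter()
{
    ResourceQuantitySketch minQ, optQ;
    writerRequirements(minQ, optQ);
    return minQ.nCachePages;
}
```

Setting the optimum equal to the minimum tells a resource governor that granting additional cache pages brings no benefit.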

void SegBufferWriterExecStream::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from DiffluenceExecStream.

Definition at line 63 of file SegBufferWriterExecStream.cpp.

References BUFPROV_CONSUMER, firstBufferPageId, FixedBuffer, StandardTypeDescriptorFactory::newDataType(), nOutputsWritten, NULL_PAGE_ID, DiffluenceExecStream::open(), outputBufSize, outputTupleBuffer, outputWritten, paramCreated, ExecStream::pDynamicParamManager, SingleInputExecStream::pInAccessor, readerRefCountParamId, and STANDARD_TYPE_UINT_64.

00064 {
00065     assert(pInAccessor);
00066     assert(pInAccessor->getProvision() == BUFPROV_CONSUMER);
00067 
00068     std::fill(outputWritten.begin(), outputWritten.end(), false);
00069     nOutputsWritten = 0;
00070 
00071     if (!restart) {
00072         StandardTypeDescriptorFactory stdTypeFactory;
00073         TupleAttributeDescriptor attrDesc =
00074             TupleAttributeDescriptor(
00075                 stdTypeFactory.newDataType(STANDARD_TYPE_UINT_64));
00076         pDynamicParamManager->createCounterParam(readerRefCountParamId);
00077         paramCreated = true;
00078         outputTupleBuffer.reset(new FixedBuffer[outputBufSize]);
00079         firstBufferPageId = NULL_PAGE_ID;
00080     }
00081     DiffluenceExecStream::open(restart);
00082 }

ExecStreamResult SegBufferWriterExecStream::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 114 of file SegBufferWriterExecStream.cpp.

References bufferSegmentAccessor, ExecStreamGraphImpl::closeProducers(), EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_EOS, firstBufferPageId, ExecStream::getGraph(), ExecStream::getStreamId(), TupleAccessor::marshal(), SegBufferWriter::newSegBufferWriter(), nOutputsWritten, NULL_PAGE_ID, DiffluenceExecStream::outAccessors, outputBufSize, outputTuple, outputTupleBuffer, outputWritten, SingleInputExecStream::pInAccessor, and pSegBufferWriter.

00115 {
00116     if (nOutputsWritten == outAccessors.size()) {
00117         for (uint i = 0; i < outAccessors.size(); i++) {
00118             outAccessors[i]->markEOS();
00119         }
00120         return EXECRC_EOS;
00121     }
00122 
00123     // Buffer the input
00124     if (firstBufferPageId == NULL_PAGE_ID) {
00125         if (!pSegBufferWriter) {
00126             pSegBufferWriter =
00127                 SegBufferWriter::newSegBufferWriter(
00128                     pInAccessor,
00129                     bufferSegmentAccessor,
00130                     true);
00131         }
00132         ExecStreamResult rc = pSegBufferWriter->write();
00133         if (rc != EXECRC_EOS) {
00134             return rc;
00135         }
00136         // Close the upstream producers
00137         ExecStreamGraphImpl &graphImpl =
00138             dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00139         graphImpl.closeProducers(getStreamId());
00140         firstBufferPageId = pSegBufferWriter->getFirstPageId();
00141     }
00142 
00143     // Once the input has been buffered, then pass along the first buffer
00144     // pageId to only those consumers that have explicitly requested data
00145     bool newOutput = false;
00146     for (uint i = 0; i < outAccessors.size(); i++) {
00147         switch (outAccessors[i]->getState()) {
00148         case EXECBUF_NONEMPTY:
00149         case EXECBUF_OVERFLOW:
00150         case EXECBUF_EMPTY:
00151         case EXECBUF_EOS:
00152             break;
00153 
00154         case EXECBUF_UNDERFLOW:
00155             // Underflow means the consumer has explicitly requested data
00156             {
00157                 assert(!outputWritten[i]);
00158                 TupleAccessor *outputTupleAccessor =
00159                     &outAccessors[i]->getScratchTupleAccessor();
00160                 outputTupleAccessor->marshal(
00161                     outputTuple,
00162                     outputTupleBuffer.get());
00163                 outAccessors[i]->provideBufferForConsumption(
00164                     outputTupleBuffer.get(),
00165                     outputTupleBuffer.get() + outputBufSize);
00166                 outputWritten[i] = true;
00167                 nOutputsWritten++;
00168                 newOutput = true;
00169                 break;
00170             }
00171 
00172         default:
00173             permAssert(false);
00174         }
00175     }
00176 
00177     // Verify that at least one output stream was written
00178     assert(newOutput);
00179     return EXECRC_BUF_OVERFLOW;
00180 }
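The delivery phase of execute() (after buffering has finished) can be sketched without any Fennel types. Everything below is hypothetical: `BufState`, `Result`, `ConsumerSketch`, and `deliverStep` only mimic the control flow of the listing, and marking a consumer non-empty after delivery is an assumption about what handing over a buffer does to its state.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class BufState { Empty, NonEmpty, Underflow, Overflow, Eos };
enum class Result { Eos, BufOverflow };

// Hypothetical consumer-side view of one output accessor.
struct ConsumerSketch {
    BufState state = BufState::Empty;
    long receivedPageId = -1;  // -1 means nothing received yet
    bool written = false;
};

// One scheduling step once the input has been fully buffered.
Result deliverStep(
    std::vector<ConsumerSketch> &consumers,
    std::size_t &nWritten,
    long firstPageId)
{
    // Every consumer already has the page id: signal end of stream.
    if (nWritten == consumers.size()) {
        for (auto &c : consumers) {
            c.state = BufState::Eos;
        }
        return Result::Eos;
    }
    // Only consumers in underflow have explicitly requested data.
    for (auto &c : consumers) {
        if (c.state == BufState::Underflow) {
            assert(!c.written);
            c.receivedPageId = firstPageId;
            c.state = BufState::NonEmpty;  // assumption: buffer now has data
            c.written = true;
            ++nWritten;
        }
    }
    return Result::BufOverflow;
}
```

Unlike the listing, the sketch does not assert that at least one consumer was served in each step; it simply reports overflow and waits to be called again.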

void SegBufferWriterExecStream::closeImpl (  )  [virtual]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from ExecStream.

Definition at line 84 of file SegBufferWriterExecStream.cpp.

References ExecStream::closeImpl(), outputTupleBuffer, paramCreated, pSegBufferWriter, and readReaderRefCount().

00085 {
00086     assert(readReaderRefCount() == 0);
00087     paramCreated = false;
00088     pSegBufferWriter.reset();
00089     outputTupleBuffer.reset();
00090     DiffluenceExecStream::closeImpl();
00091 }

ExecStreamBufProvision SegBufferWriterExecStream::getInputBufProvision (  )  const [virtual]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE

Reimplemented from SingleInputExecStream.

Definition at line 182 of file SegBufferWriterExecStream.cpp.

References BUFPROV_CONSUMER.

00183 {
00184     return BUFPROV_CONSUMER;
00185 }

ExecStreamBufProvision SegBufferWriterExecStream::getOutputBufProvision (  )  const [virtual]

Indicates to the consumer whether the buffer is provided by this exec stream, which is the producer.

Reimplemented from DiffluenceExecStream.

Definition at line 187 of file SegBufferWriterExecStream.cpp.

References BUFPROV_PRODUCER.

00188 {
00189     return BUFPROV_PRODUCER;
00190 }

void DiffluenceExecStream::prepare ( DiffluenceExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 42 of file DiffluenceExecStream.cpp.

References DiffluenceExecStream::getOutputBufProvision(), DiffluenceExecStream::outAccessors, DiffluenceExecStreamParams::outputTupleDesc, DiffluenceExecStreamParams::outputTupleFormat, SingleInputExecStream::pInAccessor, and SingleInputExecStream::prepare().

Referenced by LbmSplicerExecStream::prepare(), and prepare().

00043 {
00044     SingleInputExecStream::prepare(params);
00045 
00046     // By default, shape for all outputs is the same as the input if the
00047     // outputTupleDesc wasn't explicitly set.
00048     TupleDescriptor tupleDesc;
00049     TupleFormat tupleFormat;
00050     if (params.outputTupleDesc.empty()) {
00051         tupleDesc = pInAccessor->getTupleDesc();
00052         tupleFormat = pInAccessor->getTupleFormat();
00053     } else {
00054         tupleDesc = params.outputTupleDesc;
00055         tupleFormat = params.outputTupleFormat;
00056     }
00057     for (uint i = 0; i < outAccessors.size(); ++i) {
00058         assert(outAccessors[i]->getProvision() == getOutputBufProvision());
00059         outAccessors[i]->setTupleShape(tupleDesc, tupleFormat);
00060     }
00061 }

void SingleInputExecStream::prepare ( SingleInputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 44 of file SingleInputExecStream.cpp.

References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().

Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().

00045 {
00046     ExecStream::prepare(params);
00047 
00048     assert(pInAccessor);
00049     assert(pInAccessor->getProvision() == getInputBufProvision());
00050 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {
00086     if (pGraph) {
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }

void DiffluenceExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Reimplemented from SingleInputExecStream.

Definition at line 36 of file DiffluenceExecStream.cpp.

References DiffluenceExecStream::outAccessors.

00038 {
00039     outAccessors = outAccessorsInit;
00040 }

void SingleInputExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 37 of file SingleInputExecStream.cpp.

References SingleInputExecStream::pInAccessor.

Referenced by ConduitExecStream::setInputBufAccessors().

00039 {
00040     assert(inAccessors.size() == 1);
00041     pInAccessor = inAccessors[0];
00042 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity,
ExecStreamResourceSettingType &  optType 
) [virtual, inherited]

Determines resource requirements for this stream.

Default implementation declares zero resource requirements.

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting.

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 93 of file ExecStream.cpp.

References EXEC_RESOURCE_ACCURATE.

Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), FlatFileExecStreamImpl::getResourceRequirements(), getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().

00097 {
00098     getResourceRequirements(minQuantity, optQuantity);
00099     optType = EXEC_RESOURCE_ACCURATE;
00100 }

void ExecStream::setResourceAllocation ( ExecStreamResourceQuantity &  quantity  )  [virtual, inherited]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 111 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.

Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().

00113 {
00114     resourceAllocation = quantity;
00115     if (pQuotaAccessor) {
00116         pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00117     }
00118     if (pScratchQuotaAccessor) {
00119         pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00120     }
00121 }

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }
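How a long-running loop might poll for an abort request can be sketched as follows. This is an illustration only: `SchedulerSketch`, `AbortSketch`, and `longLoop` are hypothetical names, and the atomic flag is an assumed mechanism, not a description of how the actual scheduler records abort requests.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical exception type playing the role of AbortExcn.
struct AbortSketch : std::runtime_error {
    AbortSketch() : std::runtime_error("aborted") {}
};

// Hypothetical scheduler holding an abort flag.
class SchedulerSketch {
public:
    void requestAbort() { abortRequested.store(true); }
    // Throws if an abort has been requested, otherwise does nothing.
    void checkAbort() const
    {
        if (abortRequested.load()) {
            throw AbortSketch();
        }
    }
private:
    std::atomic<bool> abortRequested{false};
};

// Processes up to nRows rows, polling for abort before each row.
// For demonstration, an abort is requested when row abortAt is reached
// (pass a negative value to never abort). Returns the rows completed.
int longLoop(SchedulerSketch &scheduler, int nRows, int abortAt)
{
    int done = 0;
    try {
        for (int i = 0; i < nRows; ++i) {
            if (i == abortAt) {
                scheduler.requestAbort();
            }
            scheduler.checkAbort();
            ++done;
        }
    } catch (AbortSketch const &) {
        // Abort observed: stop early and report partial progress.
    }
    return done;
}
```

Polling once per iteration keeps the loop responsive to aborts without waiting for the end of the quantum.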

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
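The listing above makes close() idempotent: closeImpl() runs at most once no matter how often close() is called. A minimal stand-alone sketch of the same pattern follows; `ClosableSketch` and `CountingResource` are hypothetical names, and initializing `needsClose` to true is an assumption made for the example.

```cpp
#include <cassert>

// Hypothetical base class reproducing the guard used by close().
class ClosableSketch {
public:
    virtual ~ClosableSketch() = default;
    bool isClosed() const { return !needsClose; }
    void close()
    {
        if (!needsClose) {
            return;          // already closed: nothing to do
        }
        needsClose = false;  // clear the flag before releasing resources
        closeImpl();
    }
protected:
    bool needsClose = true;  // assumption: objects start out open
    virtual void closeImpl() = 0;
};

// Hypothetical subclass that counts how often its resources are released.
class CountingResource : public ClosableSketch {
public:
    int releaseCount = 0;
protected:
    void closeImpl() override { ++releaseCount; }
};
```

Clearing the flag before calling closeImpl() also protects against a nested close() issued from inside closeImpl().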

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
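The comparison `level >= minimumLevel` implies that trace levels are ordered, with larger values meaning more severe events. A small sketch of that filtering rule is given below; `TraceLevelSketch`, its numeric values, and `TraceSourceSketch` are hypothetical and chosen only for illustration.

```cpp
#include <cassert>

// Hypothetical levels: larger value = more severe; Off lies above all.
enum TraceLevelSketch {
    LevelFine = 500,
    LevelInfo = 800,
    LevelSevere = 1000,
    LevelOff = 10000
};

// Hypothetical trace source applying the same threshold test.
class TraceSourceSketch {
public:
    explicit TraceSourceSketch(TraceLevelSketch minLevel)
        : minimumLevel(minLevel)
    {
    }
    // A level is traced when it is at least as severe as the minimum.
    bool isTracingLevel(TraceLevelSketch level) const
    {
        return level >= minimumLevel;
    }
private:
    TraceLevelSketch minimumLevel;
};
```

With the minimum set to the "off" value, no ordinary level passes the test, which is how disabling tracing falls out of the same comparison.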

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }
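
The two name accessors above are described as useful for building nested names. As a rough sketch of what that could look like, the helper below derives a child name from a parent name. `SketchNamedSource`, `makeNestedName`, and the `.` separator are all hypothetical; the listings do not show how Fennel actually composes nested names.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in exposing only the two name accessors.
class SketchNamedSource {
    std::string name;

public:
    explicit SketchNamedSource(std::string const &n) : name(n) {}

    std::string getTraceSourceName() const { return name; }

    void setTraceSourceName(std::string const &n) { name = n; }
};

// Builds a child name from the parent's name. The '.' separator is an
// assumption chosen for this sketch.
inline std::string makeNestedName(
    SketchNamedSource const &parent,
    std::string const &childSuffix)
{
    assert(!childSuffix.empty());
    return parent.getTraceSourceName() + "." + childSuffix;
}
```

A subcomponent could then be named with `child.setTraceSourceName(makeNestedName(parent, "writer"))`, so that filtering by name prefix selects the parent and all of its children.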

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }
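
Taken together, the TraceSource listings above follow one pattern: the target is held through a shared pointer, `isTracing()` reports whether that pointer is set, `trace()` forwards only when it is, and `disableTracing()` resets it. The sketch below restates that pattern with standard-library types. `SketchTraceTarget` and `SketchSource` are hypothetical names, `std::shared_ptr` stands in for whatever `SharedTraceTarget` is defined as, and trace levels are left out for brevity.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical target that just remembers what it was told.
struct SketchTraceTarget {
    std::vector<std::string> messages;

    void notifyTrace(std::string const &source, std::string const &message) {
        messages.push_back(source + ": " + message);
    }
};

class SketchSource {
    std::shared_ptr<SketchTraceTarget> pTarget;
    std::string name;

public:
    SketchSource(std::shared_ptr<SketchTraceTarget> target,
                 std::string const &n)
        : pTarget(target), name(n) {}

    // Tracing is enabled exactly when a target is attached.
    bool isTracing() const { return pTarget.get() != nullptr; }

    // Callers must check isTracing() first, as the listing asserts.
    SketchTraceTarget &getTraceTarget() const {
        assert(isTracing());
        return *pTarget;
    }

    // Forwards the message only when a target is attached.
    void trace(std::string const &message) const {
        if (isTracing()) {
            getTraceTarget().notifyTrace(name, message);
        }
    }

    // Detaches the target; later trace() calls become no-ops.
    void disableTracing() { pTarget.reset(); }
};
```

Because `trace()` checks `isTracing()` itself, callers in this sketch never need to test for a target before tracing; after `disableTracing()`, messages are silently discarded.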

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor errorDesc,
const TupleData errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }
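
The tuple overload above allocates its marshalling buffer on the first call and reuses it afterwards. Since the accessor is computed only at that first allocation, the listing appears to assume that every call passes the same descriptor. The sketch below shows the allocate-once pattern on its own; `SketchErrorPoster` is hypothetical, a byte string stands in for a marshalled tuple, and the maximum size is passed in directly instead of being computed from a tuple descriptor.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string>

class SketchErrorPoster {
    std::size_t maxBytes;                   // stand-in for getMaxByteCount()
    std::unique_ptr<unsigned char[]> buf;   // allocated on first use
    std::size_t allocations;
    std::string last;

public:
    explicit SketchErrorPoster(std::size_t cbMax)
        : maxBytes(cbMax), allocations(0) {}

    // "Marshals" the row into the reusable buffer, then "posts" it by
    // remembering the bytes that were written.
    void post(std::string const &row) {
        assert(row.size() <= maxBytes);
        if (!buf) {
            // First call only: size the buffer for the largest possible row.
            buf.reset(new unsigned char[maxBytes]);
            ++allocations;
        }
        std::memcpy(buf.get(), row.data(), row.size());
        last.assign(reinterpret_cast<char const *>(buf.get()), row.size());
    }

    std::size_t allocationCount() const { return allocations; }

    std::string const &lastPosted() const { return last; }
};
```

Sizing the buffer for the maximum byte count up front is what makes reuse safe in this sketch: any later row fits without reallocation, at the cost of holding the worst-case amount of memory.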

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Member Data Documentation

SegmentAccessor SegBufferWriterExecStream::bufferSegmentAccessor [private]

Definition at line 68 of file SegBufferWriterExecStream.h.

Referenced by execute(), and prepare().

SharedSegBufferWriter SegBufferWriterExecStream::pSegBufferWriter [private]

Definition at line 69 of file SegBufferWriterExecStream.h.

Referenced by closeImpl(), and execute().

DynamicParamId SegBufferWriterExecStream::readerRefCountParamId [private]

Definition at line 70 of file SegBufferWriterExecStream.h.

Referenced by open(), prepare(), and readReaderRefCount().

PageId SegBufferWriterExecStream::firstBufferPageId [private]

Definition at line 71 of file SegBufferWriterExecStream.h.

Referenced by execute(), open(), and prepare().

TupleData SegBufferWriterExecStream::outputTuple [private]

Definition at line 72 of file SegBufferWriterExecStream.h.

Referenced by execute(), and prepare().

boost::scoped_array<FixedBuffer> SegBufferWriterExecStream::outputTupleBuffer [private]

Definition at line 73 of file SegBufferWriterExecStream.h.

Referenced by closeImpl(), execute(), and open().

uint SegBufferWriterExecStream::outputBufSize [private]

Definition at line 74 of file SegBufferWriterExecStream.h.

Referenced by execute(), open(), and prepare().

std::vector<bool> SegBufferWriterExecStream::outputWritten [private]

Definition at line 75 of file SegBufferWriterExecStream.h.

Referenced by execute(), open(), and prepare().

uint SegBufferWriterExecStream::nOutputsWritten [private]

Definition at line 76 of file SegBufferWriterExecStream.h.

Referenced by execute(), and open().

bool SegBufferWriterExecStream::paramCreated [private]

Definition at line 77 of file SegBufferWriterExecStream.h.

Referenced by closeImpl(), open(), prepare(), and readReaderRefCount().

std::vector<SharedExecStreamBufAccessor> DiffluenceExecStream::outAccessors [protected, inherited]

List of output buffer accessors.

Definition at line 63 of file DiffluenceExecStream.h.

Referenced by LbmSplicerExecStream::execute(), SplitterExecStream::execute(), execute(), LbmSplicerExecStream::open(), DiffluenceExecStream::open(), LbmSplicerExecStream::prepare(), prepare(), DiffluenceExecStream::prepare(), and DiffluenceExecStream::setOutputBufAccessors().

TupleDescriptor DiffluenceExecStream::outputTupleDesc [protected, inherited]

Output tuple descriptor.

Currently, all outputs must have the same descriptor.

Definition at line 69 of file DiffluenceExecStream.h.

SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited]

Definition at line 51 of file SingleInputExecStream.h.

Referenced by SortedAggExecStream::compareGroupByKeys(), ExternalSortExecStreamImpl::computeFirstResult(), ExternalSortExecStreamImpl::execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), ExternalSortExecStreamImpl::prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), 
ConduitExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
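
The note on pGraph contrasts a plain pointer with a weak_ptr. The sketch below, using `std::weak_ptr` as a stand-in and hypothetical types throughout, shows where the extra cost comes from: every access through a weak pointer must first call `lock()`, which creates a temporary shared pointer and updates the shared reference count atomically, whereas a raw pointer is dereferenced directly. The raw form is safe only if the graph is guaranteed to outlive the stream, which is an assumption of this sketch.

```cpp
#include <cassert>
#include <memory>

// Hypothetical graph with one frequently polled flag.
struct SketchGraph {
    bool abortFlag;
    SketchGraph() : abortFlag(false) {}
};

// Raw back-pointer: one plain dereference per access.
struct RawBackPointerStream {
    SketchGraph *pGraph;

    explicit RawBackPointerStream(SketchGraph *g) : pGraph(g) {}

    bool checkAbort() const { return pGraph->abortFlag; }
};

// Weak back-pointer: each access pays for lock() before dereferencing.
struct WeakBackPointerStream {
    std::weak_ptr<SketchGraph> pGraph;

    explicit WeakBackPointerStream(std::shared_ptr<SketchGraph> const &g)
        : pGraph(g) {}

    bool checkAbort() const {
        std::shared_ptr<SketchGraph> g = pGraph.lock();
        assert(g);  // the graph is assumed to be alive in this sketch
        return g->abortFlag;
    }
};
```

Both variants return the same answer; the difference is only the per-access overhead and who is responsible for lifetime safety.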

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction embracing the stream.

Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:45 2009 for Fennel by  doxygen 1.5.1