ExternalSortExecStreamImpl Class Reference

ExternalSortExecStreamImpl implements the ExternalSortExecStream interface. More...

#include <ExternalSortExecStreamImpl.h>

Inheritance diagram for ExternalSortExecStreamImpl:

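Base classes shown in the diagram (image not reproduced): ExternalSortExecStream, ConduitExecStream, SingleInputExecStream, SingleOutputExecStream, ExecStream, ClosableObject, TraceSource, and ErrorSource. ExecStream and its bases ClosableObject, TraceSource, and ErrorSource each appear twice, once under SingleInputExecStream and once under SingleOutputExecStream.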

Public Member Functions

 ExternalSortExecStreamImpl ()
virtual void prepare (ExternalSortExecStreamParams const &params)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void prepare (ConduitExecStreamParams const &params)
virtual void prepare (SingleInputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any allocated resources.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Static Public Member Functions

static ExternalSortExecStream * newExternalSortExecStream ()
 Factory method.

Protected Member Functions

ExecStreamResult precheckConduitBuffers ()
 Checks the state of the input and output buffers.

Protected Attributes

SharedExecStreamBufAccessor pInAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose
SharedExecStreamBufAccessor pOutAccessor

Private Member Functions

void computeFirstResult ()
 Performs enough sorting to be able to start returning results (non-parallel version).
void computeFirstResultParallel ()
 Performs enough sorting to be able to start returning results (parallel version).
void sortRun (ExternalSortRunLoader &runLoader)
 Sorts one run in memory.
void storeRun (ExternalSortSubStream &subStream)
 Stores one run.
void mergeFirstResult ()
 Performs enough merging to be able to start returning results.
void optimizeRunOrder ()
 Adjusts run order for optimal merging.
void deleteStoredRunInfo (uint iFirstRun, uint nRuns)
 Deletes information on runs after they have been merged.
ExternalSortRunLoader & reserveRunLoader ()
 Reserves one run loader for use during parallel sort, blocking on runLoaderAvailable until one becomes available.
void unreserveRunLoader (ExternalSortRunLoader &runLoader)
 Unreserves one run loader after its contents have been stored, making it available to other threads.
void releaseResources ()
 Releases resources associated with this stream.
virtual void closeImpl ()
 Implements ClosableObject.

Private Attributes

SharedSegment pTempSegment
 Segment to use for storing runs externally.
ExternalSortInfo sortInfo
 Global information shared with subcomponents.
uint nParallel
 Maximum number of parallel threads to use.
boost::scoped_array< SharedExternalSortRunLoader > runLoaders
 Array of helpers used to load, quicksort, and store runs.
ThreadPool< ExternalSortTask > threadPool
 Thread pool used during parallel sort.
LocalCondition runLoaderAvailable
 Condition variable used to signal availability of entries in runLoaders array.
StrictMutex runLoaderMutex
 Synchronization for availability status in runLoaders array.
boost::scoped_ptr< ExternalSortRunAccessor > pFinalRunAccessor
 Helper used to read final run when stored externally.
boost::scoped_ptr< ExternalSortMerger > pMerger
 Helper used to merge runs.
boost::scoped_ptr< ExternalSortOutput > pOutputWriter
 Helper used to write XO output.
StrictMutex storedRunMutex
 Synchronization for storedRuns.
std::vector< SharedSegStreamAllocation > storedRuns
 Information on runs stored externally.
bool resultsReady
 Whether the XO is ready to start writing results.
bool storeFinalRun
 Whether to materialize one big final run, or return results directly from last merge stage.
int estimatedNumRows
 Estimate of the number of rows in the sort input.
bool earlyClose
 If true, close producers once all input has been read.

Friends

class ExternalSortTask

Detailed Description

ExternalSortExecStreamImpl implements the ExternalSortExecStream interface.

Author:
John V. Sichi
Version:
Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.h#2

Definition at line 52 of file ExternalSortExecStreamImpl.h.


Constructor & Destructor Documentation

ExternalSortExecStreamImpl::ExternalSortExecStreamImpl (  )  [explicit]

Definition at line 64 of file ExternalSortExecStreamImpl.cpp.

00065     : sortInfo(*this)
00066 {
00067 }


Member Function Documentation

void ExternalSortExecStreamImpl::computeFirstResult (  )  [private]

Performs enough sorting to be able to start returning results (non-parallel version).

Definition at line 255 of file ExternalSortExecStreamImpl.cpp.

References EXTSORT_OVERFLOW, ExternalSortRunLoader::isStarted(), ExternalSortRunLoader::loadRun(), SingleInputExecStream::pInAccessor, runLoaders, sortRun(), ExternalSortRunLoader::startRun(), and storeRun().

Referenced by execute().

00256 {
00257     ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00258     for (;;) {
00259         if (!runLoader.isStarted()) {
00260             runLoader.startRun();
00261         }
00262         ExternalSortRC rc = runLoader.loadRun(*pInAccessor);
00263         if (rc == EXTSORT_OVERFLOW) {
00264             sortRun(runLoader);
00265             storeRun(runLoader);
00266         } else {
00267             return;
00268         }
00269     }
00270 }
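The loop above alternates between loading tuples into the single run loader and, on EXTSORT_OVERFLOW, sorting and storing the full run. A minimal sketch of the same run-generation pattern follows. It assumes ints as tuples, a fixed tuple capacity in place of sort memory pages, and an in-memory vector in place of the temp segment; `generateRuns` and `RunGenerationResult` are hypothetical names, not Fennel APIs. As in computeFirstResult(), the last partial run is left unsorted in the loader for the caller to finish.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-ins: a tuple is an int, a run is a vector of ints.
using Run = std::vector<int>;

struct RunGenerationResult {
    std::vector<Run> storedRuns;  // runs spilled because the loader overflowed
    Run pending;                  // last, partially filled run (not yet sorted)
};

// Same shape as computeFirstResult(): keep loading; whenever the loader is
// full ("overflow"), sort the run in memory, store it, and start a new run.
// Precondition: runCapacity > 0.
RunGenerationResult generateRuns(const std::vector<int> &input,
                                 std::size_t runCapacity) {
    RunGenerationResult result;
    for (int tuple : input) {
        if (result.pending.size() == runCapacity) {
            std::sort(result.pending.begin(), result.pending.end());  // sortRun()
            result.storedRuns.push_back(std::move(result.pending));   // storeRun()
            result.pending.clear();                                   // startRun()
        }
        result.pending.push_back(tuple);
    }
    // Input exhausted: the final partial run stays in the loader.
    return result;
}
```

With a capacity of 3, the input {5, 3, 9, 1, 7, 2, 8} yields stored runs {3, 5, 9} and {1, 2, 7}, leaving {8} pending.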

void ExternalSortExecStreamImpl::computeFirstResultParallel (  )  [private]

Performs enough sorting to be able to start returning results (parallel version).

Definition at line 374 of file ExternalSortExecStreamImpl.cpp.

References EXTSORT_ENDOFDATA, mergeFirstResult(), nParallel, reserveRunLoader(), resultsReady, ThreadPoolBase::start(), ExternalSortRunLoader::startRun(), ThreadPoolBase::stop(), ThreadPool< Task >::submitTask(), threadPool, and unreserveRunLoader().

Referenced by execute().

00375 {
00376     // FIXME jvs 19-June-2004:  ThreadPool needs to propagate excns!
00377 
00378     assert(nParallel > 1);
00379 
00380     // minus one because the main dispatcher thread runs in parallel with the
00381     // pooled threads
00382     threadPool.start(nParallel - 1);
00383     try {
00384         for (;;) {
00385             ExternalSortRunLoader &runLoader = reserveRunLoader();
00386             runLoader.startRun();
00387             // FIXME
00388 #if 0
00389             ExternalSortRC rc = runLoader.loadRun(*pInputStream);
00390 #else
00391             ExternalSortRC rc = EXTSORT_ENDOFDATA;
00392 #endif
00393             if (rc == EXTSORT_ENDOFDATA) {
00394                 // the entire input has been processed, so we're ready
00395                 // for merge
00396                 unreserveRunLoader(runLoader);
00397                 break;
00398             }
00399             // otherwise, schedule a new sort task
00400             // FIXME
00401 #if 0
00402             ExternalSortTask task(*this,runLoader);
00403             threadPool.submitTask(task);
00404 #endif
00405         }
00406     } catch (...) {
00407         // REVIEW jvs 19-June-2004:  signal a request to expedite cleanup?
00408 
00409         // wait for all tasks to clean up
00410         threadPool.stop();
00411         throw;
00412     }
00413 
00414     // wait for all tasks to complete before beginning merge
00415     threadPool.stop();
00416 
00417     mergeFirstResult();
00418     resultsReady = true;
00419 }
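Because loading and task submission are compiled out (#if 0), this parallel path currently takes the EXTSORT_ENDOFDATA branch on its first iteration and sorts nothing itself. The intended structure, where the dispatching thread works alongside nParallel - 1 pooled threads and everything is joined before merging, can be sketched with std::thread standing in for Fennel's ThreadPool. This is an illustrative assumption: `sortRunsInParallel` is hypothetical, runs are assigned round-robin by index rather than through reserveRunLoader(), and exceptions are not propagated.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

using Run = std::vector<int>;

// Sorts every run using nParallel participants in total: the calling thread
// plus nParallel - 1 std::thread workers. Run i is sorted by participant
// i % nParallel, so no two threads ever touch the same run.
void sortRunsInParallel(std::vector<Run> &runs, unsigned nParallel) {
    if (nParallel == 0) {
        nParallel = 1;
    }
    auto sortShare = [&runs, nParallel](unsigned participant) {
        for (std::size_t i = participant; i < runs.size(); i += nParallel) {
            std::sort(runs[i].begin(), runs[i].end());
        }
    };

    std::vector<std::thread> workers;
    for (unsigned p = 1; p < nParallel; ++p) {
        workers.emplace_back(sortShare, p);
    }
    sortShare(0);  // the dispatching thread does its share too

    // Like threadPool.stop(): wait for every task before merging.
    for (std::thread &worker : workers) {
        worker.join();
    }
}
```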

void ExternalSortExecStreamImpl::sortRun ( ExternalSortRunLoader runLoader  )  [private]

Sorts one run in memory.

Parameters:
runLoader loaded run to sort

Definition at line 428 of file ExternalSortExecStreamImpl.cpp.

References ExternalSortRunLoader::getLoadedTupleCount(), ExternalSortRunLoader::sort(), and TRACE_FINE.

Referenced by computeFirstResult(), ExternalSortTask::execute(), and execute().

00429 {
00430     FENNEL_TRACE(
00431         TRACE_FINE,
00432         "sorting run with tuple count = "
00433         << runLoader.getLoadedTupleCount());
00434     runLoader.sort();
00435 }

void ExternalSortExecStreamImpl::storeRun ( ExternalSortSubStream subStream  )  [private]

Stores one run.

Parameters:
subStream substream whose contents are to be fetched and stored as a run

Definition at line 272 of file ExternalSortExecStreamImpl.cpp.

References sortInfo, storedRunMutex, storedRuns, and TRACE_FINE.

Referenced by computeFirstResult(), ExternalSortTask::execute(), execute(), and mergeFirstResult().

00273 {
00274     FENNEL_TRACE(
00275         TRACE_FINE,
00276         "storing run " << storedRuns.size());
00277 
00278     boost::scoped_ptr<ExternalSortRunAccessor> pRunAccessor;
00279     pRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
00280     pRunAccessor->storeRun(subStream);
00281 
00282     StrictMutexGuard mutexGuard(storedRunMutex);
00283     storedRuns.push_back(pRunAccessor->getStoredRun());
00284 }
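storeRun() writes the run through its own ExternalSortRunAccessor without holding any lock, and takes storedRunMutex only to append the resulting handle, so concurrent sort tasks serialize only on the vector update. A sketch of that pattern with std::mutex, where `RunCatalog` and `StoredRun` are hypothetical stand-ins for the stream and SharedSegStreamAllocation:

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stored-run handle.
struct StoredRun {
    std::vector<int> tuples;
};

class RunCatalog {
public:
    // Do the expensive "write" without the lock, then lock only to publish.
    void storeRun(std::vector<int> sortedTuples) {
        auto run = std::make_shared<StoredRun>();
        run->tuples = std::move(sortedTuples);

        std::lock_guard<std::mutex> guard(storedRunMutex_);
        storedRuns_.push_back(std::move(run));
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(storedRunMutex_);
        return storedRuns_.size();
    }

private:
    mutable std::mutex storedRunMutex_;
    std::vector<std::shared_ptr<StoredRun>> storedRuns_;
};
```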

void ExternalSortExecStreamImpl::mergeFirstResult (  )  [private]

Performs enough merging to be able to start returning results.

Definition at line 286 of file ExternalSortExecStreamImpl.cpp.

References deleteStoredRunInfo(), nParallel, ExternalSortInfo::nSortMemPages, optimizeRunOrder(), pFinalRunAccessor, pMerger, pOutputWriter, releaseResources(), runLoaders, sortInfo, storedRuns, storeFinalRun, storeRun(), and TRACE_FINE.

Referenced by computeFirstResultParallel(), and execute().

00287 {
00288     if (storedRuns.size()) {
00289         for (uint i = 0; i < nParallel; i++) {
00290             runLoaders[i]->releaseResources();
00291         }
00292 
00293         if (!pMerger) {
00294             pMerger.reset(new ExternalSortMerger(sortInfo));
00295             pMerger->initRunAccess();
00296         }
00297 
00298         uint iFirstRun = storedRuns.size() - 1;
00299         while (iFirstRun > 0) {
00300             uint nRunsToMerge;
00301 
00302             // REVIEW jvs 13-June-2004:  I had to change this to account for
00303             // the output buffer needed during merge.  Not sure why it worked
00304             // in BB?
00305             uint nMergePages = sortInfo.nSortMemPages - 1;
00306             if (storedRuns.size() <= nMergePages) {
00307                 nRunsToMerge = storedRuns.size();
00308             } else {
00309                 nRunsToMerge = std::min<uint>(
00310                     storedRuns.size() - nMergePages + 1,
00311                     nMergePages);
00312             }
00313 
00314             optimizeRunOrder();
00315             iFirstRun = storedRuns.size() - nRunsToMerge;
00316 
00317             FENNEL_TRACE(
00318                 TRACE_FINE,
00319                 "merging from run " << iFirstRun
00320                 << " with run count = " << nRunsToMerge);
00321 
00322             pMerger->startMerge(
00323                 storedRuns.begin() + iFirstRun, nRunsToMerge);
00324             if ((iFirstRun > 0) || storeFinalRun) {
00325                 storeRun(*pMerger);
00326                 deleteStoredRunInfo(iFirstRun,nRunsToMerge);
00327             }
00328         }
00329 
00330         if (storedRuns.size() == 1) {
00331             if (!pFinalRunAccessor) {
00332                 pFinalRunAccessor.reset(new ExternalSortRunAccessor(sortInfo));
00333             }
00334 
00335             FENNEL_TRACE(
00336                 TRACE_FINE,
00337                 "fetching from final run");
00338 
00339             pFinalRunAccessor->initRead();
00340             pFinalRunAccessor->startRead(storedRuns[0]);
00341             pMerger->releaseResources();
00342             pOutputWriter->setSubStream(*pFinalRunAccessor);
00343         } else {
00344             FENNEL_TRACE(
00345                 TRACE_FINE,
00346                 "fetching from final merge with run count = "
00347                 << storedRuns.size());
00348 
00349             pOutputWriter->setSubStream(*pMerger);
00350         }
00351     }
00352 }
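The pass structure of mergeFirstResult() can be traced without any I/O by modeling each stored run as a page count. Under that assumption, the sketch reproduces the fan-in rule (at most nSortMemPages - 1 input runs per pass, one page being kept for output), the optimizeRunOrder() bubble step, and the store-then-erase bookkeeping of storeRun() and deleteStoredRunInfo(). `planMerges` and `MergePlan` are hypothetical names, and a merged run's size is assumed to be the sum of its inputs.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model: a stored run is just its page count.
struct MergePlan {
    std::vector<std::size_t> passFanIn;  // nRunsToMerge for each pass
    std::size_t runsLeft;                // storedRuns.size() when merging stops
};

// optimizeRunOrder(): bubble the newest (last) run toward the front while it
// is larger than its predecessor, so the smallest runs stay at the tail.
void bubbleLastRun(std::vector<std::size_t> &runs) {
    std::size_t i = runs.size() - 1;
    while (i > 0 && runs[i] > runs[i - 1]) {
        std::swap(runs[i], runs[i - 1]);
        --i;
    }
}

MergePlan planMerges(std::vector<std::size_t> runs, std::size_t nSortMemPages,
                     bool storeFinalRun) {
    // Needed for progress; open() enforces this when nParallel == 1.
    assert(nSortMemPages >= 3);
    MergePlan plan{{}, runs.size()};
    if (runs.empty()) {
        return plan;
    }
    const std::size_t nMergePages = nSortMemPages - 1;  // one page for output
    std::size_t iFirstRun = runs.size() - 1;
    while (iFirstRun > 0) {
        std::size_t nRunsToMerge = runs.size();
        if (runs.size() > nMergePages) {
            nRunsToMerge = std::min(runs.size() - nMergePages + 1, nMergePages);
        }
        bubbleLastRun(runs);
        iFirstRun = runs.size() - nRunsToMerge;
        plan.passFanIn.push_back(nRunsToMerge);
        if (iFirstRun > 0 || storeFinalRun) {
            // storeRun(*pMerger): the merged run is appended at the end...
            std::size_t mergedPages = 0;
            for (std::size_t k = iFirstRun; k < runs.size(); ++k) {
                mergedPages += runs[k];
            }
            runs.push_back(mergedPages);
            // ...then deleteStoredRunInfo() erases the runs it consumed.
            runs.erase(runs.begin() + iFirstRun,
                       runs.begin() + iFirstRun + nRunsToMerge);
        }
    }
    plan.runsLeft = runs.size();
    return plan;
}
```

For six runs of {10, 8, 6, 4, 2, 1} pages and nSortMemPages = 4, the passes merge 3, then 2, then 3 runs: the first two passes bring the run count down to nMergePages, and the last pass is streamed to the output unless storeFinalRun is set, in which case it is stored as a single run.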

void ExternalSortExecStreamImpl::optimizeRunOrder (  )  [private]

Adjusts run order for optimal merging.

Definition at line 354 of file ExternalSortExecStreamImpl.cpp.

References storedRuns.

Referenced by mergeFirstResult().

00355 {
00356     uint i = storedRuns.size() - 1;
00357     while ((i > 0)
00358            && (storedRuns[i]->getWrittenPageCount()
00359                > storedRuns[i - 1]->getWrittenPageCount()))
00360     {
00361         std::swap(storedRuns[i],storedRuns[i - 1]);
00362         i--;
00363     }
00364 }

void ExternalSortExecStreamImpl::deleteStoredRunInfo ( uint  iFirstRun,
uint  nRuns 
) [private]

Deletes information on runs after they have been merged.

Parameters:
iFirstRun 0-based deletion start position in storedRuns
nRuns number of storedRuns entries to delete

Definition at line 366 of file ExternalSortExecStreamImpl.cpp.

References storedRunMutex, and storedRuns.

Referenced by mergeFirstResult().

00367 {
00368     StrictMutexGuard mutexGuard(storedRunMutex);
00369     storedRuns.erase(
00370         storedRuns.begin() + iFirstRun,
00371         storedRuns.begin() + iFirstRun + nRuns);
00372 }

ExternalSortRunLoader & ExternalSortExecStreamImpl::reserveRunLoader (  )  [private]

Reserves one run loader for use during parallel sort, blocking on runLoaderAvailable until one becomes available.

Returns:
reserved loader

Definition at line 437 of file ExternalSortExecStreamImpl.cpp.

References nParallel, runLoaderAvailable, runLoaderMutex, and runLoaders.

Referenced by computeFirstResultParallel().

00438 {
00439     StrictMutexGuard mutexGuard(runLoaderMutex);
00440     for (;;) {
00441         for (uint i = 0; i < nParallel; ++i) {
00442             ExternalSortRunLoader &runLoader = *(runLoaders[i]);
00443             if (!runLoader.runningParallelTask) {
00444                 runLoader.runningParallelTask = true;
00445                 return runLoader;
00446             }
00447         }
00448         runLoaderAvailable.wait(mutexGuard);
00449     }
00450 }

void ExternalSortExecStreamImpl::unreserveRunLoader ( ExternalSortRunLoader runLoader  )  [private]

Unreserves one run loader after its contents have been stored, making it available to other threads.

Parameters:
runLoader loader to unreserve

Definition at line 452 of file ExternalSortExecStreamImpl.cpp.

References runLoaderAvailable, runLoaderMutex, and ExternalSortRunLoader::runningParallelTask.

Referenced by computeFirstResultParallel(), and ExternalSortTask::execute().

00454 {
00455     StrictMutexGuard mutexGuard(runLoaderMutex);
00456     runLoader.runningParallelTask = false;
00457     runLoaderAvailable.notify_all();
00458 }
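reserveRunLoader() and unreserveRunLoader() form a small monitor: a mutex-protected scan for a free loader, a wait on runLoaderAvailable when none is free, and notify_all on release. The same shape in standard C++, assuming std::mutex and std::condition_variable in place of StrictMutex and LocalCondition, and a hypothetical `LoaderPool` with integer slots in place of ExternalSortRunLoader objects:

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Each slot stands for one run loader; busy_[i] plays the role of
// ExternalSortRunLoader::runningParallelTask.
class LoaderPool {
public:
    explicit LoaderPool(std::size_t nParallel) : busy_(nParallel, false) {}

    // Like reserveRunLoader(): scan for a free slot; if none, wait until
    // another thread releases one, then rescan.
    std::size_t reserve() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            for (std::size_t i = 0; i < busy_.size(); ++i) {
                if (!busy_[i]) {
                    busy_[i] = true;
                    return i;
                }
            }
            available_.wait(lock);
        }
    }

    // Like unreserveRunLoader(): mark the slot free and wake all waiters.
    void unreserve(std::size_t i) {
        std::lock_guard<std::mutex> lock(mutex_);
        busy_[i] = false;
        available_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    std::vector<bool> busy_;
};
```

Because the waiter rescans the whole array after every wakeup, spurious wakeups are harmless; notify_all is simple and safe at the cost of waking every waiter.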

void ExternalSortExecStreamImpl::releaseResources (  )  [private]

Releases resources associated with this stream.

Definition at line 242 of file ExternalSortExecStreamImpl.cpp.

References pFinalRunAccessor, pMerger, pOutputWriter, runLoaders, and storedRuns.

Referenced by closeImpl(), mergeFirstResult(), and open().

00243 {
00244     if (pFinalRunAccessor) {
00245         pFinalRunAccessor->releaseResources();
00246     }
00247 
00248     runLoaders.reset();
00249     pMerger.reset();
00250     pOutputWriter.reset();
00251     pFinalRunAccessor.reset();
00252     storedRuns.clear();
00253 }

void ExternalSortExecStreamImpl::closeImpl (  )  [private, virtual]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from ExecStream.

Definition at line 236 of file ExternalSortExecStreamImpl.cpp.

References ExecStream::closeImpl(), and releaseResources().

00237 {
00238     releaseResources();
00239     ConduitExecStream::closeImpl();
00240 }

void ExternalSortExecStreamImpl::prepare ( ExternalSortExecStreamParams const &  params  )  [virtual]

Implements ExternalSortExecStream.

Definition at line 69 of file ExternalSortExecStreamImpl.cpp.

References ExternalSortInfo::cbPage, ExternalSortExecStreamParams::descendingKeyColumns, ExternalSortInfo::descendingKeyColumns, ExternalSortExecStreamParams::distinctness, DUP_ALLOW, DUP_DISCARD, DUP_FAIL, ExternalSortExecStreamParams::earlyClose, earlyClose, ExternalSortExecStreamParams::estimatedNumRows, estimatedNumRows, ExternalSortInfo::externalSegmentAccessor, ExternalSortInfo::keyDesc, ExternalSortExecStreamParams::keyProj, ExternalSortInfo::keyProj, ExternalSortInfo::memSegmentAccessor, nParallel, ExternalSortInfo::nSortMemPages, SegmentAccessor::pCacheAccessor, SingleInputExecStream::pInAccessor, ConduitExecStream::prepare(), TupleDescriptor::projectFrom(), SegmentAccessor::pSegment, ExternalSortExecStreamParams::pTempSegment, pTempSegment, resultsReady, sortInfo, ExternalSortExecStreamParams::storeFinalRun, storeFinalRun, and ExternalSortInfo::tupleDesc.

00071 {
00072     ConduitExecStream::prepare(params);
00073 
00074     pTempSegment = params.pTempSegment;
00075     resultsReady = false;
00076     nParallel = 1;
00077     storeFinalRun = params.storeFinalRun;
00078     estimatedNumRows = params.estimatedNumRows;
00079     earlyClose = params.earlyClose;
00080 
00081     switch (params.distinctness) {
00082     case DUP_ALLOW:
00083         break;
00084     case DUP_DISCARD:
00085         // TODO
00086         permAssert(false);
00087     case DUP_FAIL:
00088         // TODO
00089         permAssert(false);
00090     }
00091 
00092     TupleDescriptor const &srcRecDef = pInAccessor->getTupleDesc();
00093     sortInfo.keyProj = params.keyProj;
00094     assert(params.outputTupleDesc == srcRecDef);
00095     sortInfo.tupleDesc = srcRecDef;
00096     sortInfo.keyDesc.projectFrom(sortInfo.tupleDesc,params.keyProj);
00097     sortInfo.descendingKeyColumns = params.descendingKeyColumns;
00098     if (sortInfo.descendingKeyColumns.empty()) {
00099         // default is all ascending
00100         sortInfo.descendingKeyColumns.resize(sortInfo.keyProj.size(), false);
00101     }
00102     sortInfo.cbPage = params.pTempSegment->getFullPageSize();
00103     sortInfo.memSegmentAccessor = params.scratchAccessor;
00104     sortInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
00105     sortInfo.externalSegmentAccessor.pSegment = params.pTempSegment;
00106     sortInfo.nSortMemPages = 0;
00107 }
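prepare() records the key projection and per-key descending flags, defaulting to all-ascending when none are given. A sketch of how such settings can drive an in-memory comparison, assuming rows of ints and hypothetical `KeyComparator` and `sortRows` names (Fennel's own comparison works on its tuple format, not on this type):

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

using Row = std::vector<int>;  // hypothetical: every column is an int

class KeyComparator {
public:
    KeyComparator(std::vector<std::size_t> keyProj, std::vector<bool> descending)
        : keyProj_(std::move(keyProj)), descending_(std::move(descending)) {
        if (descending_.empty()) {
            // default is all ascending, as in prepare()
            descending_.resize(keyProj_.size(), false);
        }
    }

    // Strict weak ordering over the projected key columns.
    bool operator()(const Row &a, const Row &b) const {
        for (std::size_t k = 0; k < keyProj_.size(); ++k) {
            int x = a[keyProj_[k]];
            int y = b[keyProj_[k]];
            if (x != y) {
                return descending_[k] ? (x > y) : (x < y);
            }
        }
        return false;  // keys equal
    }

private:
    std::vector<std::size_t> keyProj_;
    std::vector<bool> descending_;
};

// Sorts one in-memory run with the configured key order.
void sortRows(std::vector<Row> &rows, const KeyComparator &cmp) {
    std::sort(rows.begin(), rows.end(), cmp);
}
```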

void ExternalSortExecStreamImpl::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from ConduitExecStream.

Definition at line 160 of file ExternalSortExecStreamImpl.cpp.

References nParallel, ExternalSortInfo::nSortMemPages, ExternalSortInfo::nSortMemPagesPerRun, ConduitExecStream::open(), pOutputWriter, releaseResources(), resultsReady, runLoaders, and sortInfo.

00161 {
00162     if (restart) {
00163         releaseResources();
00164     }
00165 
00166     ConduitExecStream::open(restart);
00167 
00168     // divvy up available memory by degree of parallelism
00169     sortInfo.nSortMemPagesPerRun = (sortInfo.nSortMemPages / nParallel);
00170 
00171     // subtract off one page per run for I/O buffering
00172     assert(sortInfo.nSortMemPagesPerRun > 0);
00173     sortInfo.nSortMemPagesPerRun--;
00174 
00175     // need at least two non-I/O pages per run: one for keys and one for data
00176     assert(sortInfo.nSortMemPagesPerRun > 1);
00177 
00178     runLoaders.reset(new SharedExternalSortRunLoader[nParallel]);
00179     for (uint i = 0; i < nParallel; ++i) {
00180         runLoaders[i].reset(new ExternalSortRunLoader(sortInfo));
00181     }
00182 
00183     pOutputWriter.reset(new ExternalSortOutput(sortInfo));
00184 
00185     for (uint i = 0; i < nParallel; ++i) {
00186         runLoaders[i]->startRun();
00187     }
00188 
00189     // default to local sort as output obj
00190     pOutputWriter->setSubStream(*(runLoaders[0]));
00191 
00192     resultsReady = false;
00193 }
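The per-run page budget computed in open() reduces to a short piece of integer arithmetic. Restated as a hypothetical helper (`sortMemPagesPerRun` is not a Fennel function), with the same assertions:

```cpp
#include <cassert>

// Divide sort memory evenly among nParallel run loaders, reserve one page
// per run for I/O buffering, and require at least two remaining pages
// (one for keys, one for data).
unsigned sortMemPagesPerRun(unsigned nSortMemPages, unsigned nParallel) {
    assert(nParallel > 0);
    unsigned perRun = nSortMemPages / nParallel;  // integer division
    assert(perRun > 0);
    perRun--;            // one page per run for I/O buffering
    assert(perRun > 1);  // keys page + data page at minimum
    return perRun;
}
```

With nParallel = 1, the smallest allocation that passes both assertions is three pages, consistent with the three-page minimum that getResourceRequirements() declares.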

ExecStreamResult ExternalSortExecStreamImpl::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 195 of file ExternalSortExecStreamImpl.cpp.

References ExecStreamGraphImpl::closeProducers(), computeFirstResult(), computeFirstResultParallel(), earlyClose, EXECBUF_EOS, EXECRC_BUF_UNDERFLOW, EXECRC_YIELD, ExecStream::getGraph(), ExecStream::getStreamId(), ExternalSortRunLoader::isStarted(), mergeFirstResult(), nParallel, SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, pOutputWriter, ConduitExecStream::precheckConduitBuffers(), resultsReady, runLoaders, sortRun(), storedRuns, storeFinalRun, and storeRun().

00197 {
00198     if (!resultsReady) {
00199         if (pInAccessor->getState() != EXECBUF_EOS) {
00200             ExecStreamResult rc = precheckConduitBuffers();
00201             if (rc != EXECRC_YIELD) {
00202                 return rc;
00203             }
00204             if (nParallel > 1) {
00205                 // FIXME
00206                 computeFirstResultParallel();
00207             } else {
00208                 computeFirstResult();
00209                 return EXECRC_BUF_UNDERFLOW;
00210             }
00211         } else {
00212             ExternalSortRunLoader &runLoader = *(runLoaders[0]);
00213             if (runLoader.isStarted()) {
00214                 sortRun(runLoader);
00215                 if (storedRuns.size() || storeFinalRun) {
00216                     // store last run
00217                     storeRun(runLoader);
00218                 }
00219             }
00220             mergeFirstResult();
00221 
00222             // close the producers now that we've read all input
00223             if (earlyClose) {
00224                 ExecStreamGraphImpl &graphImpl =
00225                     dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00226                 graphImpl.closeProducers(getStreamId());
00227             }
00228 
00229             resultsReady = true;
00230         }
00231     }
00232 
00233     return pOutputWriter->fetch(*pOutAccessor);
00234 }
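Once the input reaches EXECBUF_EOS, execute() and mergeFirstResult() between them choose where the output writer reads from. A sketch of that decision, assuming the last in-memory run was started (open() starts every loader) and using a hypothetical `OutputSource` enum and `chooseOutputSource` function:

```cpp
#include <cstddef>

enum class OutputSource {
    InMemoryRun,     // runLoaders[0], set as the default in open()
    FinalStoredRun,  // pFinalRunAccessor reading the single stored run
    FinalMerge       // pMerger streaming the last merge pass
};

// nSpilledRuns: runs already stored before the input reached end-of-stream.
OutputSource chooseOutputSource(std::size_t nSpilledRuns, bool storeFinalRun) {
    // execute() stores the last run only if something was already spilled
    // or a materialized final run was requested.
    bool storeLastRun = nSpilledRuns > 0 || storeFinalRun;
    if (!storeLastRun) {
        return OutputSource::InMemoryRun;  // everything fit in memory
    }
    if (storeFinalRun) {
        return OutputSource::FinalStoredRun;  // merging ends with one stored run
    }
    return OutputSource::FinalMerge;  // last merge pass feeds the output
}
```

Only the first case avoids writing to the temp segment; storeFinalRun forces even a small sort through a stored run.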

void ExternalSortExecStreamImpl::getResourceRequirements ( ExecStreamResourceQuantity minQuantity,
ExecStreamResourceQuantity optQuantity,
ExecStreamResourceSettingType optType 
) [virtual]

Determines resource requirements for this stream.

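The default ExecStream implementation declares zero resource requirements. This override first adds the ConduitExecStream requirements, then requests a minimum of three cache pages and, when estimatedNumRows is known, an estimated optimum page count (otherwise an unbounded optimum).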

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting.

Reimplemented from ExecStream.

Definition at line 109 of file ExternalSortExecStreamImpl.cpp.

References estimatedNumRows, EXEC_RESOURCE_ESTIMATE, EXEC_RESOURCE_UNBOUNDED, ExecStream::getResourceRequirements(), isMAXU(), max(), MAXU, ExternalSortInfo::memSegmentAccessor, ExecStreamResourceQuantity::nCachePages, SingleOutputExecStream::pOutAccessor, SegmentAccessor::pSegment, and sortInfo.

00113 {
00114     ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00115 
00116     // REVIEW
00117     uint minPages = 3;
00118     minQuantity.nCachePages += minPages;
00119 
00120     // if no estimated row count is available, request an unbounded amount
00121     // from the resource governor; otherwise, estimate the number of pages
00122     // for an in-memory sort
00123     if (isMAXU(estimatedNumRows)) {
00124         optType = EXEC_RESOURCE_UNBOUNDED;
00125     } else {
00126         // use the average of the min and max rowsizes
00127         // TODO - use stats to come up with a more accurate average
00128         RecordNum nPages =
00129             estimatedNumRows *
00130             ((pOutAccessor->getScratchTupleAccessor().getMaxByteCount() +
00131             pOutAccessor->getScratchTupleAccessor().getMinByteCount()) / 2) /
00132             sortInfo.memSegmentAccessor.pSegment->getUsablePageSize();
00133         uint numPages;
00134         if (nPages >= uint(MAXU)) {
00135             numPages = uint(MAXU) - 1;
00136         } else {
00137             numPages = uint(nPages);
00138         }
00139         // make sure the opt is bigger than the min; otherwise, the
00140         // resource governor won't try to give it extra
00141         optQuantity.nCachePages += std::max(minPages + 1, numPages);
00142         optType = EXEC_RESOURCE_ESTIMATE;
00143     }
00144 }
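The optimum estimate above can be restated as standalone arithmetic. This sketch assumes MAXU is the largest value of a 32-bit unsigned int and uses fixed-width integers in place of RecordNum and uint; `estimateOptPages` is a hypothetical name, and, like the original, it truncates rather than rounding up.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

std::uint32_t estimateOptPages(std::uint64_t estimatedNumRows,
                               std::uint64_t minTupleBytes,
                               std::uint64_t maxTupleBytes,
                               std::uint64_t usablePageSize) {
    const std::uint32_t minPages = 3;
    // assumption: MAXU == largest 32-bit unsigned value
    const std::uint64_t maxu = std::numeric_limits<std::uint32_t>::max();

    // average of min and max tuple sizes, in place of real statistics
    std::uint64_t avgTupleBytes = (maxTupleBytes + minTupleBytes) / 2;
    std::uint64_t nPages = estimatedNumRows * avgTupleBytes / usablePageSize;

    // clamp to one below MAXU, as the original does
    std::uint32_t numPages = (nPages >= maxu)
        ? static_cast<std::uint32_t>(maxu - 1)
        : static_cast<std::uint32_t>(nPages);

    // keep the optimum above the minimum so the governor offers extra pages
    return std::max<std::uint32_t>(minPages + 1, numPages);
}
```

For example, 1000 rows with tuple sizes between 16 and 48 bytes on 4000-byte pages give 1000 * 32 / 4000 = 8 pages, while a tiny estimate still yields 4 pages, one above the minimum.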

void ExternalSortExecStreamImpl::setResourceAllocation ( ExecStreamResourceQuantity quantity  )  [virtual]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented from ExecStream.

Definition at line 146 of file ExternalSortExecStreamImpl.cpp.

References ExecStreamResourceQuantity::nCachePages, nParallel, ExternalSortInfo::nSortMemPages, ExecStreamResourceQuantity::nThreads, ExecStream::setResourceAllocation(), and sortInfo.

00148 {
00149     // REVIEW
00150     ConduitExecStream::setResourceAllocation(quantity);
00151     sortInfo.nSortMemPages = quantity.nCachePages;
00152     nParallel = quantity.nThreads + 1;
00153 
00154     // NOTE jvs 10-Nov-2004:  parallel sort is currently disabled
00155     // as an effect of the scheduler-revamp.  We may resurrect it, or
00156     // we may decide to handle parallelism up at the scheduler level.
00157     assert(nParallel == 1);
00158 }

ExternalSortExecStream * ExternalSortExecStream::newExternalSortExecStream (  )  [static, inherited]

Factory method.

Returns:
new ExternalSortExecStream instance

Definition at line 33 of file ExternalSortExecStreamImpl.cpp.

Referenced by LbmExecStreamTestBase::initSorterExecStream(), LbmSearchTest::loadTableAndIndex(), LhxAggExecStreamTest::testGroupCountImpl(), LhxJoinExecStreamTest::testImpl(), ExternalSortExecStreamTest::testImpl(), LbmLoadBitmapTest::testLoad(), LhxAggExecStreamTest::testSingleValueImpl(), and ExecStreamFactory::visit().

00034 {
00035     return new ExternalSortExecStreamImpl();
00036 }
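newExternalSortExecStream() is a static factory on the abstract interface that returns a concrete ExternalSortExecStreamImpl, so the callers listed above obtain a sorter through ExternalSortExecStream alone. A minimal sketch of the pattern with hypothetical `Sorter` and `SorterImpl` classes:

```cpp
#include <string>

// Hypothetical names; only the shape mirrors ExternalSortExecStream.
class Sorter {
public:
    virtual ~Sorter() = default;
    virtual std::string kind() const = 0;

    // Factory method: the caller receives (and owns) an implementation
    // without naming the implementation class.
    static Sorter *newSorter();
};

namespace {
// Concrete implementation; callers only ever see Sorter.
class SorterImpl : public Sorter {
public:
    std::string kind() const override { return "external"; }
};
}  // namespace

Sorter *Sorter::newSorter() {
    return new SorterImpl();
}
```

As in the original, the factory returns a raw pointer that the caller owns; wrapping it in a smart pointer is left to the caller.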

void ConduitExecStream::prepare ( ConduitExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 42 of file ConduitExecStream.cpp.

References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

Referenced by prepare(), LcsClusterAppendExecStream::prepare(), LbmNormalizerExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().

00043 {
00044     SingleInputExecStream::prepare(params);
00045 
00046     if (params.outputTupleDesc.empty()) {
00047         pOutAccessor->setTupleShape(
00048             pInAccessor->getTupleDesc(),
00049             pInAccessor->getTupleFormat());
00050     }
00051 
00052     SingleOutputExecStream::prepare(params);
00053 }
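ConduitExecStream::prepare() and SingleOutputExecStream::prepare() together decide the output tuple shape. If no output descriptor is given, the conduit copies its input shape. Otherwise the explicit descriptor is used, and it must be present if nothing else has set the shape. The sketch below models that rule. `TupleDesc` is a hypothetical list of column-type names, `Accessor` and `prepareConduit` are hypothetical, and tuple formats are omitted.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical simplified tuple descriptor: one type name per column.
using TupleDesc = std::vector<std::string>;

// Hypothetical buffer accessor holding only a tuple shape.
struct Accessor {
    TupleDesc desc;  // empty until a shape has been assigned
};

// Models the two prepare() steps in sequence.
void prepareConduit(const Accessor &in, Accessor &out, const TupleDesc &outputTupleDesc)
{
    // ConduitExecStream::prepare(): pass-through shape by default.
    if (outputTupleDesc.empty()) {
        out.desc = in.desc;
    }
    // SingleOutputExecStream::prepare(): fall back to the explicit descriptor.
    if (out.desc.empty()) {
        if (outputTupleDesc.empty()) {
            throw std::logic_error("no output tuple descriptor available");
        }
        out.desc = outputTupleDesc;
    }
}
```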

void SingleInputExecStream::prepare ( SingleInputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 44 of file SingleInputExecStream.cpp.

References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().

Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().

00045 {
00046     ExecStream::prepare(params);
00047 
00048     assert(pInAccessor);
00049     assert(pInAccessor->getProvision() == getInputBufProvision());
00050 }

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

00085 {
00086     if (pGraph) {
00087         pDynamicParamManager = pGraph->getDynamicParamManager();
00088     }
00089     pQuotaAccessor = params.pCacheAccessor;
00090     pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
00091 }

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

00049 {
00050     ExecStream::prepare(params);
00051     assert(pOutAccessor);
00052     assert(pOutAccessor->getProvision() == getOutputBufProvision());
00053     if (pOutAccessor->getTupleDesc().empty()) {
00054         assert(!params.outputTupleDesc.empty());
00055         pOutAccessor->setTupleShape(
00056             params.outputTupleDesc,
00057             params.outputTupleFormat);
00058     }
00059 }

ExecStreamResult ConduitExecStream::precheckConduitBuffers (  )  [protected, inherited]

Checks the state of the input and output buffers.

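If the input is empty, requests production and returns EXECRC_BUF_UNDERFLOW; if production has already been requested (underflow), returns EXECRC_BUF_UNDERFLOW. If the input has reached EOS, propagates EOS to the output buffer and returns EXECRC_EOS. If the output is full, returns EXECRC_BUF_OVERFLOW.

Returns:
result of precheck; anything but EXECRC_YIELD indicates that execution should terminate immediately with the returned code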

Definition at line 61 of file ConduitExecStream.cpp.

References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.

Referenced by execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().

00062 {
00063     switch (pInAccessor->getState()) {
00064     case EXECBUF_EMPTY:
00065         pInAccessor->requestProduction();
00066         return EXECRC_BUF_UNDERFLOW;
00067     case EXECBUF_UNDERFLOW:
00068         return EXECRC_BUF_UNDERFLOW;
00069     case EXECBUF_EOS:
00070         pOutAccessor->markEOS();
00071         return EXECRC_EOS;
00072     case EXECBUF_NONEMPTY:
00073     case EXECBUF_OVERFLOW:
00074         break;
00075     default:
00076         permAssert(false);
00077     }
00078     if (pOutAccessor->getState() == EXECBUF_OVERFLOW) {
00079         return EXECRC_BUF_OVERFLOW;
00080     }
00081     return EXECRC_YIELD;
00082 }
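The precheck is a small state machine. It inspects the input state first and the output overflow second, and only EXECRC_YIELD lets the caller continue. The sketch below reproduces that decision order with hypothetical enums (`BufState`, `Result`) and a hypothetical `Buffer` stand-in. These are not the Fennel types. The original's `permAssert(false)` default is unnecessary here because the enum class is fully covered.

```cpp
#include <cassert>

// Hypothetical mirrors of the Fennel buffer states and result codes.
enum class BufState { Empty, Underflow, NonEmpty, Overflow, Eos };
enum class Result { Yield, BufUnderflow, BufOverflow, Eos };

// Hypothetical stand-in for a buffer accessor.
struct Buffer {
    BufState state = BufState::Empty;
    bool productionRequested = false;
    void requestProduction() { productionRequested = true; }
    void markEOS() { state = BufState::Eos; }
};

// Same decision order as ConduitExecStream::precheckConduitBuffers().
Result precheck(Buffer &in, Buffer &out)
{
    switch (in.state) {
    case BufState::Empty:
        in.requestProduction();       // ask the producer for tuples
        return Result::BufUnderflow;
    case BufState::Underflow:
        return Result::BufUnderflow;  // production was already requested
    case BufState::Eos:
        out.markEOS();                // propagate end-of-stream downstream
        return Result::Eos;
    case BufState::NonEmpty:
    case BufState::Overflow:
        break;                        // input has tuples to consume
    }
    if (out.state == BufState::Overflow) {
        return Result::BufOverflow;   // consumer must drain the output first
    }
    return Result::Yield;             // caller may do its own work
}
```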

void ConduitExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Reimplemented from SingleInputExecStream.

Definition at line 36 of file ConduitExecStream.cpp.

References SingleOutputExecStream::setOutputBufAccessors().

00038 {
00039     SingleOutputExecStream::setOutputBufAccessors(outAccessors);
00040 }

void ConduitExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Reimplemented from SingleInputExecStream.

Definition at line 30 of file ConduitExecStream.cpp.

References SingleInputExecStream::setInputBufAccessors().

00032 {
00033     SingleInputExecStream::setInputBufAccessors(inAccessors);
00034 }

ExecStreamBufProvision SingleInputExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; the ExecStream default is BUFPROV_NONE, but SingleInputExecStream returns BUFPROV_PRODUCER

Reimplemented from ExecStream.

Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.

Definition at line 62 of file SingleInputExecStream.cpp.

References BUFPROV_PRODUCER.

Referenced by SingleInputExecStream::prepare().

00063 {
00064     return BUFPROV_PRODUCER;
00065 }

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity 
) [virtual, inherited]

Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.

Definition at line 102 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.

00105 {
00106     minQuantity.nThreads = 0;
00107     minQuantity.nCachePages = 0;
00108     optQuantity = minQuantity;
00109 }

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }
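As described above, a stream that runs a long loop inside one quantum must poll for aborts itself. The sketch below shows one way to do that. `AbortError`, `Scheduler`, `sumWithAbortChecks`, and the polling interval are all hypothetical. A null scheduler means "nothing to check", which matches checkAbort()'s early returns. Polling every N iterations is an assumption meant to keep the check cheap.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for Fennel's AbortExcn.
struct AbortError : std::runtime_error {
    AbortError() : std::runtime_error("aborted") {}
};

// Hypothetical stand-in for a scheduler with an abort flag.
struct Scheduler {
    std::atomic<bool> abortRequested{false};
    void checkAbort() const {
        if (abortRequested.load()) {
            throw AbortError();
        }
    }
};

// Hypothetical long-running loop that polls for an abort every
// checkInterval iterations; no scheduler means no checks.
long sumWithAbortChecks(const std::vector<long> &values,
                        const Scheduler *scheduler,
                        std::size_t checkInterval = 1024)
{
    long total = 0;
    for (std::size_t i = 0; i < values.size(); ++i) {
        if (scheduler && i % checkInterval == 0) {
            scheduler->checkAbort();
        }
        total += values[i];
    }
    return total;
}
```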

ExecStreamBufProvision ExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; default is BUFPROV_NONE

Reimplemented in BarrierExecStream, DiffluenceExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, SingleOutputExecStream, SplitterExecStream, ValuesExecStream, JavaTransformExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 172 of file ExecStream.cpp.

References BUFPROV_NONE.

00173 {
00174     return BUFPROV_NONE;
00175 }

ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; the ExecStream default is BUFPROV_NONE, but SingleOutputExecStream returns BUFPROV_CONSUMER

Reimplemented from ExecStream.

Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 69 of file SingleOutputExecStream.cpp.

References BUFPROV_CONSUMER.

Referenced by SingleOutputExecStream::prepare().

00070 {
00071     return BUFPROV_CONSUMER;
00072 }

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
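close() is idempotent. The needsClose flag guards a virtual closeImpl(), so repeated calls (for example from destructors and explicit cleanup) release resources only once. Below is a minimal re-creation of the idiom. `Closable` and `CountingResource` are hypothetical names, and starting with `needsClose = true` is an assumption of this sketch. Because the flag is cleared before closeImpl() runs, a closeImpl() that throws is not retried.

```cpp
#include <cassert>

// Hypothetical re-creation of the ClosableObject idiom.
class Closable {
public:
    virtual ~Closable() = default;

    bool isClosed() const { return !needsClose; }

    void close()
    {
        if (!needsClose) {
            return;          // already closed: a second call is a no-op
        }
        needsClose = false;  // cleared first, so a throwing closeImpl() is not retried
        closeImpl();
    }

protected:
    bool needsClose = true;  // assumption: objects start out needing close
    virtual void closeImpl() = 0;
};

// Example subclass that counts how often it releases its resources.
class CountingResource : public Closable {
public:
    int releaseCount = 0;
protected:
    void closeImpl() override { ++releaseCount; }
};
```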

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
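Tracing is gated twice. With no target, tracing is off and minimumLevel is TRACE_OFF. With a target, only levels at or above minimumLevel pass isTracingLevel(). The sketch below models this with a hypothetical numeric scale (`Level`, where larger means more severe) and hypothetical `Collector` and `Source` classes. In Fennel, trace() checks only isTracing(). The level check presumably happens in the FENNEL_TRACE macro, and this sketch folds it into trace() for brevity.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical severity scale; not Fennel's TraceLevel values.
enum Level { LEVEL_FINE = 1, LEVEL_INFO = 2, LEVEL_SEVERE = 3, LEVEL_OFF = 100 };

// Hypothetical trace target that collects messages.
struct Collector {
    std::vector<std::string> lines;
};

// Hypothetical trace source: no target forces the minimum level to OFF.
class Source {
public:
    Source(Collector *target, Level minimum)
        : pTarget(target), minimumLevel(target ? minimum : LEVEL_OFF) {}

    bool isTracing() const { return pTarget != nullptr; }
    bool isTracingLevel(Level level) const { return level >= minimumLevel; }

    void trace(Level level, const std::string &message) const
    {
        if (isTracing() && isTracingLevel(level)) {
            pTarget->lines.push_back(message);
        }
    }

private:
    Collector *pTarget;
    Level minimumLevel;
};
```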

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor &  errorDesc,
const TupleData &  errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }
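The tuple overload of postError() allocates its marshalling buffer lazily. The buffer is sized once for the largest possible tuple and reused for later errors. The overload also does no work at all when no target is set. The sketch below shows the same pattern. `RecordingTarget` and `ErrorPoster` are hypothetical, and concatenating string fields stands in for TupleAccessor::marshal(). Unlike the original, this sketch checks the buffer bound explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for ErrorTarget: records raw error bytes.
struct RecordingTarget {
    std::vector<std::string> records;
    void notify(const void *address, long capacity) {
        records.emplace_back(static_cast<const char *>(address),
                             static_cast<std::size_t>(capacity));
    }
};

// Hypothetical ErrorSource-like helper with a lazily allocated buffer.
class ErrorPoster {
public:
    ErrorPoster(RecordingTarget *target, std::size_t maxFieldBytes, std::size_t nFields)
        : pTarget(target), cbMax(maxFieldBytes * nFields) {}

    void postError(const std::vector<std::string> &tuple)
    {
        if (!pTarget) {
            return;  // no target: skip marshalling entirely
        }
        if (!pErrorBuf) {
            pErrorBuf.reset(new char[cbMax]);  // first error: allocate max size
        }
        // Marshal by concatenating field bytes (stand-in for TupleAccessor).
        std::size_t cbTuple = 0;
        for (const std::string &field : tuple) {
            if (cbTuple + field.size() > cbMax) {
                throw std::length_error("tuple exceeds buffer");
            }
            std::memcpy(pErrorBuf.get() + cbTuple, field.data(), field.size());
            cbTuple += field.size();
        }
        pTarget->notify(pErrorBuf.get(), static_cast<long>(cbTuple));
    }

    bool hasBuffer() const { return pErrorBuf != nullptr; }

private:
    RecordingTarget *pTarget;
    std::size_t cbMax;
    std::unique_ptr<char[]> pErrorBuf;
};
```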

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Friends And Related Function Documentation

friend class ExternalSortTask [friend]

Definition at line 55 of file ExternalSortExecStreamImpl.h.


Member Data Documentation

SharedSegment ExternalSortExecStreamImpl::pTempSegment [private]

Segment to use for storing runs externally.

Definition at line 60 of file ExternalSortExecStreamImpl.h.

Referenced by prepare().

ExternalSortInfo ExternalSortExecStreamImpl::sortInfo [private]

Global information shared with subcomponents.

Definition at line 65 of file ExternalSortExecStreamImpl.h.

Referenced by getResourceRequirements(), mergeFirstResult(), open(), prepare(), setResourceAllocation(), and storeRun().

uint ExternalSortExecStreamImpl::nParallel [private]

Maximum number of parallel threads to use.

Definition at line 70 of file ExternalSortExecStreamImpl.h.

Referenced by computeFirstResultParallel(), execute(), mergeFirstResult(), open(), prepare(), reserveRunLoader(), and setResourceAllocation().

boost::scoped_array<SharedExternalSortRunLoader> ExternalSortExecStreamImpl::runLoaders [private]

Array of helpers used to load, quicksort, and store runs.

This array has size equal to nParallel. For non-parallel sort (nParallel=1), this means there's just one entry. For parallel sort, this array acts as an availability queue governed by runLoaderAvailable, runLoaderMutex, and the ExternalSortRunLoader::runningParallelTask flag.

Definition at line 79 of file ExternalSortExecStreamImpl.h.

Referenced by computeFirstResult(), execute(), mergeFirstResult(), open(), releaseResources(), and reserveRunLoader().

ThreadPool<ExternalSortTask> ExternalSortExecStreamImpl::threadPool [private]

Thread pool used during parallel sort.

Definition at line 84 of file ExternalSortExecStreamImpl.h.

Referenced by computeFirstResultParallel().

LocalCondition ExternalSortExecStreamImpl::runLoaderAvailable [private]

Condition variable used to signal availability of entries in runLoaders array.

Definition at line 90 of file ExternalSortExecStreamImpl.h.

Referenced by reserveRunLoader(), and unreserveRunLoader().

StrictMutex ExternalSortExecStreamImpl::runLoaderMutex [private]

Synchronization for availability status in runLoaders array.

Definition at line 95 of file ExternalSortExecStreamImpl.h.

Referenced by reserveRunLoader(), and unreserveRunLoader().
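runLoaders, runLoaderMutex, and runLoaderAvailable together form an availability queue. A reservation marks an idle loader busy, or waits on the condition variable until one is released. That only matters for parallel sort, which is currently disabled. Below is a minimal sketch with std::mutex and std::condition_variable. `RunLoader`, `RunLoaderPool`, and `busyCount` are hypothetical names, and the rescanning loop also tolerates spurious wakeups.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for ExternalSortRunLoader; only the busy flag matters.
struct RunLoader {
    bool runningParallelTask = false;
};

// Hypothetical availability queue over a fixed array of loaders.
class RunLoaderPool {
public:
    explicit RunLoaderPool(std::size_t n) : loaders(n) {}

    // Blocks until some loader is idle, then marks it busy.
    RunLoader &reserve()
    {
        std::unique_lock<std::mutex> lock(poolMutex);
        for (;;) {
            for (RunLoader &loader : loaders) {
                if (!loader.runningParallelTask) {
                    loader.runningParallelTask = true;
                    return loader;
                }
            }
            available.wait(lock);  // woken by unreserve(); rescan afterwards
        }
    }

    // Marks a loader idle and wakes one waiting reserver.
    void unreserve(RunLoader &loader)
    {
        {
            std::lock_guard<std::mutex> lock(poolMutex);
            loader.runningParallelTask = false;
        }
        available.notify_one();
    }

    std::size_t busyCount()
    {
        std::lock_guard<std::mutex> lock(poolMutex);
        std::size_t n = 0;
        for (const RunLoader &loader : loaders) {
            n += loader.runningParallelTask ? 1 : 0;
        }
        return n;
    }

private:
    std::vector<RunLoader> loaders;  // never resized, so references stay valid
    std::mutex poolMutex;
    std::condition_variable available;
};
```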

boost::scoped_ptr<ExternalSortRunAccessor> ExternalSortExecStreamImpl::pFinalRunAccessor [private]

Helper used to read final run when stored externally.

Definition at line 100 of file ExternalSortExecStreamImpl.h.

Referenced by mergeFirstResult(), and releaseResources().

boost::scoped_ptr<ExternalSortMerger> ExternalSortExecStreamImpl::pMerger [private]

Helper used to merge runs.

Definition at line 105 of file ExternalSortExecStreamImpl.h.

Referenced by mergeFirstResult(), and releaseResources().

boost::scoped_ptr<ExternalSortOutput> ExternalSortExecStreamImpl::pOutputWriter [private]

Helper used to write XO output.

Definition at line 110 of file ExternalSortExecStreamImpl.h.

Referenced by execute(), mergeFirstResult(), open(), and releaseResources().

StrictMutex ExternalSortExecStreamImpl::storedRunMutex [private]

Synchronization for storedRuns.

Definition at line 115 of file ExternalSortExecStreamImpl.h.

Referenced by deleteStoredRunInfo(), and storeRun().

std::vector<SharedSegStreamAllocation> ExternalSortExecStreamImpl::storedRuns [private]

Information on runs stored externally.

Definition at line 120 of file ExternalSortExecStreamImpl.h.

Referenced by deleteStoredRunInfo(), execute(), mergeFirstResult(), optimizeRunOrder(), releaseResources(), and storeRun().

bool ExternalSortExecStreamImpl::resultsReady [private]

Whether the XO is ready to start writing results.

Definition at line 125 of file ExternalSortExecStreamImpl.h.

Referenced by computeFirstResultParallel(), execute(), open(), and prepare().

bool ExternalSortExecStreamImpl::storeFinalRun [private]

Whether to materialize one big final run, or return results directly from last merge stage.

Definition at line 131 of file ExternalSortExecStreamImpl.h.

Referenced by execute(), mergeFirstResult(), and prepare().

int ExternalSortExecStreamImpl::estimatedNumRows [private]

Estimate of the number of rows in the sort input.

If < 0, no stats were available to estimate this value.

Definition at line 137 of file ExternalSortExecStreamImpl.h.

Referenced by getResourceRequirements(), and prepare().

bool ExternalSortExecStreamImpl::earlyClose [private]

If true, close producers once all input has been read.

Definition at line 142 of file ExternalSortExecStreamImpl.h.

Referenced by execute(), and prepare().

SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited]

Definition at line 51 of file SingleInputExecStream.h.

Referenced by SortedAggExecStream::compareGroupByKeys(), computeFirstResult(), execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction enclosing the stream.

Obtained at open() time but not released at close() time, so that TableWriters can replay a transaction. A shared pointer is kept in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:31 2009 for Fennel by doxygen 1.5.1