#include <ExternalSortExecStreamImpl.h>
Inheritance diagram for ExternalSortExecStreamImpl:
Public Member Functions | |
ExternalSortExecStreamImpl () | |
virtual void | prepare (ExternalSortExecStreamParams const &params) |
virtual void | open (bool restart) |
Opens this stream, acquiring any resources needed in order to be able to fetch data. | |
virtual ExecStreamResult | execute (ExecStreamQuantum const &quantum) |
Executes this stream. | |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType) |
Determines resource requirements for this stream. | |
virtual void | setResourceAllocation (ExecStreamResourceQuantity &quantity) |
Sets current resource allocation for this stream. | |
virtual void | prepare (ConduitExecStreamParams const &params) |
virtual void | prepare (SingleInputExecStreamParams const &params) |
virtual void | prepare (ExecStreamParams const &params) |
Prepares this stream for execution. | |
virtual void | prepare (SingleOutputExecStreamParams const &params) |
virtual void | setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors) |
Initializes the buffer accessors for outputs from this stream. | |
virtual void | setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors) |
Initializes the buffer accessors for inputs to this stream. | |
virtual ExecStreamBufProvision | getInputBufProvision () const |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples. | |
virtual bool | canEarlyClose () |
| |
ExecStreamGraph & | getGraph () const |
| |
ExecStreamId | getStreamId () const |
| |
virtual void | getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity) |
virtual void | setName (std::string const &) |
Sets unique name of this stream. | |
virtual std::string const & | getName () const |
| |
virtual bool | mayBlock () const |
Queries whether this stream's implementation may block when execute() is called. | |
virtual void | checkAbort () const |
Checks whether there is an abort request for this stream's scheduler. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual ExecStreamBufProvision | getOutputBufProvision () const |
Queries the BufferProvision which this stream is capable of when producing tuples. | |
virtual ExecStreamBufProvision | getOutputBufConversion () const |
Queries the BufferProvision to which this stream needs its output to be converted, if any. | |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any resources that are still allocated. | |
virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
For use when initialization has to be deferred until after construction. | |
void | trace (TraceLevel level, std::string message) const |
Records a trace message. | |
bool | isTracing () const |
| |
bool | isTracingLevel (TraceLevel level) const |
Determines whether a particular level is being traced. | |
TraceTarget & | getTraceTarget () const |
| |
SharedTraceTarget | getSharedTraceTarget () const |
| |
std::string | getTraceSourceName () const |
Gets the name of this source. | |
void | setTraceSourceName (std::string const &n) |
Sets the name of this source. | |
TraceLevel | getMinimumTraceLevel () const |
void | disableTracing () |
virtual void | initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name) |
For use when initialization has to be deferred until after construction. | |
void | postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index) |
Posts an exception, such as a row exception. | |
void | postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index) |
Posts an exception, such as a row exception. | |
bool | hasTarget () const |
| |
ErrorTarget & | getErrorTarget () const |
| |
SharedErrorTarget | getSharedErrorTarget () const |
| |
std::string | getErrorSourceName () const |
Gets the name of this source. | |
void | setErrorSourceName (std::string const &n) |
Sets the name of this source. | |
void | disableTarget () |
Static Public Member Functions | |
static ExternalSortExecStream * | newExternalSortExecStream () |
Factory method. | |
Protected Member Functions | |
ExecStreamResult | precheckConduitBuffers () |
Checks the state of the input and output buffers. | |
Protected Attributes | |
SharedExecStreamBufAccessor | pInAccessor |
bool | isOpen |
Whether this stream is currently open. | |
ExecStreamGraph * | pGraph |
Dataflow graph containing this stream. | |
ExecStreamId | id |
Identifier for this stream; local to its containing graph. | |
std::string | name |
Name of stream, as known by optimizer. | |
SharedDynamicParamManager | pDynamicParamManager |
The dynamic parameter manager available to this stream. | |
SharedLogicalTxn | pTxn |
The transaction embracing the stream. | |
ExecStreamResourceQuantity | resourceAllocation |
Resource quantities currently allocated to this stream. | |
SharedCacheAccessor | pQuotaAccessor |
CacheAccessor used for quota tracking. | |
SharedCacheAccessor | pScratchQuotaAccessor |
CacheAccessor used for scratch page quota tracking. | |
bool | needsClose |
SharedExecStreamBufAccessor | pOutAccessor |
Private Member Functions | |
void | computeFirstResult () |
Performs enough sorting to be able to start returning results (non-parallel version). | |
void | computeFirstResultParallel () |
Performs enough sorting to be able to start returning results (parallel version). | |
void | sortRun (ExternalSortRunLoader &runLoader) |
Sorts one run in memory. | |
void | storeRun (ExternalSortSubStream &subStream) |
Stores one run. | |
void | mergeFirstResult () |
Performs enough merging to be able to start returning results. | |
void | optimizeRunOrder () |
Adjusts run order for optimal merging. | |
void | deleteStoredRunInfo (uint iFirstRun, uint nRuns) |
Deletes information on runs after they have been merged. | |
ExternalSortRunLoader & | reserveRunLoader () |
Reserves one run loader for use during parallel sort, blocking on runLoaderAvailable until one becomes available. | |
void | unreserveRunLoader (ExternalSortRunLoader &runLoader) |
Unreserves one run loader after its contents have been stored, making it available to other threads. | |
void | releaseResources () |
Releases resources associated with this stream. | |
virtual void | closeImpl () |
Implements ClosableObject. | |
Private Attributes | |
SharedSegment | pTempSegment |
Segment to use for storing runs externally. | |
ExternalSortInfo | sortInfo |
Global information shared with subcomponents. | |
uint | nParallel |
Maximum number of parallel threads to use. | |
boost::scoped_array< SharedExternalSortRunLoader > | runLoaders |
Array of helpers used to load, quicksort, and store runs. | |
ThreadPool< ExternalSortTask > | threadPool |
Thread pool used during parallel sort. | |
LocalCondition | runLoaderAvailable |
Condition variable used to signal availability of entries in runLoaders array. | |
StrictMutex | runLoaderMutex |
Synchronization for availability status in runLoaders array. | |
boost::scoped_ptr< ExternalSortRunAccessor > | pFinalRunAccessor |
Helper used to read final run when stored externally. | |
boost::scoped_ptr< ExternalSortMerger > | pMerger |
Helper used to merge runs. | |
boost::scoped_ptr< ExternalSortOutput > | pOutputWriter |
Helper used to write XO output. | |
StrictMutex | storedRunMutex |
Synchronization for storedRuns. | |
std::vector< SharedSegStreamAllocation > | storedRuns |
Information on runs stored externally. | |
bool | resultsReady |
Whether the XO is ready to start writing results. | |
bool | storeFinalRun |
Whether to materialize one big final run, or return results directly from last merge stage. | |
int | estimatedNumRows |
Estimate of the number of rows in the sort input. | |
bool | earlyClose |
If true, close producers once all input has been read. | |
Friends | |
class | ExternalSortTask |
Definition at line 52 of file ExternalSortExecStreamImpl.h.
ExternalSortExecStreamImpl::ExternalSortExecStreamImpl | ( | ) | [explicit] |
Definition at line 64 of file ExternalSortExecStreamImpl.cpp.
00065 : sortInfo(*this) 00066 { 00067 }
void ExternalSortExecStreamImpl::computeFirstResult | ( | ) | [private] |
Performs enough sorting to be able to start returning results (non-parallel version).
Definition at line 255 of file ExternalSortExecStreamImpl.cpp.
References EXTSORT_OVERFLOW, ExternalSortRunLoader::isStarted(), ExternalSortRunLoader::loadRun(), SingleInputExecStream::pInAccessor, runLoaders, sortRun(), ExternalSortRunLoader::startRun(), and storeRun().
Referenced by execute().
00256 { 00257 ExternalSortRunLoader &runLoader = *(runLoaders[0]); 00258 for (;;) { 00259 if (!runLoader.isStarted()) { 00260 runLoader.startRun(); 00261 } 00262 ExternalSortRC rc = runLoader.loadRun(*pInAccessor); 00263 if (rc == EXTSORT_OVERFLOW) { 00264 sortRun(runLoader); 00265 storeRun(runLoader); 00266 } else { 00267 return; 00268 } 00269 } 00270 }
void ExternalSortExecStreamImpl::computeFirstResultParallel | ( | ) | [private] |
Performs enough sorting to be able to start returning results (parallel version).
Definition at line 374 of file ExternalSortExecStreamImpl.cpp.
References EXTSORT_ENDOFDATA, mergeFirstResult(), nParallel, reserveRunLoader(), resultsReady, ThreadPoolBase::start(), ExternalSortRunLoader::startRun(), ThreadPoolBase::stop(), ThreadPool< Task >::submitTask(), threadPool, and unreserveRunLoader().
Referenced by execute().
00375 { 00376 // FIXME jvs 19-June-2004: ThreadPool needs to propagate excns! 00377 00378 assert(nParallel > 1); 00379 00380 // minus one because the main dispatcher thread runs in parallel with the 00381 // pooled threads 00382 threadPool.start(nParallel - 1); 00383 try { 00384 for (;;) { 00385 ExternalSortRunLoader &runLoader = reserveRunLoader(); 00386 runLoader.startRun(); 00387 // FIXME 00388 #if 0 00389 ExternalSortRC rc = runLoader.loadRun(*pInputStream); 00390 #else 00391 ExternalSortRC rc = EXTSORT_ENDOFDATA; 00392 #endif 00393 if (rc == EXTSORT_ENDOFDATA) { 00394 // the entire input has been processed, so we're ready 00395 // for merge 00396 unreserveRunLoader(runLoader); 00397 break; 00398 } 00399 // otherwise, schedule a new sort task 00400 // FIXME 00401 #if 0 00402 ExternalSortTask task(*this,runLoader); 00403 threadPool.submitTask(task); 00404 #endif 00405 } 00406 } catch (...) { 00407 // REVEW jvs 19-June-2004: signal a request to expedite cleanup? 00408 00409 // wait for all tasks to clean up 00410 threadPool.stop(); 00411 throw; 00412 } 00413 00414 // wait for all tasks to complete before beginning merge 00415 threadPool.stop(); 00416 00417 mergeFirstResult(); 00418 resultsReady = true; 00419 }
void ExternalSortExecStreamImpl::sortRun | ( | ExternalSortRunLoader & | runLoader | ) | [private] |
Sorts one run in memory.
runLoader | loaded run to sort |
Definition at line 428 of file ExternalSortExecStreamImpl.cpp.
References ExternalSortRunLoader::getLoadedTupleCount(), ExternalSortRunLoader::sort(), and TRACE_FINE.
Referenced by computeFirstResult(), ExternalSortTask::execute(), and execute().
00429 { 00430 FENNEL_TRACE( 00431 TRACE_FINE, 00432 "sorting run with tuple count = " 00433 << runLoader.getLoadedTupleCount()); 00434 runLoader.sort(); 00435 }
void ExternalSortExecStreamImpl::storeRun | ( | ExternalSortSubStream & | subStream | ) | [private] |
Stores one run.
subStream | substream whose contents are to be fetched and stored as a run |
Definition at line 272 of file ExternalSortExecStreamImpl.cpp.
References sortInfo, storedRunMutex, storedRuns, and TRACE_FINE.
Referenced by computeFirstResult(), ExternalSortTask::execute(), execute(), and mergeFirstResult().
00273 { 00274 FENNEL_TRACE( 00275 TRACE_FINE, 00276 "storing run " << storedRuns.size()); 00277 00278 boost::scoped_ptr<ExternalSortRunAccessor> pRunAccessor; 00279 pRunAccessor.reset(new ExternalSortRunAccessor(sortInfo)); 00280 pRunAccessor->storeRun(subStream); 00281 00282 StrictMutexGuard mutexGuard(storedRunMutex); 00283 storedRuns.push_back(pRunAccessor->getStoredRun()); 00284 }
void ExternalSortExecStreamImpl::mergeFirstResult | ( | ) | [private] |
Performs enough merging to be able to start returning results.
Definition at line 286 of file ExternalSortExecStreamImpl.cpp.
References deleteStoredRunInfo(), nParallel, ExternalSortInfo::nSortMemPages, optimizeRunOrder(), pFinalRunAccessor, pMerger, pOutputWriter, releaseResources(), runLoaders, sortInfo, storedRuns, storeFinalRun, storeRun(), and TRACE_FINE.
Referenced by computeFirstResultParallel(), and execute().
00287 { 00288 if (storedRuns.size()) { 00289 for (uint i = 0; i < nParallel; i++) { 00290 runLoaders[i]->releaseResources(); 00291 } 00292 00293 if (!pMerger) { 00294 pMerger.reset(new ExternalSortMerger(sortInfo)); 00295 pMerger->initRunAccess(); 00296 } 00297 00298 uint iFirstRun = storedRuns.size() - 1; 00299 while (iFirstRun > 0) { 00300 uint nRunsToMerge; 00301 00302 // REVIEW jvs 13-June-2004: I had to change this to account for 00303 // the output buffer needed during merge. Not sure why it worked 00304 // in BB? 00305 uint nMergePages = sortInfo.nSortMemPages - 1; 00306 if (storedRuns.size() <= nMergePages) { 00307 nRunsToMerge = storedRuns.size(); 00308 } else { 00309 nRunsToMerge = std::min<uint>( 00310 storedRuns.size() - nMergePages + 1, 00311 nMergePages); 00312 } 00313 00314 optimizeRunOrder(); 00315 iFirstRun = storedRuns.size() - nRunsToMerge; 00316 00317 FENNEL_TRACE( 00318 TRACE_FINE, 00319 "merging from run " << iFirstRun 00320 << " with run count = " << nRunsToMerge); 00321 00322 pMerger->startMerge( 00323 storedRuns.begin() + iFirstRun, nRunsToMerge); 00324 if ((iFirstRun > 0) || storeFinalRun) { 00325 storeRun(*pMerger); 00326 deleteStoredRunInfo(iFirstRun,nRunsToMerge); 00327 } 00328 } 00329 00330 if (storedRuns.size() == 1) { 00331 if (!pFinalRunAccessor) { 00332 pFinalRunAccessor.reset(new ExternalSortRunAccessor(sortInfo)); 00333 } 00334 00335 FENNEL_TRACE( 00336 TRACE_FINE, 00337 "fetching from final run"); 00338 00339 pFinalRunAccessor->initRead(); 00340 pFinalRunAccessor->startRead(storedRuns[0]); 00341 pMerger->releaseResources(); 00342 pOutputWriter->setSubStream(*pFinalRunAccessor); 00343 } else { 00344 FENNEL_TRACE( 00345 TRACE_FINE, 00346 "fetching from final merge with run count = " 00347 << storedRuns.size()); 00348 00349 pOutputWriter->setSubStream(*pMerger); 00350 } 00351 } 00352 }
void ExternalSortExecStreamImpl::optimizeRunOrder | ( | ) | [private] |
Adjusts run order for optimal merging.
Definition at line 354 of file ExternalSortExecStreamImpl.cpp.
References storedRuns.
Referenced by mergeFirstResult().
00355 { 00356 uint i = storedRuns.size() - 1; 00357 while ((i > 0) 00358 && (storedRuns[i]->getWrittenPageCount() 00359 > storedRuns[i - 1]->getWrittenPageCount())) 00360 { 00361 std::swap(storedRuns[i],storedRuns[i - 1]); 00362 i--; 00363 } 00364 }
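void ExternalSortExecStreamImpl::deleteStoredRunInfo | ( | uint | iFirstRun, | |
uint | nRuns | |||
) | [private] |
Deletes information on runs after they have been merged.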
iFirstRun | 0-based deletion start position in storedRuns | |
nRuns | number of storedRuns entries to delete |
Definition at line 366 of file ExternalSortExecStreamImpl.cpp.
References storedRunMutex, and storedRuns.
Referenced by mergeFirstResult().
00367 { 00368 StrictMutexGuard mutexGuard(storedRunMutex); 00369 storedRuns.erase( 00370 storedRuns.begin() + iFirstRun, 00371 storedRuns.begin() + iFirstRun + nRuns); 00372 }
ExternalSortRunLoader & ExternalSortExecStreamImpl::reserveRunLoader | ( | ) | [private] |
Reserves one run loader for use during parallel sort, blocking on runLoaderAvailable until one becomes available.
Definition at line 437 of file ExternalSortExecStreamImpl.cpp.
References nParallel, runLoaderAvailable, runLoaderMutex, and runLoaders.
Referenced by computeFirstResultParallel().
00438 { 00439 StrictMutexGuard mutexGuard(runLoaderMutex); 00440 for (;;) { 00441 for (uint i = 0; i < nParallel; ++i) { 00442 ExternalSortRunLoader &runLoader = *(runLoaders[i]); 00443 if (!runLoader.runningParallelTask) { 00444 runLoader.runningParallelTask = true; 00445 return runLoader; 00446 } 00447 } 00448 runLoaderAvailable.wait(mutexGuard); 00449 } 00450 }
void ExternalSortExecStreamImpl::unreserveRunLoader | ( | ExternalSortRunLoader & | runLoader | ) | [private] |
Unreserves one run loader after its contents have been stored, making it available to other threads.
runLoader | loader to unreserve |
Definition at line 452 of file ExternalSortExecStreamImpl.cpp.
References runLoaderAvailable, runLoaderMutex, and ExternalSortRunLoader::runningParallelTask.
Referenced by computeFirstResultParallel(), and ExternalSortTask::execute().
00454 { 00455 StrictMutexGuard mutexGuard(runLoaderMutex); 00456 runLoader.runningParallelTask = false; 00457 runLoaderAvailable.notify_all(); 00458 }
void ExternalSortExecStreamImpl::releaseResources | ( | ) | [private] |
Releases resources associated with this stream.
Definition at line 242 of file ExternalSortExecStreamImpl.cpp.
References pFinalRunAccessor, pMerger, pOutputWriter, runLoaders, and storedRuns.
Referenced by closeImpl(), mergeFirstResult(), and open().
00243 { 00244 if (pFinalRunAccessor) { 00245 pFinalRunAccessor->releaseResources(); 00246 } 00247 00248 runLoaders.reset(); 00249 pMerger.reset(); 00250 pOutputWriter.reset(); 00251 pFinalRunAccessor.reset(); 00252 storedRuns.clear(); 00253 }
void ExternalSortExecStreamImpl::closeImpl | ( | ) | [private, virtual] |
Implements ClosableObject.
ExecStream implementations may override this to release any resources acquired while open.
Reimplemented from ExecStream.
Definition at line 236 of file ExternalSortExecStreamImpl.cpp.
References ExecStream::closeImpl(), and releaseResources().
00237 { 00238 releaseResources(); 00239 ConduitExecStream::closeImpl(); 00240 }
void ExternalSortExecStreamImpl::prepare | ( | ExternalSortExecStreamParams const & | params | ) | [virtual] |
Implements ExternalSortExecStream.
Definition at line 69 of file ExternalSortExecStreamImpl.cpp.
References ExternalSortInfo::cbPage, ExternalSortExecStreamParams::descendingKeyColumns, ExternalSortInfo::descendingKeyColumns, ExternalSortExecStreamParams::distinctness, DUP_ALLOW, DUP_DISCARD, DUP_FAIL, ExternalSortExecStreamParams::earlyClose, earlyClose, ExternalSortExecStreamParams::estimatedNumRows, estimatedNumRows, ExternalSortInfo::externalSegmentAccessor, ExternalSortInfo::keyDesc, ExternalSortExecStreamParams::keyProj, ExternalSortInfo::keyProj, ExternalSortInfo::memSegmentAccessor, nParallel, ExternalSortInfo::nSortMemPages, SegmentAccessor::pCacheAccessor, SingleInputExecStream::pInAccessor, ConduitExecStream::prepare(), TupleDescriptor::projectFrom(), SegmentAccessor::pSegment, ExternalSortExecStreamParams::pTempSegment, pTempSegment, resultsReady, sortInfo, ExternalSortExecStreamParams::storeFinalRun, storeFinalRun, and ExternalSortInfo::tupleDesc.
00071 { 00072 ConduitExecStream::prepare(params); 00073 00074 pTempSegment = params.pTempSegment; 00075 resultsReady = false; 00076 nParallel = 1; 00077 storeFinalRun = params.storeFinalRun; 00078 estimatedNumRows = params.estimatedNumRows; 00079 earlyClose = params.earlyClose; 00080 00081 switch (params.distinctness) { 00082 case DUP_ALLOW: 00083 break; 00084 case DUP_DISCARD: 00085 // TODO 00086 permAssert(false); 00087 case DUP_FAIL: 00088 // TODO 00089 permAssert(false); 00090 } 00091 00092 TupleDescriptor const &srcRecDef = pInAccessor->getTupleDesc(); 00093 sortInfo.keyProj = params.keyProj; 00094 assert(params.outputTupleDesc == srcRecDef); 00095 sortInfo.tupleDesc = srcRecDef; 00096 sortInfo.keyDesc.projectFrom(sortInfo.tupleDesc,params.keyProj); 00097 sortInfo.descendingKeyColumns = params.descendingKeyColumns; 00098 if (sortInfo.descendingKeyColumns.empty()) { 00099 // default is all ascending 00100 sortInfo.descendingKeyColumns.resize(sortInfo.keyProj.size(), false); 00101 } 00102 sortInfo.cbPage = params.pTempSegment->getFullPageSize(); 00103 sortInfo.memSegmentAccessor = params.scratchAccessor; 00104 sortInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor; 00105 sortInfo.externalSegmentAccessor.pSegment = params.pTempSegment; 00106 sortInfo.nSortMemPages = 0; 00107 }
void ExternalSortExecStreamImpl::open | ( | bool | restart | ) | [virtual] |
Opens this stream, acquiring any resources needed in order to be able to fetch data.
A precondition is that input streams must already be opened. A stream can be closed and reopened.
restart | if true, the stream must be already open, and should reset itself to start from the beginning of its result set |
Reimplemented from ConduitExecStream.
Definition at line 160 of file ExternalSortExecStreamImpl.cpp.
References nParallel, ExternalSortInfo::nSortMemPages, ExternalSortInfo::nSortMemPagesPerRun, ConduitExecStream::open(), pOutputWriter, releaseResources(), resultsReady, runLoaders, and sortInfo.
00161 { 00162 if (restart) { 00163 releaseResources(); 00164 } 00165 00166 ConduitExecStream::open(restart); 00167 00168 // divvy up available memory by degree of parallelism 00169 sortInfo.nSortMemPagesPerRun = (sortInfo.nSortMemPages / nParallel); 00170 00171 // subtract off one page per run for I/O buffering 00172 assert(sortInfo.nSortMemPagesPerRun > 0); 00173 sortInfo.nSortMemPagesPerRun--; 00174 00175 // need at least two non-I/O pages per run: one for keys and one for data 00176 assert(sortInfo.nSortMemPagesPerRun > 1); 00177 00178 runLoaders.reset(new SharedExternalSortRunLoader[nParallel]); 00179 for (uint i = 0; i < nParallel; ++i) { 00180 runLoaders[i].reset(new ExternalSortRunLoader(sortInfo)); 00181 } 00182 00183 pOutputWriter.reset(new ExternalSortOutput(sortInfo)); 00184 00185 for (uint i = 0; i < nParallel; ++i) { 00186 runLoaders[i]->startRun(); 00187 } 00188 00189 // default to local sort as output obj 00190 pOutputWriter->setSubStream(*(runLoaders[0])); 00191 00192 resultsReady = false; 00193 }
ExecStreamResult ExternalSortExecStreamImpl::execute | ( | ExecStreamQuantum const & | quantum | ) | [virtual] |
Executes this stream.
quantum | governs the maximum amount of execution to perform |
Implements ExecStream.
Definition at line 195 of file ExternalSortExecStreamImpl.cpp.
References ExecStreamGraphImpl::closeProducers(), computeFirstResult(), computeFirstResultParallel(), earlyClose, EXECBUF_EOS, EXECRC_BUF_UNDERFLOW, EXECRC_YIELD, ExecStream::getGraph(), ExecStream::getStreamId(), ExternalSortRunLoader::isStarted(), mergeFirstResult(), nParallel, SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, pOutputWriter, ConduitExecStream::precheckConduitBuffers(), resultsReady, runLoaders, sortRun(), storedRuns, storeFinalRun, and storeRun().
00197 { 00198 if (!resultsReady) { 00199 if (pInAccessor->getState() != EXECBUF_EOS) { 00200 ExecStreamResult rc = precheckConduitBuffers(); 00201 if (rc != EXECRC_YIELD) { 00202 return rc; 00203 } 00204 if (nParallel > 1) { 00205 // FIXME 00206 computeFirstResultParallel(); 00207 } else { 00208 computeFirstResult(); 00209 return EXECRC_BUF_UNDERFLOW; 00210 } 00211 } else { 00212 ExternalSortRunLoader &runLoader = *(runLoaders[0]); 00213 if (runLoader.isStarted()) { 00214 sortRun(runLoader); 00215 if (storedRuns.size() || storeFinalRun) { 00216 // store last run 00217 storeRun(runLoader); 00218 } 00219 } 00220 mergeFirstResult(); 00221 00222 // close the producers now that we've read all input 00223 if (earlyClose) { 00224 ExecStreamGraphImpl &graphImpl = 00225 dynamic_cast<ExecStreamGraphImpl&>(getGraph()); 00226 graphImpl.closeProducers(getStreamId()); 00227 } 00228 00229 resultsReady = true; 00230 } 00231 } 00232 00233 return pOutputWriter->fetch(*pOutAccessor); 00234 }
void ExternalSortExecStreamImpl::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity, | |||
ExecStreamResourceSettingType & | optType | |||
) | [virtual] |
Determines resource requirements for this stream.
Default implementation declares zero resource requirements.
minQuantity | receives the minimum resource quantity needed by this stream in order to execute | |
optQuantity | receives the resource quantity needed by this stream in order to execute optimally | |
optType | Receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), then this parameter needs to be used to indicate a non-accurate optimum resource setting. |
Reimplemented from ExecStream.
Definition at line 109 of file ExternalSortExecStreamImpl.cpp.
References estimatedNumRows, EXEC_RESOURCE_ESTIMATE, EXEC_RESOURCE_UNBOUNDED, ExecStream::getResourceRequirements(), isMAXU(), max(), MAXU, ExternalSortInfo::memSegmentAccessor, ExecStreamResourceQuantity::nCachePages, SingleOutputExecStream::pOutAccessor, SegmentAccessor::pSegment, and sortInfo.
00113 { 00114 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00115 00116 // REVIEW 00117 uint minPages = 3; 00118 minQuantity.nCachePages += minPages; 00119 00120 // if no estimated row count is available, request an unbounded amount 00121 // from the resource governor; otherwise, estimate the number of pages 00122 // for an in-memory sort 00123 if (isMAXU(estimatedNumRows)) { 00124 optType = EXEC_RESOURCE_UNBOUNDED; 00125 } else { 00126 // use the average of the min and max rowsizes 00127 // TODO - use stats to come up with a more accurate average 00128 RecordNum nPages = 00129 estimatedNumRows * 00130 ((pOutAccessor->getScratchTupleAccessor().getMaxByteCount() + 00131 pOutAccessor->getScratchTupleAccessor().getMinByteCount()) / 2) / 00132 sortInfo.memSegmentAccessor.pSegment->getUsablePageSize(); 00133 uint numPages; 00134 if (nPages >= uint(MAXU)) { 00135 numPages = uint(MAXU) - 1; 00136 } else { 00137 numPages = uint(nPages); 00138 } 00139 // make sure the opt is bigger than the min; otherwise, the 00140 // resource governor won't try to give it extra 00141 optQuantity.nCachePages += std::max(minPages + 1, numPages); 00142 optType = EXEC_RESOURCE_ESTIMATE; 00143 } 00144 }
void ExternalSortExecStreamImpl::setResourceAllocation | ( | ExecStreamResourceQuantity & | quantity | ) | [virtual] |
Sets current resource allocation for this stream.
If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.
quantity | allocated resource quantity |
Reimplemented from ExecStream.
Definition at line 146 of file ExternalSortExecStreamImpl.cpp.
References ExecStreamResourceQuantity::nCachePages, nParallel, ExternalSortInfo::nSortMemPages, ExecStreamResourceQuantity::nThreads, ExecStream::setResourceAllocation(), and sortInfo.
00148 { 00149 // REVIEW 00150 ConduitExecStream::setResourceAllocation(quantity); 00151 sortInfo.nSortMemPages = quantity.nCachePages; 00152 nParallel = quantity.nThreads + 1; 00153 00154 // NOTE jvs 10-Nov-2004: parallel sort is currently disabled 00155 // as an effect of the scheduler-revamp. We may resurrect it, or 00156 // we may decide to handle parallelism up at the scheduler level. 00157 assert(nParallel == 1); 00158 }
ExternalSortExecStream * ExternalSortExecStream::newExternalSortExecStream | ( | ) | [static, inherited] |
Factory method.
Definition at line 33 of file ExternalSortExecStreamImpl.cpp.
Referenced by LbmExecStreamTestBase::initSorterExecStream(), LbmSearchTest::loadTableAndIndex(), LhxAggExecStreamTest::testGroupCountImpl(), LhxJoinExecStreamTest::testImpl(), ExternalSortExecStreamTest::testImpl(), LbmLoadBitmapTest::testLoad(), LhxAggExecStreamTest::testSingleValueImpl(), and ExecStreamFactory::visit().
00034 { 00035 return new ExternalSortExecStreamImpl(); 00036 }
void ConduitExecStream::prepare | ( | ConduitExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 42 of file ConduitExecStream.cpp.
References SingleInputExecStream::pInAccessor, SingleOutputExecStream::pOutAccessor, SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
Referenced by prepare(), LcsClusterAppendExecStream::prepare(), LbmNormalizerExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SegBufferReaderExecStream::prepare(), SegBufferExecStream::prepare(), ScratchBufferExecStream::prepare(), ReshapeExecStream::prepare(), DoubleBufferExecStream::prepare(), CopyExecStream::prepare(), CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), and CalcExecStream::prepare().
00043 { 00044 SingleInputExecStream::prepare(params); 00045 00046 if (params.outputTupleDesc.empty()) { 00047 pOutAccessor->setTupleShape( 00048 pInAccessor->getTupleDesc(), 00049 pInAccessor->getTupleFormat()); 00050 } 00051 00052 SingleOutputExecStream::prepare(params); 00053 }
void SingleInputExecStream::prepare | ( | SingleInputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 44 of file SingleInputExecStream.cpp.
References SingleInputExecStream::getInputBufProvision(), SingleInputExecStream::pInAccessor, and ExecStream::prepare().
Referenced by JavaSinkExecStream::prepare(), MockConsumerExecStream::prepare(), DiffluenceExecStream::prepare(), and ConduitExecStream::prepare().
00045 { 00046 ExecStream::prepare(params); 00047 00048 assert(pInAccessor); 00049 assert(pInAccessor->getProvision() == getInputBufProvision()); 00050 }
void ExecStream::prepare | ( | ExecStreamParams const & | params | ) | [virtual, inherited] |
Prepares this stream for execution.
A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.
params | instance of stream parameterization class which should be used to prepare this stream |
Definition at line 84 of file ExecStream.cpp.
References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.
Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().
00085 { 00086 if (pGraph) { 00087 pDynamicParamManager = pGraph->getDynamicParamManager(); 00088 } 00089 pQuotaAccessor = params.pCacheAccessor; 00090 pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor; 00091 }
void SingleOutputExecStream::prepare | ( | SingleOutputExecStreamParams const & | params | ) | [virtual, inherited] |
Definition at line 48 of file SingleOutputExecStream.cpp.
References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().
Referenced by BTreeExecStream::prepare(), FlatFileExecStreamImpl::prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().
00049 { 00050 ExecStream::prepare(params); 00051 assert(pOutAccessor); 00052 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00053 if (pOutAccessor->getTupleDesc().empty()) { 00054 assert(!params.outputTupleDesc.empty()); 00055 pOutAccessor->setTupleShape( 00056 params.outputTupleDesc, 00057 params.outputTupleFormat); 00058 } 00059 }
ExecStreamResult ConduitExecStream::precheckConduitBuffers | ( | ) | [protected, inherited] |
Checks the state of the input and output buffers.
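If input is empty, requests production and returns EXECRC_BUF_UNDERFLOW (also returned when input is in underflow). If input is at EOS, propagates that to the output buffer and returns EXECRC_EOS. If output is full, returns EXECRC_BUF_OVERFLOW. Otherwise returns EXECRC_YIELD.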
Definition at line 61 of file ConduitExecStream.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_NONEMPTY, EXECBUF_OVERFLOW, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_YIELD, SingleInputExecStream::pInAccessor, and SingleOutputExecStream::pOutAccessor.
Referenced by execute(), FtrsTableWriterExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreeInsertExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), ReshapeExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), and CalcExecStream::execute().
00062 { 00063 switch (pInAccessor->getState()) { 00064 case EXECBUF_EMPTY: 00065 pInAccessor->requestProduction(); 00066 return EXECRC_BUF_UNDERFLOW; 00067 case EXECBUF_UNDERFLOW: 00068 return EXECRC_BUF_UNDERFLOW; 00069 case EXECBUF_EOS: 00070 pOutAccessor->markEOS(); 00071 return EXECRC_EOS; 00072 case EXECBUF_NONEMPTY: 00073 case EXECBUF_OVERFLOW: 00074 break; 00075 default: 00076 permAssert(false); 00077 } 00078 if (pOutAccessor->getState() == EXECBUF_OVERFLOW) { 00079 return EXECRC_BUF_OVERFLOW; 00080 } 00081 return EXECRC_YIELD; 00082 }
void ConduitExecStream::setOutputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | outAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for outputs from this stream.
This method is only ever called once, before prepare.
outAccessors | buffer accessors ordered by output stream |
Reimplemented from SingleInputExecStream.
Definition at line 36 of file ConduitExecStream.cpp.
References SingleOutputExecStream::setOutputBufAccessors().
00038 { 00039 SingleOutputExecStream::setOutputBufAccessors(outAccessors); 00040 }
void ConduitExecStream::setInputBufAccessors | ( | std::vector< SharedExecStreamBufAccessor > const & | inAccessors | ) | [virtual, inherited] |
Initializes the buffer accessors for inputs to this stream.
This method is only ever called once, before prepare.
inAccessors | buffer accessors ordered by input stream |
Reimplemented from SingleInputExecStream.
Definition at line 30 of file ConduitExecStream.cpp.
References SingleInputExecStream::setInputBufAccessors().
00032 { 00033 SingleInputExecStream::setInputBufAccessors(inAccessors); 00034 }
ExecStreamBufProvision SingleInputExecStream::getInputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
Reimplemented from ExecStream.
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, and SegBufferWriterExecStream.
Definition at line 62 of file SingleInputExecStream.cpp.
References BUFPROV_PRODUCER.
Referenced by SingleInputExecStream::prepare().
00063 { 00064 return BUFPROV_PRODUCER; 00065 }
bool ExecStream::canEarlyClose | ( | ) | [virtual, inherited] |
Reimplemented in SegBufferWriterExecStream.
Definition at line 49 of file ExecStream.cpp.
ExecStreamGraph & ExecStream::getGraph | ( | ) | const [inline, inherited] |
Definition at line 293 of file ExecStream.h.
References ExecStream::pGraph.
Referenced by execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
ExecStreamId ExecStream::getStreamId | ( | ) | const [inline, inherited] |
Definition at line 288 of file ExecStream.h.
References ExecStream::id.
Referenced by execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().
00289 { 00290 return id; 00291 }
void ExecStream::getResourceRequirements | ( | ExecStreamResourceQuantity & | minQuantity, | |
ExecStreamResourceQuantity & | optQuantity | |||
) | [virtual, inherited] |
Reimplemented in DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, FlatFileExecStreamImpl, BTreeInsertExecStream, BTreeReadExecStream, FtrsTableWriterExecStream, LbmChopperExecStream, LbmSplicerExecStream, LcsClusterAppendExecStream, LcsClusterReplaceExecStream, LcsRowScanBaseExecStream, and LcsRowScanExecStream.
Definition at line 102 of file ExecStream.cpp.
References ExecStreamResourceQuantity::nCachePages, and ExecStreamResourceQuantity::nThreads.
00105 { 00106 minQuantity.nThreads = 0; 00107 minQuantity.nCachePages = 0; 00108 optQuantity = minQuantity; 00109 }
void ExecStream::setName | ( | std::string const & | ) | [virtual, inherited] |
Sets unique name of this stream.
Definition at line 157 of file ExecStream.cpp.
References ExecStream::name.
00158 { 00159 name = nameInit; 00160 }
std::string const & ExecStream::getName | ( | ) | const [virtual, inherited] |
Definition at line 162 of file ExecStream.cpp.
References ExecStream::name.
Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00163 { 00164 return name; 00165 }
bool ExecStream::mayBlock | ( | ) | const [virtual, inherited] |
Queries whether this stream's implementation may block when execute() is called.
For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.
Definition at line 167 of file ExecStream.cpp.
void ExecStream::checkAbort | ( | ) | const [virtual, inherited] |
Checks whether there is an abort request for this stream's scheduler.
Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.
Definition at line 72 of file ExecStream.cpp.
References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.
Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().
00073 { 00074 if (!pGraph) { 00075 return; 00076 } 00077 ExecStreamScheduler *pScheduler = pGraph->getScheduler(); 00078 if (!pScheduler) { 00079 return; 00080 } 00081 pScheduler->checkAbort(); 00082 }
ExecStreamBufProvision ExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented in BarrierExecStream, DiffluenceExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, SegBufferWriterExecStream, SingleOutputExecStream, SplitterExecStream, ValuesExecStream, JavaTransformExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 172 of file ExecStream.cpp.
References BUFPROV_NONE.
00173 { 00174 return BUFPROV_NONE; 00175 }
ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision | ( | ) | const [virtual, inherited] |
Queries the BufferProvision which this stream is capable of when producing tuples.
Reimplemented from ExecStream.
Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.
Definition at line 69 of file SingleOutputExecStream.cpp.
References BUFPROV_CONSUMER.
Referenced by SingleOutputExecStream::prepare().
00070 { 00071 return BUFPROV_CONSUMER; 00072 }
ExecStreamBufProvision ExecStream::getOutputBufConversion | ( | ) | const [virtual, inherited] |
Queries the BufferProvision to which this stream needs its output to be converted, if any.
Reimplemented in JavaTransformExecStream.
Definition at line 177 of file ExecStream.cpp.
References BUFPROV_NONE.
00178 { 00179 return BUFPROV_NONE; 00180 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 { 00060 return !needsClose; 00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any allocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 { 00041 if (!needsClose) { 00042 return; 00043 } 00044 needsClose = false; 00045 closeImpl(); 00046 }
void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, | |
std::string | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pTraceTarget | the TraceTarget to which messages will be sent | |
name | the name of this source |
Definition at line 46 of file TraceSource.cpp.
References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().
00049 { 00050 assert(!pTraceTarget.get()); 00051 00052 pTraceTarget = pTraceTargetInit; 00053 name = nameInit; 00054 if (isTracing()) { 00055 minimumLevel = pTraceTarget->getSourceTraceLevel(name); 00056 } else { 00057 minimumLevel = TRACE_OFF; 00058 } 00059 }
void TraceSource::trace | ( | TraceLevel | level, | |
std::string | message | |||
) | const [inherited] |
Records a trace message.
Normally only called via FENNEL_TRACE.
level | severity level of event being traced | |
message | the text of the message |
Definition at line 61 of file TraceSource.cpp.
References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().
Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().
00062 { 00063 if (isTracing()) { 00064 getTraceTarget().notifyTrace(name,level,message); 00065 } 00066 }
bool TraceSource::isTracing | ( | ) | const [inline, inherited] |
Definition at line 88 of file TraceSource.h.
Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().
00089 { 00090 return pTraceTarget.get() ? true : false; 00091 }
bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
Determines whether a particular level is being traced.
level | trace level to test |
Definition at line 100 of file TraceSource.h.
Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().
00101 { 00102 return level >= minimumLevel; 00103 }
TraceTarget& TraceSource::getTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 108 of file TraceSource.h.
Referenced by TraceSource::trace().
00109 { 00110 assert(isTracing()); 00111 return *(pTraceTarget.get()); 00112 }
SharedTraceTarget TraceSource::getSharedTraceTarget | ( | ) | const [inline, inherited] |
Definition at line 117 of file TraceSource.h.
Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().
00118 { 00119 return pTraceTarget; 00120 }
std::string TraceSource::getTraceSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also TraceSources.
Definition at line 127 of file TraceSource.h.
Referenced by LcsClusterAppendExecStream::initLoad().
00128 { 00129 return name; 00130 }
void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
TraceLevel TraceSource::getMinimumTraceLevel | ( | ) | const [inline, inherited] |
void TraceSource::disableTracing | ( | ) | [inherited] |
Definition at line 68 of file TraceSource.cpp.
References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.
Referenced by TestBase::afterTestCase().
00069 { 00070 pTraceTarget.reset(); 00071 minimumLevel = TRACE_OFF; 00072 }
void ErrorSource::initErrorSource | ( | SharedErrorTarget | pErrorTarget, | |
const std::string & | name | |||
) | [virtual, inherited] |
For use when initialization has to be deferred until after construction.
pErrorTarget | the ErrorTarget to which errors will be posted | |
name | the name of this source |
Definition at line 47 of file ErrorSource.cpp.
References ErrorSource::name, and ErrorSource::pErrorTarget.
Referenced by ErrorSource::ErrorSource().
00050 { 00051 pErrorTarget = pErrorTargetInit; 00052 name = nameInit; 00053 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
void * | address, | |||
long | capacity, | |||
int | index | |||
) | [inherited] |
Posts an exception, such as a row exception.
Definition at line 55 of file ErrorSource.cpp.
References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().
Referenced by FlatFileExecStreamImpl::logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().
00058 { 00059 if (hasTarget()) { 00060 getErrorTarget().notifyError( 00061 name, level, message, address, capacity, index); 00062 } 00063 }
void ErrorSource::postError | ( | ErrorLevel | level, | |
const std::string & | message, | |||
const TupleDescriptor & | errorDesc, | |||
const TupleData & | errorTuple, | |||
int | index | |||
) | [inherited] |
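Posts an exception, such as a row exception, described by a tuple. If an error target is set, the tuple is marshalled into an internal buffer and forwarded to the address-based postError() overload.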
Definition at line 65 of file ErrorSource.cpp.
References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().
00068 { 00069 if (!hasTarget()) { 00070 return; 00071 } 00072 00073 if (!pErrorBuf) { 00074 errorAccessor.compute(errorDesc); 00075 uint cbMax = errorAccessor.getMaxByteCount(); 00076 pErrorBuf.reset(new FixedBuffer[cbMax]); 00077 } 00078 00079 uint cbTuple = errorAccessor.getByteCount(errorTuple); 00080 errorAccessor.marshal(errorTuple, pErrorBuf.get()); 00081 postError(level, message, pErrorBuf.get(), cbTuple, index); 00082 }
bool ErrorSource::hasTarget | ( | ) | const [inline, inherited] |
Definition at line 112 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00113 { 00114 return pErrorTarget.get() ? true : false; 00115 }
ErrorTarget& ErrorSource::getErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 120 of file ErrorSource.h.
Referenced by ErrorSource::postError().
00121 { 00122 assert(hasTarget()); 00123 return *(pErrorTarget.get()); 00124 }
SharedErrorTarget ErrorSource::getSharedErrorTarget | ( | ) | const [inline, inherited] |
Definition at line 129 of file ErrorSource.h.
00130 { 00131 return pErrorTarget; 00132 }
std::string ErrorSource::getErrorSourceName | ( | ) | const [inline, inherited] |
Gets the name of this source.
Useful to construct nested names for subcomponents that are also ErrorSources.
Definition at line 139 of file ErrorSource.h.
00140 { 00141 return name; 00142 }
void ErrorSource::setErrorSourceName | ( | std::string const & | n | ) | [inline, inherited] |
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 148 of file ErrorSource.h.
00149 { 00150 name = n; 00151 }
void ErrorSource::disableTarget | ( | ) | [inherited] |
Definition at line 84 of file ErrorSource.cpp.
References ErrorSource::pErrorTarget.
00085 { 00086 pErrorTarget.reset(); 00087 }
friend class ExternalSortTask [friend] |
Definition at line 55 of file ExternalSortExecStreamImpl.h.
Segment to use for storing runs externally.
Definition at line 60 of file ExternalSortExecStreamImpl.h.
Referenced by prepare().
Global information shared with subcomponents.
Definition at line 65 of file ExternalSortExecStreamImpl.h.
Referenced by getResourceRequirements(), mergeFirstResult(), open(), prepare(), setResourceAllocation(), and storeRun().
uint ExternalSortExecStreamImpl::nParallel [private] |
Maximum number of parallel threads to use.
Definition at line 70 of file ExternalSortExecStreamImpl.h.
Referenced by computeFirstResultParallel(), execute(), mergeFirstResult(), open(), prepare(), reserveRunLoader(), and setResourceAllocation().
boost::scoped_array<SharedExternalSortRunLoader> ExternalSortExecStreamImpl::runLoaders [private] |
Array of helpers used to load, quicksort, and store runs.
This array has size equal to nParallel. For non-parallel sort (nParallel=1), this means there's just one entry. For parallel sort, this array acts as an availability queue governed by runLoaderAvailable, runLoaderMutex, and the ExternalSortRunLoader::runningParallelTask flag.
Definition at line 79 of file ExternalSortExecStreamImpl.h.
Referenced by computeFirstResult(), execute(), mergeFirstResult(), open(), releaseResources(), and reserveRunLoader().
Thread pool used during parallel sort.
Definition at line 84 of file ExternalSortExecStreamImpl.h.
Referenced by computeFirstResultParallel().
Condition variable used to signal availability of entries in runLoaders array.
Definition at line 90 of file ExternalSortExecStreamImpl.h.
Referenced by reserveRunLoader(), and unreserveRunLoader().
Synchronization for availability status in runLoaders array.
Definition at line 95 of file ExternalSortExecStreamImpl.h.
Referenced by reserveRunLoader(), and unreserveRunLoader().
boost::scoped_ptr<ExternalSortRunAccessor> ExternalSortExecStreamImpl::pFinalRunAccessor [private] |
Helper used to read final run when stored externally.
Definition at line 100 of file ExternalSortExecStreamImpl.h.
Referenced by mergeFirstResult(), and releaseResources().
boost::scoped_ptr<ExternalSortMerger> ExternalSortExecStreamImpl::pMerger [private] |
Helper used to merge runs.
Definition at line 105 of file ExternalSortExecStreamImpl.h.
Referenced by mergeFirstResult(), and releaseResources().
boost::scoped_ptr<ExternalSortOutput> ExternalSortExecStreamImpl::pOutputWriter [private] |
Helper used to write XO output.
Definition at line 110 of file ExternalSortExecStreamImpl.h.
Referenced by execute(), mergeFirstResult(), open(), and releaseResources().
Synchronization for storedRuns.
Definition at line 115 of file ExternalSortExecStreamImpl.h.
Referenced by deleteStoredRunInfo(), and storeRun().
std::vector<SharedSegStreamAllocation> ExternalSortExecStreamImpl::storedRuns [private] |
Information on runs stored externally.
Definition at line 120 of file ExternalSortExecStreamImpl.h.
Referenced by deleteStoredRunInfo(), execute(), mergeFirstResult(), optimizeRunOrder(), releaseResources(), and storeRun().
bool ExternalSortExecStreamImpl::resultsReady [private] |
Whether the XO is ready to start writing results.
Definition at line 125 of file ExternalSortExecStreamImpl.h.
Referenced by computeFirstResultParallel(), execute(), open(), and prepare().
bool ExternalSortExecStreamImpl::storeFinalRun [private] |
Whether to materialize one big final run, or return results directly from last merge stage.
Definition at line 131 of file ExternalSortExecStreamImpl.h.
Referenced by execute(), mergeFirstResult(), and prepare().
int ExternalSortExecStreamImpl::estimatedNumRows [private] |
Estimate of the number of rows in the sort input.
If < 0, no stats were available to estimate this value.
Definition at line 137 of file ExternalSortExecStreamImpl.h.
Referenced by getResourceRequirements(), and prepare().
bool ExternalSortExecStreamImpl::earlyClose [private] |
If true, close producers once all input has been read.
Definition at line 142 of file ExternalSortExecStreamImpl.h.
SharedExecStreamBufAccessor SingleInputExecStream::pInAccessor [protected, inherited] |
Definition at line 51 of file SingleInputExecStream.h.
Referenced by SortedAggExecStream::compareGroupByKeys(), computeFirstResult(), execute(), LbmSplicerExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeSearchExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), JavaSinkExecStream::execute(), UncollectExecStream::execute(), SplitterExecStream::execute(), SortedAggExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferReaderExecStream::execute(), SegBufferExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockConsumerExecStream::execute(), DoubleBufferExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), BernoulliSamplingExecStream::execute(), CalcExecStream::execute(), BTreePrefetchSearchExecStream::getNextPageForPrefetch(), LcsClusterReplaceExecStream::getTupleForLoad(), LcsClusterAppendExecStream::getTupleForLoad(), LbmSplicerExecStream::getValidatedTuple(), LcsClusterReplaceExecStream::initTupleLoadParams(), BTreeSearchExecStream::innerSearchLoop(), BTreePrefetchSearchExecStream::innerSearchLoop(), LcsClusterReplaceExecStream::open(), LbmNormalizerExecStream::open(), SingleInputExecStream::open(), SegBufferWriterExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), MockConsumerExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), LcsClusterAppendExecStream::postProcessTuple(), ConduitExecStream::precheckConduitBuffers(), prepare(), LcsClusterAppendExecStream::prepare(), LbmSplicerExecStream::prepare(), LbmNormalizerExecStream::prepare(), BTreeSearchExecStream::prepare(), BTreeInsertExecStream::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleInputExecStream::prepare(), SegBufferReaderExecStream::prepare(), ReshapeExecStream::prepare(), DiffluenceExecStream::prepare(), ConduitExecStream::prepare(), 
CollectExecStream::prepare(), BernoulliSamplingExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), BTreeSearchExecStream::readSearchKey(), LhxAggExecStream::setAggComputers(), LhxAggExecStream::setHashInfo(), SingleInputExecStream::setInputBufAccessors(), and JavaSinkExecStream::stuffByteBuffer().
bool ExecStream::isOpen [protected, inherited] |
Whether this stream is currently open.
Note that this is not quite the opposite of the inherited ClosableObject.needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.
Definition at line 61 of file ExecStream.h.
Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().
ExecStreamGraph* ExecStream::pGraph [protected, inherited] |
Dataflow graph containing this stream.
Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.
Definition at line 68 of file ExecStream.h.
Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().
ExecStreamId ExecStream::id [protected, inherited] |
Identifier for this stream; local to its containing graph.
Definition at line 73 of file ExecStream.h.
Referenced by ExecStream::getStreamId().
std::string ExecStream::name [protected, inherited] |
Name of stream, as known by optimizer.
Reimplemented from TraceSource.
Definition at line 78 of file ExecStream.h.
Referenced by ExecStream::ExecStream(), ExecStream::getName(), FlatFileExecStreamImpl::open(), and ExecStream::setName().
SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited] |
The dynamic parameter manager available to this stream.
(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())
Definition at line 85 of file ExecStream.h.
Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().
SharedLogicalTxn ExecStream::pTxn [protected, inherited] |
The transaction embracing the stream.
Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. (Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().)
Definition at line 94 of file ExecStream.h.
Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().
ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited] |
Resource quantities currently allocated to this stream.
Definition at line 100 of file ExecStream.h.
Referenced by ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited] |
CacheAccessor used for quota tracking.
Definition at line 105 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited] |
CacheAccessor used for scratch page quota tracking.
Definition at line 110 of file ExecStream.h.
Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().
SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited] |
Definition at line 56 of file SingleOutputExecStream.h.
Referenced by LcsClusterAppendExecStream::compress(), execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), FlatFileExecStreamImpl::execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), FlatFileExecStreamImpl::prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), 
SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().