FlatFileExecStreamImpl Class Reference

FlatFileExecStreamImpl implements the FlatFileExecStream interface.

#include <FlatFileExecStreamImpl.h>

Ancestor classes of FlatFileExecStreamImpl (from the inheritance diagram): FlatFileExecStream, SingleOutputExecStream, ExecStream, ClosableObject, TraceSource, ErrorSource.

Public Member Functions

virtual void prepare (FlatFileExecStreamParams const &params)
virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity)
virtual void open (bool restart)
 Opens this stream, acquiring any resources needed in order to be able to fetch data.
virtual ExecStreamResult execute (ExecStreamQuantum const &quantum)
 Executes this stream.
virtual void prepare (SingleOutputExecStreamParams const &params)
virtual void prepare (ExecStreamParams const &params)
 Prepares this stream for execution.
virtual void setInputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &inAccessors)
 Initializes the buffer accessors for inputs to this stream.
virtual void setOutputBufAccessors (std::vector< SharedExecStreamBufAccessor > const &outAccessors)
 Initializes the buffer accessors for outputs from this stream.
virtual ExecStreamBufProvision getOutputBufProvision () const
 Queries the BufferProvision which this stream is capable of when producing tuples.
virtual bool canEarlyClose ()
 
Returns:
true if the stream can be closed early

ExecStreamGraph & getGraph () const
 
Returns:
reference to containing graph

ExecStreamId getStreamId () const
 
Returns:
the identifier for this stream within containing graph

virtual void getResourceRequirements (ExecStreamResourceQuantity &minQuantity, ExecStreamResourceQuantity &optQuantity, ExecStreamResourceSettingType &optType)
 Determines resource requirements for this stream.
virtual void setResourceAllocation (ExecStreamResourceQuantity &quantity)
 Sets current resource allocation for this stream.
virtual void setName (std::string const &)
 Sets unique name of this stream.
virtual std::string const & getName () const
 
Returns:
the name of this stream, as known by the optimizer

virtual bool mayBlock () const
 Queries whether this stream's implementation may block when execute() is called.
virtual void checkAbort () const
 Checks whether there is an abort request for this stream's scheduler.
virtual ExecStreamBufProvision getOutputBufConversion () const
 Queries the BufferProvision to which this stream needs its output to be converted, if any.
virtual ExecStreamBufProvision getInputBufProvision () const
 Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.
bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any resources it still holds.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
 For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
 Records a trace message.
bool isTracing () const
 
Returns:
true iff tracing is enabled for this source

bool isTracingLevel (TraceLevel level) const
 Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
 
Returns:
the TraceTarget for this source

SharedTraceTarget getSharedTraceTarget () const
 
Returns:
the SharedTraceTarget for this source

std::string getTraceSourceName () const
 Gets the name of this source.
void setTraceSourceName (std::string const &n)
 Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
virtual void initErrorSource (SharedErrorTarget pErrorTarget, const std::string &name)
 For use when initialization has to be deferred until after construction.
void postError (ErrorLevel level, const std::string &message, void *address, long capacity, int index)
 Posts an exception, such as a row exception.
void postError (ErrorLevel level, const std::string &message, const TupleDescriptor &errorDesc, const TupleData &errorTuple, int index)
 Posts an exception, such as a row exception.
bool hasTarget () const
 
Returns:
true iff an error target has been set

ErrorTarget & getErrorTarget () const
 
Returns:
the ErrorTarget for this source

SharedErrorTarget getSharedErrorTarget () const
 
Returns:
the SharedErrorTarget for this source

std::string getErrorSourceName () const
 Gets the name of this source.
void setErrorSourceName (std::string const &n)
 Sets the name of this source.
void disableTarget ()

Static Public Member Functions

static FlatFileExecStream * newFlatFileExecStream ()
 Factory method.

Protected Attributes

SharedExecStreamBufAccessor pOutAccessor
bool isOpen
 Whether this stream is currently open.
ExecStreamGraph * pGraph
 Dataflow graph containing this stream.
ExecStreamId id
 Identifier for this stream; local to its containing graph.
std::string name
 Name of stream, as known by optimizer.
SharedDynamicParamManager pDynamicParamManager
 The dynamic parameter manager available to this stream.
SharedLogicalTxn pTxn
 The transaction embracing the stream.
ExecStreamResourceQuantity resourceAllocation
 Resource quantities currently allocated to this stream.
SharedCacheAccessor pQuotaAccessor
 CacheAccessor used for quota tracking.
SharedCacheAccessor pScratchQuotaAccessor
 CacheAccessor used for scratch page quota tracking.
bool needsClose

Private Member Functions

virtual void closeImpl ()
 Implements ClosableObject.
void releaseResources ()
 Releases resources associated with this stream.
uint findField (const std::string &name)
 Finds an output column by its name and returns the column's index.
FlatFileRowDescriptor readTupleDescriptor (const TupleDescriptor &tupleDesc)
 Translates a TupleDescriptor into a FlatFileRowDescriptor.
void handleTuple (FlatFileRowParseResult &result, TupleData &tuple)
 Processes a row of input data.
void describeStream (TupleData &tupleData)
 Based on the rows sampled, generates an output row describing the stream.
void logError (const FlatFileRowParseResult &result)
 Logs a single error to file.
void logError (const std::string reason, const FlatFileRowParseResult &result)
 Logs a single error to file.
void checkRowDelimiter ()
 Throws an error if a row delimiter was not found.

Private Attributes

std::string dataFilePath
bool header
bool lenient
bool trim
bool mapped
std::vector< std::string > columnNames
FlatFileRowDescriptor rowDesc
SharedFlatFileBuffer pBuffer
PBuffer pBufferStorage
char * next
SharedFlatFileParser pParser
FlatFileRowParseResult lastResult
TupleDescriptor textDesc
TupleData textTuple
TupleData dataTuple
bool isRowPending
FlatFileMode mode
int numRowsScan
bool done
VectorOfUint fieldSizes
std::string describeResult
SegPageLock bufferLock
SegmentAccessor scratchAccessor
uint nRowsOutput
uint nRowErrors
std::string reason
TupleDescriptor errorDesc
TupleData errorTuple

Static Private Attributes

static const uint MAX_ROW_ERROR_TEXT_WIDTH

Detailed Description

FlatFileExecStreamImpl implements the FlatFileExecStream interface.

Author:
John Pham
Version:
Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.h#2

Definition at line 54 of file FlatFileExecStreamImpl.h.


Member Function Documentation

void FlatFileExecStreamImpl::closeImpl (  )  [private, virtual]

Implements ClosableObject.

ExecStream implementations may override this to release any resources acquired while open.

Reimplemented from ExecStream.

Definition at line 427 of file FlatFileExecStreamImpl.cpp.

References ExecStream::closeImpl(), and releaseResources().

{
    releaseResources();
    SingleOutputExecStream::closeImpl();
}

void FlatFileExecStreamImpl::releaseResources (  )  [private]

Releases resources associated with this stream.

Definition at line 433 of file FlatFileExecStreamImpl.cpp.

References pBuffer.

Referenced by closeImpl(), and open().

{
    if (pBuffer) {
        pBuffer->close();
    }
}

uint FlatFileExecStreamImpl::findField ( const std::string &  name  )  [private]

Finds an output column by its name and returns the column's index.

Performs a case-insensitive comparison and uses the first matching column. If no column matches, this function returns MAXU.

Definition at line 277 of file FlatFileExecStreamImpl.cpp.

References columnNames, and MAXU.

Referenced by open().

{
    for (uint i = 0; i < columnNames.size(); i++) {
        if (strcasecmp(name.c_str(), columnNames[i].c_str()) == 0) {
            return i;
        }
    }
    return MAXU;
}
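findField() relies on POSIX strcasecmp(). Where that function is unavailable, the same first-match, case-insensitive lookup can be written portably. This is an illustrative sketch only: findFieldSketch(), equalsIgnoreCase() and NOT_FOUND are hypothetical names, NOT_FOUND is assumed to play the role of MAXU, and, like strcasecmp() under the default "C" locale, only ASCII letters are folded.

```cpp
#include <cctype>
#include <cstddef>
#include <limits>
#include <string>
#include <vector>

// Sentinel meaning "no such column"; assumed to play the role of MAXU.
const unsigned NOT_FOUND = std::numeric_limits<unsigned>::max();

// ASCII case-insensitive equality, a portable stand-in for
// strcasecmp(a, b) == 0 under the default "C" locale.
bool equalsIgnoreCase(const std::string &a, const std::string &b)
{
    if (a.size() != b.size()) {
        return false;
    }
    for (std::size_t i = 0; i < a.size(); i++) {
        // tolower() requires values representable as unsigned char
        int ca = std::tolower(static_cast<unsigned char>(a[i]));
        int cb = std::tolower(static_cast<unsigned char>(b[i]));
        if (ca != cb) {
            return false;
        }
    }
    return true;
}

// Returns the index of the first column whose name matches `name`
// case-insensitively, or NOT_FOUND if there is none.
unsigned findFieldSketch(
    const std::vector<std::string> &columnNames, const std::string &name)
{
    for (std::size_t i = 0; i < columnNames.size(); i++) {
        if (equalsIgnoreCase(name, columnNames[i])) {
            return static_cast<unsigned>(i);
        }
    }
    return NOT_FOUND;
}
```

Because the scan stops at the first match, duplicate names differing only in case (for example "Name" and "NAME") always resolve to the earlier column.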

FlatFileRowDescriptor FlatFileExecStreamImpl::readTupleDescriptor ( const TupleDescriptor &  tupleDesc  )  [private]

Translates a TupleDescriptor into a FlatFileRowDescriptor.

The major attributes required for parsing a column are whether it is a character column (which can be quoted) and the maximum length of the column.

If the column is a character column, its maximum length is the column's storage size (cbStorage). Other columns use a default length, FLAT_FILE_MAX_NON_CHAR_VALUE_LEN. In describe mode, the resulting row descriptor is also marked unbounded.

Parameters:
tupleDesc tuple descriptor from which the row descriptor is inferred

Definition at line 254 of file FlatFileExecStreamImpl.cpp.

References FLAT_FILE_MAX_NON_CHAR_VALUE_LEN, FLATFILE_MODE_DESCRIBE, StandardTypeDescriptor::isTextArray(), mode, rowDesc, and FlatFileRowDescriptor::setUnbounded().

Referenced by prepare().

{
    StandardTypeDescriptorFactory typeFactory;
    FlatFileRowDescriptor rowDesc;
    for (uint i = 0; i < tupleDesc.size(); i++) {
        TupleAttributeDescriptor attr = tupleDesc[i];
        StandardTypeDescriptorOrdinal ordinal =
            StandardTypeDescriptorOrdinal(
                attr.pTypeDescriptor->getOrdinal());
        if (StandardTypeDescriptor::isTextArray(ordinal)) {
            rowDesc.push_back(FlatFileColumnDescriptor(attr.cbStorage));
        } else {
            rowDesc.push_back(
                FlatFileColumnDescriptor(FLAT_FILE_MAX_NON_CHAR_VALUE_LEN));
        }
    }
    if (mode == FLATFILE_MODE_DESCRIBE) {
        rowDesc.setUnbounded();
    }
    return rowDesc;
}
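The width rule applied by readTupleDescriptor() can be modeled without Fennel's type system. In this sketch, ColumnSketch, RowDescriptorSketch, readTupleDescriptorSketch() and DEFAULT_NON_CHAR_LEN are hypothetical; the value 255 is an arbitrary placeholder rather than the real FLAT_FILE_MAX_NON_CHAR_VALUE_LEN, and the unbounded flag is assumed to mean that the number of columns is not fixed in advance.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical reduced column type: only "is it text?" and its storage size.
struct ColumnSketch {
    bool isText;
    std::size_t cbStorage;
};

// Placeholder for FLAT_FILE_MAX_NON_CHAR_VALUE_LEN (real value not shown).
const std::size_t DEFAULT_NON_CHAR_LEN = 255;

struct RowDescriptorSketch {
    std::vector<std::size_t> maxLengths;  // parse width for each column
    bool unbounded = false;               // assumed: column count not fixed
};

RowDescriptorSketch readTupleDescriptorSketch(
    const std::vector<ColumnSketch> &columns, bool describeMode)
{
    RowDescriptorSketch rowDesc;
    for (const ColumnSketch &col : columns) {
        // Text columns are parsed up to their declared storage size;
        // every other column gets the same default width.
        rowDesc.maxLengths.push_back(
            col.isText ? col.cbStorage : DEFAULT_NON_CHAR_LEN);
    }
    rowDesc.unbounded = describeMode;
    return rowDesc;
}
```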

void FlatFileExecStreamImpl::handleTuple ( FlatFileRowParseResult &  result,
TupleData &  tuple 
) [private]

Processes a row of input data.

For regular and sampling queries, this produces a tuple. In describe mode, no tuple is produced here: the row only updates the per-column maximum widths, and a single descriptive tuple is produced once the scan ends.

Parameters:
result result of parsing text row
tuple tuple data

Definition at line 287 of file FlatFileExecStreamImpl.cpp.

References fieldSizes, FLATFILE_MODE_DESCRIBE, FlatFileRowParseResult::getColumn(), FlatFileRowParseResult::getColumnSize(), FlatFileRowParseResult::getReadCount(), isRowPending, lenient, logError(), max(), min(), mode, nRowsOutput, pParser, FlatFileRowParseResult::status, textDesc, FlatFileRowParseResult::TOO_FEW_COLUMNS, FlatFileRowParseResult::TOO_MANY_COLUMNS, and trim.

Referenced by execute().

{
    TupleData *pTupleData = &tuple;

    // Describe array is initialized here, because describe requires
    // an unbounded scan, and the number of fields is not known
    // until the scan is in progress. Note that we use the first row
    // to determine how many field sizes to return, an imperfect guess.
    if (mode == FLATFILE_MODE_DESCRIBE) {
        if (fieldSizes.size() == 0) {
            fieldSizes.resize(result.getReadCount(), 0);
        }
        // If not lenient, check for rows with wrong number of columns
        if ((!lenient) && fieldSizes.size() != result.getReadCount()) {
            FlatFileRowParseResult detail = result;
            if (detail.getReadCount() > fieldSizes.size()) {
                detail.status = FlatFileRowParseResult::TOO_MANY_COLUMNS;
            } else {
                detail.status = FlatFileRowParseResult::TOO_FEW_COLUMNS;
            }
            logError(detail);
            return;
        }
    }

    // Prepare values for returning
    pParser->stripQuoting(result, trim);
    for (uint i = 0; i < result.getReadCount(); i++) {
        if (mode == FLATFILE_MODE_DESCRIBE) {
            if (i < fieldSizes.size()) {
                fieldSizes[i] = max(fieldSizes[i], result.getColumnSize(i));
            }
            continue;
        }
        (*pTupleData)[i].pData = (PConstBuffer) result.getColumn(i);
        // quietly truncate long columns
        (*pTupleData)[i].cbData =
            std::min(result.getColumnSize(i), textDesc[i].cbStorage);
    }

    if (mode != FLATFILE_MODE_DESCRIBE) {
        isRowPending = true;
    } else {
        // in describe mode, use this to count how many rows have been
        // read, though this is an abuse of the variable's intended purpose
        nRowsOutput++;
    }
}
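In describe mode, handleTuple() only widens a running maximum per column: the first row fixes the column count, and later rows can only make columns wider. A minimal sketch of that bookkeeping, assuming each parsed row has been reduced to a vector of column sizes. trackFieldSizes() is a hypothetical helper; where it returns false, the real method logs a TOO_MANY_COLUMNS or TOO_FEW_COLUMNS error instead.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Updates fieldSizes with one row's column sizes. Returns false (and leaves
// fieldSizes unchanged) when the column count differs from the first row's
// and lenient mode is off.
bool trackFieldSizes(
    std::vector<std::size_t> &fieldSizes,
    const std::vector<std::size_t> &rowColumnSizes,
    bool lenient)
{
    if (fieldSizes.empty()) {
        // the first row decides how many widths are reported
        fieldSizes.resize(rowColumnSizes.size(), 0);
    }
    if (!lenient && fieldSizes.size() != rowColumnSizes.size()) {
        return false;
    }
    for (std::size_t i = 0; i < rowColumnSizes.size(); i++) {
        // extra columns in lenient mode are ignored, as in handleTuple()
        if (i < fieldSizes.size()) {
            fieldSizes[i] = std::max(fieldSizes[i], rowColumnSizes[i]);
        }
    }
    return true;
}
```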

void FlatFileExecStreamImpl::describeStream ( TupleData &  tupleData  )  [private]

Based on the rows sampled, generates an output row describing the stream: a single value listing the maximum observed width of each field, separated by spaces.

Definition at line 338 of file FlatFileExecStreamImpl.cpp.

References dataFilePath, describeResult, fieldSizes, and isRowPending.

Referenced by execute().

{
    if (fieldSizes.size() == 0) {
        throw FennelExcn(
            FennelResource::instance().flatfileDescribeFailed(dataFilePath));
    }

    std::ostringstream oss;
    for (uint i = 0; i < fieldSizes.size(); i++) {
        oss << fieldSizes[i];
        if (i != fieldSizes.size() - 1) {
            oss << " ";
        }
    }
    // NOTE: this newly created string is saved as part of the stream
    // to avoid being popped off the stack
    describeResult = oss.str();
    const char *value = describeResult.c_str();
    uint cbValue = describeResult.size() * sizeof(char);

    assert(tupleData.size() == 1);
    tupleData[0].pData = (PConstBuffer) value;
    tupleData[0].cbData = cbValue;
    isRowPending = true;
}
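The formatting step of describeStream() can be isolated as follows. formatDescribeResult() is a hypothetical helper, and std::runtime_error stands in for the FennelExcn built from flatfileDescribeFailed.

```cpp
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Formats the column widths as a space-separated list. Throws if no rows
// were sampled (std::runtime_error is an assumed stand-in).
std::string formatDescribeResult(const std::vector<std::size_t> &fieldSizes)
{
    if (fieldSizes.empty()) {
        throw std::runtime_error("describe failed: no rows sampled");
    }
    std::ostringstream oss;
    for (std::size_t i = 0; i < fieldSizes.size(); i++) {
        if (i != 0) {
            oss << ' ';  // separator between widths, none after the last
        }
        oss << fieldSizes[i];
    }
    return oss.str();
}
```

The real method keeps the string in the describeResult member because the output tuple stores only a pointer into it (tupleData[0].pData); a string local to describeStream() would be destroyed before the tuple is consumed.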

void FlatFileExecStreamImpl::logError ( const FlatFileRowParseResult &  result  )  [private]

Logs a single error to file.

The reason for the error is read from the row parse result.

Definition at line 364 of file FlatFileExecStreamImpl.cpp.

References FlatFileRowParseResult::INCOMPLETE_COLUMN, FlatFileRowParseResult::NO_COLUMN_DELIM, reason, FlatFileRowParseResult::ROW_TOO_LARGE, FlatFileRowParseResult::status, FlatFileRowParseResult::TOO_FEW_COLUMNS, and FlatFileRowParseResult::TOO_MANY_COLUMNS.

Referenced by execute(), handleTuple(), and open().

{
    switch (result.status) {
    case FlatFileRowParseResult::INCOMPLETE_COLUMN:
        reason = FennelResource::instance().incompleteColumn();
        break;
    case FlatFileRowParseResult::ROW_TOO_LARGE:
        reason = FennelResource::instance().rowTextTooLong();
        break;
    case FlatFileRowParseResult::NO_COLUMN_DELIM:
        reason = FennelResource::instance().noColumnDelimiter();
        break;
    case FlatFileRowParseResult::TOO_FEW_COLUMNS:
        reason = FennelResource::instance().tooFewColumns();
        break;
    case FlatFileRowParseResult::TOO_MANY_COLUMNS:
        reason = FennelResource::instance().tooManyColumns();
        break;
    default:
        permAssert(false);
    }
    logError(reason, result);
}

void FlatFileExecStreamImpl::logError ( const std::string  reason,
const FlatFileRowParseResult &  result 
) [private]

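Logs a single error to file. The reason is saved in the reason member, and the offending row text, truncated to MAX_ROW_ERROR_TEXT_WIDTH bytes, is posted as a ROW_ERROR through ErrorSource::postError().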

Definition at line 388 of file FlatFileExecStreamImpl.cpp.

References TupleData::compute(), FlatFileRowParseResult::current, errorDesc, errorTuple, MAX_ROW_ERROR_TEXT_WIDTH, min(), FlatFileRowParseResult::next, ErrorSource::postError(), ROW_ERROR, and STANDARD_TYPE_VARCHAR.

{
    this->reason = reason;

    // initialize logging objects
    if (errorDesc.size() == 0) {
        // TODO: get project specific type factory
        StandardTypeDescriptorFactory typeFactory;
        StoredTypeDescriptor const &typeDesc =
            typeFactory.newDataType(STANDARD_TYPE_VARCHAR);
        bool nullable = true;

        errorDesc.push_back(
            TupleAttributeDescriptor(
                typeDesc,
                nullable,
                MAX_ROW_ERROR_TEXT_WIDTH));

        errorTuple.compute(errorDesc);
    }

    uint length = result.next - result.current;
    length = std::min(length, MAX_ROW_ERROR_TEXT_WIDTH);
    errorTuple[0].pData = (PConstBuffer) result.current;
    errorTuple[0].cbData = length;

    postError(ROW_ERROR, reason, errorDesc, errorTuple, -1);
}
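The second logError() overload attaches at most MAX_ROW_ERROR_TEXT_WIDTH bytes of the offending row, taken from the half-open range [current, next) of the parse result. A sketch of that truncation; errorRowText() is hypothetical, and the width of 16 is an arbitrary placeholder for the real constant, whose value is not shown on this page.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Placeholder for MAX_ROW_ERROR_TEXT_WIDTH (real value not shown).
const std::size_t MAX_ERROR_TEXT_WIDTH_SKETCH = 16;

// Returns the part of the bad row [current, next) that would be attached
// to the row error: the whole row, or its first MAX bytes if longer.
std::string errorRowText(const char *current, const char *next)
{
    std::size_t length = static_cast<std::size_t>(next - current);
    length = std::min(length, MAX_ERROR_TEXT_WIDTH_SKETCH);
    return std::string(current, length);
}
```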

void FlatFileExecStreamImpl::checkRowDelimiter (  )  [private]

Throws an error if a row delimiter was not found.

Definition at line 419 of file FlatFileExecStreamImpl.cpp.

References dataFilePath, lastResult, FlatFileRowParseResult::nRowDelimsRead, and pBuffer.

Referenced by open().

{
    if (pBuffer->isDone() && lastResult.nRowDelimsRead == 0) {
        throw FennelExcn(
            FennelResource::instance().noRowDelimiter(dataFilePath));
    }
}

void FlatFileExecStreamImpl::prepare ( FlatFileExecStreamParams const &  params  )  [virtual]

Implements FlatFileExecStream.

Definition at line 43 of file FlatFileExecStreamImpl.cpp.

References SegPageLock::accessSegment(), bufferLock, FlatFileExecStreamParams::columnNames, columnNames, TupleData::compute(), FlatFileExecStreamParams::dataFilePath, dataFilePath, dataTuple, FlatFileExecStreamParams::escapeChar, FlatFileExecStreamParams::fieldDelim, FlatFileExecStreamParams::header, header, FlatFileExecStreamParams::lenient, lenient, FlatFileExecStreamParams::mapped, mapped, FlatFileExecStreamParams::mode, mode, FlatFileExecStreamParams::numRowsScan, numRowsScan, SingleOutputExecStreamParams::outputTupleDesc, pBuffer, SingleOutputExecStream::pOutAccessor, pParser, SingleOutputExecStream::prepare(), FlatFileExecStreamParams::quoteChar, readTupleDescriptor(), FlatFileExecStreamParams::rowDelim, rowDesc, ExecStreamParams::scratchAccessor, scratchAccessor, FlatFileRowDescriptor::setLenient(), textDesc, FlatFileExecStreamParams::trim, and trim.

{
    SingleOutputExecStream::prepare(params);

    header = params.header;
    dataFilePath = params.dataFilePath;
    lenient = params.lenient;
    trim = params.trim;
    mapped = params.mapped;
    columnNames = params.columnNames;

    dataTuple.compute(pOutAccessor->getTupleDesc());

    scratchAccessor = params.scratchAccessor;
    bufferLock.accessSegment(scratchAccessor);

    mode = params.mode;
    rowDesc = readTupleDescriptor(pOutAccessor->getTupleDesc());
    rowDesc.setLenient(lenient);
    pBuffer.reset(
        new FlatFileBuffer(params.dataFilePath),
        ClosableObjectDestructor());
    pParser.reset(new FlatFileParser(
                      params.fieldDelim, params.rowDelim,
                      params.quoteChar, params.escapeChar,
                      params.trim));

    numRowsScan = params.numRowsScan;
    textDesc = params.outputTupleDesc;
}

void FlatFileExecStreamImpl::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity 
) [virtual]

Reimplemented from ExecStream.

Definition at line 75 of file FlatFileExecStreamImpl.cpp.

References ExecStream::getResourceRequirements(), and ExecStreamResourceQuantity::nCachePages.

{
    SingleOutputExecStream::getResourceRequirements(minQuantity,optQuantity);
    minQuantity.nCachePages += 2;
    optQuantity = minQuantity;
}
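getResourceRequirements() adds two cache pages to the base class's minimum and then sets the optimum equal to the minimum, so the stream never asks for more than that minimum. Modeled with a hypothetical ResourceQuantitySketch that keeps only the page count:

```cpp
// Hypothetical reduction of ExecStreamResourceQuantity to its page count.
struct ResourceQuantitySketch {
    unsigned nCachePages = 0;
};

// baseMin is what the base class would report as its minimum.
void flatFileResourceRequirements(
    const ResourceQuantitySketch &baseMin,
    ResourceQuantitySketch &minQuantity,
    ResourceQuantitySketch &optQuantity)
{
    minQuantity = baseMin;
    minQuantity.nCachePages += 2;  // the two extra pages this stream requests
    optQuantity = minQuantity;     // optimum equals minimum: no extra demand
}
```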

void FlatFileExecStreamImpl::open ( bool  restart  )  [virtual]

Opens this stream, acquiring any resources needed in order to be able to fetch data.

A precondition is that input streams must already be opened. A stream can be closed and reopened.

Parameters:
restart if true, the stream must be already open, and should reset itself to start from the beginning of its result set

Reimplemented from SingleOutputExecStream.

Definition at line 84 of file FlatFileExecStreamImpl.cpp.

References SegPageLock::allocatePage(), bufferLock, checkRowDelimiter(), dataFilePath, done, findField(), FLAT_FILE_MAX_COLUMN_NAME_LEN, CachePage::getCache(), FlatFileRowParseResult::getColumn(), FlatFileRowParseResult::getColumnSize(), FennelExcn::getMessage(), SegPageLock::getPage(), Cache::getPageSize(), FlatFileRowParseResult::getReadCount(), CachePage::getWritableData(), header, isMAXU(), isRowPending, lastResult, lenient, logError(), mapped, MAXU, ExecStream::name, FlatFileRowParseResult::next, next, FlatFileRowParseResult::NO_STATUS, nRowErrors, nRowsOutput, SingleOutputExecStream::open(), pBuffer, pBufferStorage, pParser, reason, releaseResources(), FlatFileRowParseResult::reset(), rowDesc, FlatFileRowDescriptor::setLenient(), FlatFileRowDescriptor::setMap(), FlatFileRowDescriptor::setUnbounded(), and FlatFileRowParseResult::status.

{
    if (restart) {
        releaseResources();
    }
    SingleOutputExecStream::open(restart);

    if (!restart) {
        bufferLock.allocatePage();
        uint cbPageSize = bufferLock.getPage().getCache().getPageSize();
        pBufferStorage = bufferLock.getPage().getWritableData();
        pBuffer->setStorage((char*)pBufferStorage, cbPageSize);
    }
    pBuffer->open();
    pBuffer->read();
    next = pBuffer->getReadPtr();
    isRowPending = false;
    nRowsOutput = nRowErrors = 0;
    lastResult.reset();

    if (header) {
        FlatFileRowDescriptor headerDesc;
        for (uint i = 0; i < rowDesc.size(); i++) {
            headerDesc.push_back(
                FlatFileColumnDescriptor(
                    FLAT_FILE_MAX_COLUMN_NAME_LEN));
        }
        headerDesc.setLenient(lenient);
        if (mapped) {
            headerDesc.setUnbounded();
        }
        pParser->scanRow(
            pBuffer->getReadPtr(), pBuffer->getSize(), headerDesc, lastResult);
        pBuffer->setReadPtr(lastResult.next);
        if (lastResult.status != FlatFileRowParseResult::NO_STATUS) {
            logError(lastResult);
            try {
                checkRowDelimiter();
            } catch (FennelExcn &e) {
                // catch by reference to avoid copying (and slicing) the exception
                reason = e.getMessage();
            }
            throw FennelExcn(
                FennelResource::instance().flatfileNoHeader(
                    dataFilePath, reason));
        }

        // Generate mapping from text file columns to output columns.
        // Match names in the header with output field names. Names in
        // the header are always trimmed.
        if (mapped) {
            if (! lenient) {
                throw FennelExcn(
                    FennelResource::instance()
                    .flatfileMappedRequiresLenient());
            }

            pParser->stripQuoting(lastResult, true);
            uint nFields = lastResult.getReadCount();
            int found = 0;

            VectorOfUint columnMap;
            columnMap.resize(nFields);
            for (uint i = 0; i < nFields; i++) {
                char *n = lastResult.getColumn(i);
                if (n == NULL) {
                    columnMap[i] = MAXU;
                } else {
                    std::string name(
                        n,
                        lastResult.getColumnSize(i));
                    columnMap[i] = findField(name);
                    if (!isMAXU(columnMap[i])) {
                        found++;
                    }
                }
            }
            if (found == 0) {
                throw FennelExcn(
                    FennelResource::instance().flatfileNoMappedColumns(
                        std::string(" "),
                        std::string(" ")));
            }
            rowDesc.setMap(columnMap);
        }
    }

    done = false;
}
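When mapped is set (which open() accepts only together with lenient), each header field is matched against the output column names, unmatched fields stay unmapped, and open() fails if nothing matches. A self-contained sketch of that mapping, assuming the header names have already been unquoted and trimmed. buildColumnMap(), sameNameIgnoreCase() and UNMAPPED are hypothetical stand-ins for the findField()/MAXU logic, and std::runtime_error stands in for flatfileNoMappedColumns.

```cpp
#include <cctype>
#include <cstddef>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>

// Marks a header field that feeds no output column (stands in for MAXU).
const unsigned UNMAPPED = std::numeric_limits<unsigned>::max();

// ASCII case-insensitive equality, like strcasecmp(a, b) == 0.
bool sameNameIgnoreCase(const std::string &a, const std::string &b)
{
    if (a.size() != b.size()) {
        return false;
    }
    for (std::size_t i = 0; i < a.size(); i++) {
        if (std::tolower(static_cast<unsigned char>(a[i]))
            != std::tolower(static_cast<unsigned char>(b[i]))) {
            return false;
        }
    }
    return true;
}

// columnMap[i] is the output column fed by header field i, or UNMAPPED.
// Throws if no header field names any output column.
std::vector<unsigned> buildColumnMap(
    const std::vector<std::string> &headerFields,
    const std::vector<std::string> &outputColumns)
{
    std::vector<unsigned> columnMap(headerFields.size(), UNMAPPED);
    unsigned found = 0;
    for (std::size_t i = 0; i < headerFields.size(); i++) {
        for (std::size_t j = 0; j < outputColumns.size(); j++) {
            if (sameNameIgnoreCase(headerFields[i], outputColumns[j])) {
                columnMap[i] = static_cast<unsigned>(j);  // first match wins
                found++;
                break;
            }
        }
    }
    if (found == 0) {
        throw std::runtime_error("no header field matches an output column");
    }
    return columnMap;
}
```

Note that the map is indexed by file column, not by output column, so a file may carry extra columns or list them in a different order than the output tuple.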

ExecStreamResult FlatFileExecStreamImpl::execute ( ExecStreamQuantum const &  quantum  )  [virtual]

Executes this stream.

Parameters:
quantum governs the maximum amount of execution to perform
Returns:
code indicating reason execution ceased

Implements ExecStream.

Definition at line 173 of file FlatFileExecStreamImpl.cpp.

References dataTuple, describeStream(), done, EXECBUF_EOS, EXECBUF_OVERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_EOS, FLATFILE_MODE_DESCRIBE, handleTuple(), FlatFileRowParseResult::INCOMPLETE_COLUMN, isRowPending, lastResult, logError(), mode, FlatFileRowParseResult::next, FlatFileRowParseResult::NO_COLUMN_DELIM, FlatFileRowParseResult::NO_STATUS, nRowErrors, nRowsOutput, ExecStreamQuantum::nTuplesMax, numRowsScan, pBuffer, SingleOutputExecStream::pOutAccessor, pParser, FlatFileRowParseResult::ROW_TOO_LARGE, rowDesc, FlatFileRowParseResult::status, FlatFileRowParseResult::TOO_FEW_COLUMNS, and FlatFileRowParseResult::TOO_MANY_COLUMNS.

{
    // detect whether scan was previously finished
    if (done && !isRowPending) {
        pOutAccessor->markEOS();
        return EXECRC_EOS;
    }
    // detect whether output buffer is capable of accepting more data
    if (pOutAccessor->getState() == EXECBUF_OVERFLOW
        || pOutAccessor->getState() == EXECBUF_EOS) {
        return EXECRC_BUF_OVERFLOW;
    }

    // read up to the number of (good or bad) tuples specified by quantum
    for (uint nTuples = 0; nTuples < quantum.nTuplesMax;) {
        // ready the next row for output
        while (!isRowPending) {
            // check quantum, since this loop doesn't break until a good
            // row is read
            if (nTuples >= quantum.nTuplesMax) {
                break;
            }

            if ((numRowsScan > 0 && numRowsScan == nRowsOutput)
                || pBuffer->isDone())
            {
                done = true;
                break;
            }
            pParser->scanRow(
                pBuffer->getReadPtr(),pBuffer->getSize(),rowDesc,lastResult);
            nTuples++;

            switch (lastResult.status) {
            case FlatFileRowParseResult::INCOMPLETE_COLUMN:
                if (pBuffer->isFull()) {
                    lastResult.status = FlatFileRowParseResult::ROW_TOO_LARGE;
                } else if (!pBuffer->isComplete()) {
                    pBuffer->read();
                    continue;
                }
                // fall through: log the incomplete or oversized row
            case FlatFileRowParseResult::NO_COLUMN_DELIM:
            case FlatFileRowParseResult::TOO_FEW_COLUMNS:
            case FlatFileRowParseResult::TOO_MANY_COLUMNS:
                logError(lastResult);
                nRowErrors++;
                pBuffer->setReadPtr(lastResult.next);
                continue;
            case FlatFileRowParseResult::NO_STATUS:
                handleTuple(lastResult, dataTuple);
                pBuffer->setReadPtr(lastResult.next);
                break;
            default:
                permAssert(false);
            }
        }

        // describe produces one row after it's done reading input
        if (mode == FLATFILE_MODE_DESCRIBE && done && !isRowPending) {
            describeStream(dataTuple);
        }

        // try to output pending rows
        if (isRowPending) {
            if (!pOutAccessor->produceTuple(dataTuple)) {
                return EXECRC_BUF_OVERFLOW;
            }
            isRowPending = false;
            nRowsOutput++;
        }

        // close stream if no more rows are available
        if (done) {
            pOutAccessor->markEOS();
            return EXECRC_EOS;
        }
    }
    return EXECRC_QUANTUM_EXPIRED;
}
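The control flow of execute() is easier to follow without the parser and buffers. The following is a simplified, hypothetical model (ScanSketch, RowStatus and StreamResult are not Fennel types): each input row is pre-classified as good or bad, outputCapacity stands in for the consumer buffer, and the INCOMPLETE_COLUMN refill path and describe mode are omitted. It keeps the behaviours visible in the listing: good and bad rows both count against the quantum, a row that cannot be produced stays pending for the next call, and numRowsScan caps the number of rows output.

```cpp
#include <cstddef>
#include <vector>

enum class RowStatus { Good, Bad };
enum class StreamResult { QuantumExpired, BufOverflow, Eos };

struct ScanSketch {
    std::vector<RowStatus> input;
    std::size_t pos = 0;             // next input row to scan
    std::size_t numRowsScan = 0;     // 0 means no limit
    std::size_t outputCapacity = 0;  // tuples the consumer can still accept
    std::size_t nRowsOutput = 0;
    std::size_t nRowErrors = 0;
    std::size_t pendingRow = 0;      // input position of the pending row
    bool isRowPending = false;
    bool done = false;
    std::vector<std::size_t> produced;  // input positions actually emitted

    StreamResult execute(std::size_t nTuplesMax)
    {
        if (done && !isRowPending) {
            return StreamResult::Eos;  // scan finished on an earlier call
        }
        for (std::size_t nTuples = 0; nTuples < nTuplesMax;) {
            // scan until a good row is pending or the quantum is used up
            while (!isRowPending) {
                if (nTuples >= nTuplesMax) {
                    break;
                }
                if ((numRowsScan > 0 && numRowsScan == nRowsOutput)
                    || pos == input.size()) {
                    done = true;
                    break;
                }
                nTuples++;  // good and bad rows both count
                if (input[pos] == RowStatus::Bad) {
                    nRowErrors++;  // the real stream logs the row, then skips it
                    pos++;
                    continue;
                }
                pendingRow = pos++;
                isRowPending = true;
            }
            if (isRowPending) {
                if (outputCapacity == 0) {
                    return StreamResult::BufOverflow;  // row stays pending
                }
                outputCapacity--;
                produced.push_back(pendingRow);
                isRowPending = false;
                nRowsOutput++;
            }
            if (done) {
                return StreamResult::Eos;
            }
        }
        return StreamResult::QuantumExpired;
    }
};
```

Keeping the row pending across a BUF_OVERFLOW return means no row is parsed twice or lost when the consumer is slow; the next call simply retries the produce step before scanning further.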

FlatFileExecStream * FlatFileExecStream::newFlatFileExecStream (  )  [static, inherited]

Factory method.

Returns:
new FlatFileExecStream instance

Definition at line 34 of file FlatFileExecStreamImpl.cpp.

Referenced by FlatFileExecStreamTest::testStream(), and ExecStreamFactory::visit().

{
    return new FlatFileExecStreamImpl();
}

void SingleOutputExecStream::prepare ( SingleOutputExecStreamParams const &  params  )  [virtual, inherited]

Definition at line 48 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::getOutputBufProvision(), SingleOutputExecStreamParams::outputTupleDesc, SingleOutputExecStreamParams::outputTupleFormat, SingleOutputExecStream::pOutAccessor, and ExecStream::prepare().

Referenced by BTreeExecStream::prepare(), prepare(), ValuesExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), ConfluenceExecStream::prepare(), and ConduitExecStream::prepare().

{
    ExecStream::prepare(params);
    assert(pOutAccessor);
    assert(pOutAccessor->getProvision() == getOutputBufProvision());
    if (pOutAccessor->getTupleDesc().empty()) {
        assert(!params.outputTupleDesc.empty());
        pOutAccessor->setTupleShape(
            params.outputTupleDesc,
            params.outputTupleFormat);
    }
}

void ExecStream::prepare ( ExecStreamParams const &  params  )  [virtual, inherited]

Prepares this stream for execution.

A precondition is that input streams must already be defined and prepared. As an effect of this call, the tuple shape should be defined for all output buffers and remain unchanged for the lifetime of the stream. This method is only ever called once, before the first open. Although this method is virtual, derived classes may choose to define an overloaded version instead with a specialized covariant parameter class.

Parameters:
params instance of stream parameterization class which should be used to prepare this stream

Definition at line 84 of file ExecStream.cpp.

References ExecStreamGraph::getDynamicParamManager(), SegmentAccessor::pCacheAccessor, ExecStreamParams::pCacheAccessor, ExecStream::pDynamicParamManager, ExecStream::pGraph, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStreamParams::scratchAccessor.

Referenced by JavaTransformExecStream::prepare(), SingleOutputExecStream::prepare(), and SingleInputExecStream::prepare().

{
    if (pGraph) {
        pDynamicParamManager = pGraph->getDynamicParamManager();
    }
    pQuotaAccessor = params.pCacheAccessor;
    pScratchQuotaAccessor = params.scratchAccessor.pCacheAccessor;
}

void SingleOutputExecStream::setInputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  inAccessors  )  [virtual, inherited]

Initializes the buffer accessors for inputs to this stream.

This method is only ever called once, before prepare.

Parameters:
inAccessors buffer accessors ordered by input stream

Implements ExecStream.

Reimplemented in ConduitExecStream, and ConfluenceExecStream.

Definition at line 35 of file SingleOutputExecStream.cpp.

00037 {
00038     assert(inAccessors.size() == 0);
00039 }

void SingleOutputExecStream::setOutputBufAccessors ( std::vector< SharedExecStreamBufAccessor > const &  outAccessors  )  [virtual, inherited]

Initializes the buffer accessors for outputs from this stream.

This method is only ever called once, before prepare.

Parameters:
outAccessors buffer accessors ordered by output stream

Implements ExecStream.

Reimplemented in ConduitExecStream.

Definition at line 41 of file SingleOutputExecStream.cpp.

References SingleOutputExecStream::pOutAccessor.

Referenced by ConduitExecStream::setOutputBufAccessors().

00043 {
00044     assert(outAccessors.size() == 1);
00045     pOutAccessor = outAccessors[0];
00046 }
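A simplified model of the accessor-wiring contract above, using a hypothetical BufAccessor type. A single-output producer accepts no input accessors and exactly one output accessor, which it keeps for later use.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for ExecStreamBufAccessor.
struct BufAccessor {};
using SharedBufAccessor = std::shared_ptr<BufAccessor>;

// Simplified model of SingleOutputExecStream's accessor wiring:
// a pure producer has no inputs and exactly one output.
class SingleOutputModel {
public:
    void setInputBufAccessors(std::vector<SharedBufAccessor> const &inAccessors)
    {
        assert(inAccessors.empty());
    }

    void setOutputBufAccessors(std::vector<SharedBufAccessor> const &outAccessors)
    {
        assert(outAccessors.size() == 1);
        pOutAccessor = outAccessors[0];
    }

    SharedBufAccessor pOutAccessor;
};
```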

ExecStreamBufProvision SingleOutputExecStream::getOutputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream is capable of when producing tuples.

Returns:
supported model; the ExecStream default is BUFPROV_NONE, but this implementation returns BUFPROV_CONSUMER

Reimplemented from ExecStream.

Reimplemented in BarrierExecStream, DoubleBufferExecStream, MergeExecStream, MockResourceExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferReaderExecStream, ValuesExecStream, FtrsTableWriterExecStream, and LcsClusterAppendExecStream.

Definition at line 69 of file SingleOutputExecStream.cpp.

References BUFPROV_CONSUMER.

Referenced by SingleOutputExecStream::prepare().

00070 {
00071     return BUFPROV_CONSUMER;
00072 }
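The override relationship above can be sketched with a hypothetical two-value enum. Fennel's ExecStreamBufProvision may have more members than the two named in this section. The base class reports BUFPROV_NONE, and a single-output stream overrides it to report BUFPROV_CONSUMER, so a caller holding a base reference sees the derived answer.

```cpp
#include <cassert>

// Hypothetical mirror of ExecStreamBufProvision; only the two values
// named in this section are modeled.
enum BufProvision { BUFPROV_NONE, BUFPROV_CONSUMER };

struct BaseStream {
    virtual ~BaseStream() = default;
    // Default, as in ExecStream.
    virtual BufProvision getOutputBufProvision() const { return BUFPROV_NONE; }
};

struct SingleOutput : BaseStream {
    // Override, as in SingleOutputExecStream.
    BufProvision getOutputBufProvision() const override { return BUFPROV_CONSUMER; }
};
```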

bool ExecStream::canEarlyClose (  )  [virtual, inherited]

Returns:
true if the stream can be closed early

Reimplemented in SegBufferWriterExecStream.

Definition at line 49 of file ExecStream.cpp.

00050 {
00051     return true;
00052 }

ExecStreamGraph & ExecStream::getGraph (  )  const [inline, inherited]

Returns:
reference to containing graph

Definition at line 293 of file ExecStream.h.

References ExecStream::pGraph.

Referenced by ExternalSortExecStreamImpl::execute(), JavaSinkExecStream::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamFetch(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00294 {
00295     assert(pGraph);
00296     return *pGraph;
00297 }

ExecStreamId ExecStream::getStreamId (  )  const [inline, inherited]

Returns:
the identifier for this stream within containing graph

Definition at line 288 of file ExecStream.h.

References ExecStream::id.

Referenced by ExternalSortExecStreamImpl::execute(), SegBufferWriterExecStream::execute(), SegBufferExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), Java_net_sf_farrago_fennel_FennelStorage_tupleStreamTransformFetch(), JavaTransformExecStream::open(), SingleInputExecStream::open(), ConfluenceExecStream::open(), CartesianJoinExecStream::prepare(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), LbmMinusExecStream::restartSubtrahends(), ExecStreamScheduler::tracePostExecution(), ExecStreamScheduler::tracePreExecution(), ExecStreamScheduler::traceStreamBuffers(), and ParallelExecStreamScheduler::tryExecuteTask().

00289 {
00290     return id;
00291 }

void ExecStream::getResourceRequirements ( ExecStreamResourceQuantity &  minQuantity,
ExecStreamResourceQuantity &  optQuantity,
ExecStreamResourceSettingType &  optType 
) [virtual, inherited]

Determines resource requirements for this stream.

Default implementation declares zero resource requirements.

Parameters:
minQuantity receives the minimum resource quantity needed by this stream in order to execute
optQuantity receives the resource quantity needed by this stream in order to execute optimally
optType receives the value indicating the accuracy of the optQuantity parameter. This parameter is optional and defaults to EXEC_RESOURCE_ACCURATE if omitted. If the optimum setting is an estimate or no value can be specified (e.g., due to lack of statistics), this parameter should be used to indicate that the optimum resource setting is not accurate.

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 93 of file ExecStream.cpp.

References EXEC_RESOURCE_ACCURATE.

Referenced by ExternalSortExecStreamImpl::getResourceRequirements(), LcsRowScanBaseExecStream::getResourceRequirements(), LcsClusterAppendExecStream::getResourceRequirements(), LbmUnionExecStream::getResourceRequirements(), LbmSplicerExecStream::getResourceRequirements(), LbmGeneratorExecStream::getResourceRequirements(), LbmChopperExecStream::getResourceRequirements(), LhxJoinExecStream::getResourceRequirements(), LhxAggExecStream::getResourceRequirements(), FtrsTableWriterExecStream::getResourceRequirements(), BTreeReadExecStream::getResourceRequirements(), BTreeInsertExecStream::getResourceRequirements(), getResourceRequirements(), SegBufferWriterExecStream::getResourceRequirements(), SegBufferReaderExecStream::getResourceRequirements(), SegBufferExecStream::getResourceRequirements(), ScratchBufferExecStream::getResourceRequirements(), and DoubleBufferExecStream::getResourceRequirements().

00097 {
00098     getResourceRequirements(minQuantity, optQuantity);
00099     optType = EXEC_RESOURCE_ACCURATE;
00100 }
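A sketch of how the three-argument overload delegates to the two-argument one, using assumed simplified types. ResourceQuantity tracks only cache pages, and EXEC_RESOURCE_ESTIMATE is included as an assumed second enumerator. The hypothetical SortLikeStream overrides only the two-argument form and still inherits the EXEC_RESOURCE_ACCURATE reporting.

```cpp
#include <cassert>

// Hypothetical simplified types; the real ExecStreamResourceQuantity has more fields.
struct ResourceQuantity { unsigned nCachePages = 0; };
enum ResourceSettingType { EXEC_RESOURCE_ACCURATE, EXEC_RESOURCE_ESTIMATE };

class ResourceStream {
public:
    virtual ~ResourceStream() = default;

    // Two-argument form: the default declares zero requirements.
    virtual void getResourceRequirements(ResourceQuantity &minQ, ResourceQuantity &optQ)
    {
        minQ.nCachePages = 0;
        optQ.nCachePages = 0;
    }

    // Three-argument form: delegate (virtually), then report the optimum as accurate.
    virtual void getResourceRequirements(
        ResourceQuantity &minQ, ResourceQuantity &optQ, ResourceSettingType &optType)
    {
        getResourceRequirements(minQ, optQ);
        optType = EXEC_RESOURCE_ACCURATE;
    }
};

// Hypothetical stream needing 2 pages minimum and 10 for optimal execution.
class SortLikeStream : public ResourceStream {
public:
    using ResourceStream::getResourceRequirements;  // keep the 3-arg form visible
    void getResourceRequirements(ResourceQuantity &minQ, ResourceQuantity &optQ) override
    {
        minQ.nCachePages = 2;
        optQ.nCachePages = 10;
    }
};
```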

void ExecStream::setResourceAllocation ( ExecStreamResourceQuantity &  quantity  )  [virtual, inherited]

Sets current resource allocation for this stream.

If called while the stream is open, this indicates a request for the stream to dynamically adjust its memory usage. If the stream is incapable of honoring the request, it should update quantity with the actual amounts still in use.

Parameters:
quantity allocated resource quantity

Reimplemented in MockResourceExecStream, BTreePrefetchSearchExecStream, LhxAggExecStream, LhxJoinExecStream, LbmGeneratorExecStream, LbmUnionExecStream, and ExternalSortExecStreamImpl.

Definition at line 111 of file ExecStream.cpp.

References ExecStreamResourceQuantity::nCachePages, ExecStream::pQuotaAccessor, ExecStream::pScratchQuotaAccessor, and ExecStream::resourceAllocation.

Referenced by ExternalSortExecStreamImpl::setResourceAllocation(), LbmUnionExecStream::setResourceAllocation(), LbmGeneratorExecStream::setResourceAllocation(), LhxJoinExecStream::setResourceAllocation(), LhxAggExecStream::setResourceAllocation(), and BTreePrefetchSearchExecStream::setResourceAllocation().

00113 {
00114     resourceAllocation = quantity;
00115     if (pQuotaAccessor) {
00116         pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00117     }
00118     if (pScratchQuotaAccessor) {
00119         pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
00120     }
00121 }
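The following sketch models the allocation contract, including the case where a stream cannot honor a request to shrink. AllocStream follows the listing above. PinnedPagesStream is a hypothetical stream that writes the pages it still has in use back into quantity, as the description of setResourceAllocation requires. All type names are illustrative stand-ins.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for ExecStreamResourceQuantity and a quota CacheAccessor.
struct ResourceQuantity { unsigned nCachePages = 0; };

struct QuotaAccessor {
    unsigned maxLockedPages = 0;
    void setMaxLockedPages(unsigned n) { maxLockedPages = n; }
};

class AllocStream {
public:
    virtual ~AllocStream() = default;
    std::shared_ptr<QuotaAccessor> pQuotaAccessor;
    std::shared_ptr<QuotaAccessor> pScratchQuotaAccessor;
    ResourceQuantity resourceAllocation;

    // Same steps as the listing: record the grant, then cap locked pages
    // on whichever quota accessors are present.
    virtual void setResourceAllocation(ResourceQuantity &quantity)
    {
        resourceAllocation = quantity;
        if (pQuotaAccessor) {
            pQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
        }
        if (pScratchQuotaAccessor) {
            pScratchQuotaAccessor->setMaxLockedPages(quantity.nCachePages);
        }
    }
};

// Hypothetical stream that cannot release pages it has already pinned;
// it reports the amount still in use by updating quantity.
class PinnedPagesStream : public AllocStream {
public:
    explicit PinnedPagesStream(unsigned inUse) : pagesInUse(inUse) {}

    void setResourceAllocation(ResourceQuantity &quantity) override
    {
        if (quantity.nCachePages < pagesInUse) {
            quantity.nCachePages = pagesInUse;
        }
        AllocStream::setResourceAllocation(quantity);
    }

private:
    unsigned pagesInUse;
};
```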

void ExecStream::setName ( std::string const &   )  [virtual, inherited]

Sets unique name of this stream.

Definition at line 157 of file ExecStream.cpp.

References ExecStream::name.

00158 {
00159     name = nameInit;
00160 }

std::string const & ExecStream::getName (  )  const [virtual, inherited]

Returns:
the name of this stream, as known by the optimizer

Definition at line 162 of file ExecStream.cpp.

References ExecStream::name.

Referenced by DfsTreeExecStreamScheduler::findNextConsumer(), ParallelExecStreamScheduler::readStream(), DfsTreeExecStreamScheduler::readStream(), ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00163 {
00164     return name;
00165 }

bool ExecStream::mayBlock (  )  const [virtual, inherited]

Queries whether this stream's implementation may block when execute() is called.

For accurate scheduling, non-blocking implementations are preferred; the scheduler must be aware of the potential for blocking so that it can allocate extra threads accordingly.

Returns:
whether stream may block; default is false

Definition at line 167 of file ExecStream.cpp.

00168 {
00169     return false;
00170 }

void ExecStream::checkAbort (  )  const [virtual, inherited]

Checks whether there is an abort request for this stream's scheduler.

Normally, streams don't need to check this, since the scheduler services abort requests in between quanta. However, streams which enter long-running loops need to check for themselves. If an abort is scheduled, this method will throw an AbortExcn automatically.

Definition at line 72 of file ExecStream.cpp.

References ExecStreamScheduler::checkAbort(), ExecStreamGraph::getScheduler(), and ExecStream::pGraph.

Referenced by LhxJoinExecStream::execute(), LhxAggExecStream::execute(), ExternalSortRunAccessor::fetch(), and ExternalSortMerger::fetch().

00073 {
00074     if (!pGraph) {
00075         return;
00076     }
00077     ExecStreamScheduler *pScheduler = pGraph->getScheduler();
00078     if (!pScheduler) {
00079         return;
00080     }
00081     pScheduler->checkAbort();
00082 }
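A minimal sketch of the polling pattern described for checkAbort. It assumes a hypothetical Scheduler that holds an atomic abort flag and a locally defined AbortExcn. The stream's loop checks every 1024 iterations, an arbitrary interval chosen for illustration. Like ExecStream::checkAbort, it does nothing when no scheduler is attached.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins for AbortExcn and ExecStreamScheduler.
struct AbortExcn : std::runtime_error {
    AbortExcn() : std::runtime_error("abort requested") {}
};

struct Scheduler {
    std::atomic<bool> abortRequested{false};
    void checkAbort() const
    {
        if (abortRequested.load()) {
            throw AbortExcn();
        }
    }
};

class LongLoopStream {
public:
    explicit LongLoopStream(Scheduler *p) : pScheduler(p) {}

    // Null-tolerant, like ExecStream::checkAbort.
    void checkAbort() const
    {
        if (pScheduler) {
            pScheduler->checkAbort();
        }
    }

    // Long-running loop that polls for aborts itself instead of relying
    // on the scheduler's check between quanta.
    unsigned long crunch(unsigned long n)
    {
        unsigned long sum = 0;
        for (unsigned long i = 0; i < n; ++i) {
            if ((i & 1023) == 0) {
                checkAbort();
            }
            sum += i;
        }
        return sum;
    }

private:
    Scheduler *pScheduler;
};
```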

ExecStreamBufProvision ExecStream::getOutputBufConversion (  )  const [virtual, inherited]

Queries the BufferProvision to which this stream needs its output to be converted, if any.

Returns:
required conversion; default is BUFPROV_NONE

Reimplemented in JavaTransformExecStream.

Definition at line 177 of file ExecStream.cpp.

References BUFPROV_NONE.

00178 {
00179     return BUFPROV_NONE;
00180 }

ExecStreamBufProvision ExecStream::getInputBufProvision (  )  const [virtual, inherited]

Queries the BufferProvision which this stream requires of its inputs when consuming their tuples.

Returns:
required model; default is BUFPROV_NONE

Reimplemented in ConfluenceExecStream, DoubleBufferExecStream, ScratchBufferExecStream, SegBufferExecStream, SegBufferWriterExecStream, SingleInputExecStream, and JavaTransformExecStream.

Definition at line 182 of file ExecStream.cpp.

References BUFPROV_NONE.

00183 {
00184     return BUFPROV_NONE;
00185 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
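The close-once behavior of ClosableObject::close can be modeled as follows. Closable and CountingResource are hypothetical names, and starting with needsClose set to true is a simplifying assumption made for the sketch.

```cpp
#include <cassert>

// Simplified model of the ClosableObject pattern: close() runs closeImpl()
// at most once, and isClosed() reports !needsClose.
class Closable {
public:
    virtual ~Closable() = default;

    void close()
    {
        if (!needsClose) {
            return;  // already closed: do nothing
        }
        needsClose = false;
        closeImpl();
    }

    bool isClosed() const { return !needsClose; }

protected:
    virtual void closeImpl() = 0;
    bool needsClose = true;  // assumption for this sketch
};

// Hypothetical subclass that counts how often its resources are released.
class CountingResource : public Closable {
public:
    int releases = 0;

protected:
    void closeImpl() override { ++releases; }
};
```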

void TraceSource::initTraceSource ( SharedTraceTarget  pTraceTarget,
std::string  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pTraceTarget the TraceTarget to which messages will be sent
name the name of this source

Definition at line 46 of file TraceSource.cpp.

References TraceSource::isTracing(), TraceSource::minimumLevel, TraceSource::name, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::beforeTestCase(), TestBase::TestBase(), and TraceSource::TraceSource().

00049 {
00050     assert(!pTraceTarget.get());
00051 
00052     pTraceTarget = pTraceTargetInit;
00053     name = nameInit;
00054     if (isTracing()) {
00055         minimumLevel = pTraceTarget->getSourceTraceLevel(name);
00056     } else {
00057         minimumLevel = TRACE_OFF;
00058     }
00059 }

void TraceSource::trace ( TraceLevel  level,
std::string  message 
) const [inherited]

Records a trace message.

Normally only called via FENNEL_TRACE.

Parameters:
level severity level of the event being traced
message the text of the message

Definition at line 61 of file TraceSource.cpp.

References TraceSource::getTraceTarget(), TraceSource::isTracing(), TraceSource::name, and TraceTarget::notifyTrace().

Referenced by Calculator::exec(), and ExecStreamScheduler::traceStreamBufferContents().

00062 {
00063     if (isTracing()) {
00064         getTraceTarget().notifyTrace(name,level,message);
00065     }
00066 }

bool TraceSource::isTracing (  )  const [inline, inherited]

Returns:
true iff tracing is enabled for this source

Definition at line 88 of file TraceSource.h.

Referenced by TraceSource::initTraceSource(), CalcExecStream::prepare(), and TraceSource::trace().

00089     {
00090         return pTraceTarget.get() ? true : false;
00091     }

bool TraceSource::isTracingLevel ( TraceLevel  level  )  const [inline, inherited]

Determines whether a particular level is being traced.

Parameters:
level trace level to test
Returns:
true iff tracing is enabled for the given level

Definition at line 100 of file TraceSource.h.

Referenced by ExecStreamScheduler::addGraph(), SimpleExecStreamGovernor::assignCachePages(), SimpleExecStreamGovernor::distributeCachePages(), Calculator::exec(), ExecStreamScheduler::ExecStreamScheduler(), LcsClusterNodeWriter::getLastClusterPageForWrite(), LcsClusterNodeWriter::moveFromTempToIndex(), JavaSinkExecStream::stuffByteBuffer(), and ExecStreamScheduler::traceStreamBuffers().

00101     {
00102         return level >= minimumLevel;
00103     }
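A sketch of how initTraceSource, trace and isTracingLevel fit together. The numeric TraceLevel values are assumed (modeled on java.util.logging), and Target is a hypothetical stand-in for TraceTarget. As in the listings above, trace() does not filter by level, so the caller checks isTracingLevel first. A FENNEL_TRACE-style macro would presumably supply that guard.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Assumed level values; check the real TraceLevel definition before relying on them.
enum TraceLevel {
    TRACE_ALL = 0, TRACE_FINEST = 300, TRACE_FINE = 500,
    TRACE_INFO = 800, TRACE_SEVERE = 1000, TRACE_OFF = 10000
};

// Hypothetical TraceTarget stand-in that records delivered messages.
struct Target {
    TraceLevel configured;
    std::vector<std::string> messages;
    TraceLevel getSourceTraceLevel(std::string const &) const { return configured; }
    void notifyTrace(std::string const &source, TraceLevel, std::string const &msg)
    {
        messages.push_back(source + ": " + msg);
    }
};

class Source {
public:
    void initTraceSource(std::shared_ptr<Target> pTarget, std::string n)
    {
        assert(!pTraceTarget);
        pTraceTarget = std::move(pTarget);
        name = std::move(n);
        minimumLevel = isTracing() ? pTraceTarget->getSourceTraceLevel(name) : TRACE_OFF;
    }
    bool isTracing() const { return pTraceTarget != nullptr; }
    bool isTracingLevel(TraceLevel level) const { return level >= minimumLevel; }
    void trace(TraceLevel level, std::string const &msg) const
    {
        if (isTracing()) {
            pTraceTarget->notifyTrace(name, level, msg);
        }
    }

private:
    std::shared_ptr<Target> pTraceTarget;
    std::string name;
    TraceLevel minimumLevel = TRACE_OFF;
};
```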

TraceTarget& TraceSource::getTraceTarget (  )  const [inline, inherited]

Returns:
the TraceTarget for this source

Definition at line 108 of file TraceSource.h.

Referenced by TraceSource::trace().

00109     {
00110         assert(isTracing());
00111         return *(pTraceTarget.get());
00112     }

SharedTraceTarget TraceSource::getSharedTraceTarget (  )  const [inline, inherited]

Returns:
the SharedTraceTarget for this source

Definition at line 117 of file TraceSource.h.

Referenced by Database::init(), LcsClusterAppendExecStream::initLoad(), and CalcExecStream::prepare().

00118     {
00119         return pTraceTarget;
00120     }

std::string TraceSource::getTraceSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also TraceSources.

Returns:
the name

Definition at line 127 of file TraceSource.h.

Referenced by LcsClusterAppendExecStream::initLoad().

00128     {
00129         return name;
00130     }

void TraceSource::setTraceSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137     {
00138         name = n;
00139     }

TraceLevel TraceSource::getMinimumTraceLevel (  )  const [inline, inherited]

Definition at line 141 of file TraceSource.h.

00142     {
00143         return minimumLevel;
00144     }

void TraceSource::disableTracing (  )  [inherited]

Definition at line 68 of file TraceSource.cpp.

References TraceSource::minimumLevel, TraceSource::pTraceTarget, and TRACE_OFF.

Referenced by TestBase::afterTestCase().

00069 {
00070     pTraceTarget.reset();
00071     minimumLevel = TRACE_OFF;
00072 }

void ErrorSource::initErrorSource ( SharedErrorTarget  pErrorTarget,
const std::string &  name 
) [virtual, inherited]

For use when initialization has to be deferred until after construction.

Parameters:
pErrorTarget the ErrorTarget to which errors will be posted
name the name of this source

Definition at line 47 of file ErrorSource.cpp.

References ErrorSource::name, and ErrorSource::pErrorTarget.

Referenced by ErrorSource::ErrorSource().

00050 {
00051     pErrorTarget = pErrorTargetInit;
00052     name = nameInit;
00053 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
void *  address,
long  capacity,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 55 of file ErrorSource.cpp.

References ErrorSource::getErrorTarget(), ErrorSource::hasTarget(), ErrorSource::name, and ErrorTarget::notifyError().

Referenced by logError(), ErrorSource::postError(), and LbmSplicerExecStream::postViolation().

00058 {
00059     if (hasTarget()) {
00060         getErrorTarget().notifyError(
00061             name, level, message, address, capacity, index);
00062     }
00063 }

void ErrorSource::postError ( ErrorLevel  level,
const std::string &  message,
const TupleDescriptor &  errorDesc,
const TupleData &  errorTuple,
int  index 
) [inherited]

Posts an exception, such as a row exception.

See also:
ErrorTarget for a description of the parameters

Definition at line 65 of file ErrorSource.cpp.

References TupleAccessor::compute(), ErrorSource::errorAccessor, FixedBuffer, TupleAccessor::getByteCount(), TupleAccessor::getMaxByteCount(), ErrorSource::hasTarget(), TupleAccessor::marshal(), ErrorSource::pErrorBuf, and ErrorSource::postError().

00068 {
00069     if (!hasTarget()) {
00070         return;
00071     }
00072 
00073     if (!pErrorBuf) {
00074         errorAccessor.compute(errorDesc);
00075         uint cbMax = errorAccessor.getMaxByteCount();
00076         pErrorBuf.reset(new FixedBuffer[cbMax]);
00077     }
00078 
00079     uint cbTuple = errorAccessor.getByteCount(errorTuple);
00080     errorAccessor.marshal(errorTuple, pErrorBuf.get());
00081     postError(level, message, pErrorBuf.get(), cbTuple, index);
00082 }
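A simplified sketch of the tuple overload of postError. The error buffer is allocated once, sized for the largest possible tuple, and each error tuple is marshaled into it before being forwarded to the raw-buffer overload. In this sketch, a hypothetical Descriptor lists per-column maximum widths, and marshaling is plain concatenation with truncation. Fennel's TupleAccessor encoding is different.

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// Hypothetical simplified types: per-column max widths, and string-valued columns.
using Descriptor = std::vector<std::size_t>;
using Tuple = std::vector<std::string>;

class ErrorPoster {
public:
    bool hasTarget = true;  // stand-in for ErrorSource::hasTarget()
    std::string lastMessage;
    std::string lastPayload;
    int allocations = 0;

    // Stand-in for the raw-buffer postError overload: records what was posted.
    void postRaw(std::string const &message, void const *address, long capacity)
    {
        lastMessage = message;
        lastPayload.assign(static_cast<char const *>(address),
                           static_cast<std::size_t>(capacity));
    }

    void postTuple(std::string const &message, Descriptor const &desc, Tuple const &tuple)
    {
        if (!hasTarget) {
            return;
        }
        assert(tuple.size() == desc.size());
        if (!pErrorBuf) {
            // Allocate once, sized for the largest possible tuple.
            std::size_t cbMax = 0;
            for (std::size_t w : desc) {
                cbMax += w;
            }
            pErrorBuf.reset(new char[cbMax]);
            ++allocations;
        }
        // "Marshal" by concatenating values, each truncated to its column width.
        std::size_t cb = 0;
        for (std::size_t i = 0; i < tuple.size(); ++i) {
            std::size_t n = std::min(tuple[i].size(), desc[i]);
            std::memcpy(pErrorBuf.get() + cb, tuple[i].data(), n);
            cb += n;
        }
        postRaw(message, pErrorBuf.get(), static_cast<long>(cb));
    }

private:
    std::unique_ptr<char[]> pErrorBuf;
};
```

As in the listing, the buffer size is computed from the descriptor on the first call only, so the sketch assumes that every later call uses the same descriptor.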

bool ErrorSource::hasTarget (  )  const [inline, inherited]

Returns:
true iff an error target has been set

Definition at line 112 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00113     {
00114         return pErrorTarget.get() ? true : false;
00115     }

ErrorTarget& ErrorSource::getErrorTarget (  )  const [inline, inherited]

Returns:
the ErrorTarget for this source

Definition at line 120 of file ErrorSource.h.

Referenced by ErrorSource::postError().

00121     {
00122         assert(hasTarget());
00123         return *(pErrorTarget.get());
00124     }

SharedErrorTarget ErrorSource::getSharedErrorTarget (  )  const [inline, inherited]

Returns:
the SharedErrorTarget for this source

Definition at line 129 of file ErrorSource.h.

00130     {
00131         return pErrorTarget;
00132     }

std::string ErrorSource::getErrorSourceName (  )  const [inline, inherited]

Gets the name of this source.

Useful to construct nested names for subcomponents that are also ErrorSources.

Returns:
the name

Definition at line 139 of file ErrorSource.h.

00140     {
00141         return name;
00142     }

void ErrorSource::setErrorSourceName ( std::string const &  n  )  [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 148 of file ErrorSource.h.

00149     {
00150         name = n;
00151     }

void ErrorSource::disableTarget (  )  [inherited]

Definition at line 84 of file ErrorSource.cpp.

References ErrorSource::pErrorTarget.

00085 {
00086     pErrorTarget.reset();
00087 }


Member Data Documentation

const uint FlatFileExecStreamImpl::MAX_ROW_ERROR_TEXT_WIDTH [static, private]

Definition at line 58 of file FlatFileExecStreamImpl.h.

Referenced by logError().

std::string FlatFileExecStreamImpl::dataFilePath [private]

Definition at line 61 of file FlatFileExecStreamImpl.h.

Referenced by checkRowDelimiter(), describeStream(), open(), and prepare().

bool FlatFileExecStreamImpl::header [private]

Definition at line 62 of file FlatFileExecStreamImpl.h.

Referenced by open(), and prepare().

bool FlatFileExecStreamImpl::lenient [private]

Definition at line 63 of file FlatFileExecStreamImpl.h.

Referenced by handleTuple(), open(), and prepare().

bool FlatFileExecStreamImpl::trim [private]

Definition at line 64 of file FlatFileExecStreamImpl.h.

Referenced by handleTuple(), and prepare().

bool FlatFileExecStreamImpl::mapped [private]

Definition at line 65 of file FlatFileExecStreamImpl.h.

Referenced by open(), and prepare().

std::vector<std::string> FlatFileExecStreamImpl::columnNames [private]

Definition at line 66 of file FlatFileExecStreamImpl.h.

Referenced by findField(), and prepare().

FlatFileRowDescriptor FlatFileExecStreamImpl::rowDesc [private]

Definition at line 68 of file FlatFileExecStreamImpl.h.

Referenced by execute(), open(), prepare(), and readTupleDescriptor().

SharedFlatFileBuffer FlatFileExecStreamImpl::pBuffer [private]

Definition at line 69 of file FlatFileExecStreamImpl.h.

Referenced by checkRowDelimiter(), execute(), open(), prepare(), and releaseResources().

PBuffer FlatFileExecStreamImpl::pBufferStorage [private]

Definition at line 70 of file FlatFileExecStreamImpl.h.

Referenced by open().

char* FlatFileExecStreamImpl::next [private]

Definition at line 71 of file FlatFileExecStreamImpl.h.

Referenced by open().

SharedFlatFileParser FlatFileExecStreamImpl::pParser [private]

Definition at line 72 of file FlatFileExecStreamImpl.h.

Referenced by execute(), handleTuple(), open(), and prepare().

FlatFileRowParseResult FlatFileExecStreamImpl::lastResult [private]

Definition at line 73 of file FlatFileExecStreamImpl.h.

Referenced by checkRowDelimiter(), execute(), and open().

TupleDescriptor FlatFileExecStreamImpl::textDesc [private]

Definition at line 74 of file FlatFileExecStreamImpl.h.

Referenced by handleTuple(), and prepare().

TupleData FlatFileExecStreamImpl::textTuple [private]

Definition at line 75 of file FlatFileExecStreamImpl.h.

TupleData FlatFileExecStreamImpl::dataTuple [private]

Definition at line 75 of file FlatFileExecStreamImpl.h.

Referenced by execute(), and prepare().

bool FlatFileExecStreamImpl::isRowPending [private]

Definition at line 76 of file FlatFileExecStreamImpl.h.

Referenced by describeStream(), execute(), handleTuple(), and open().

FlatFileMode FlatFileExecStreamImpl::mode [private]

Definition at line 79 of file FlatFileExecStreamImpl.h.

Referenced by execute(), handleTuple(), prepare(), and readTupleDescriptor().

int FlatFileExecStreamImpl::numRowsScan [private]

Definition at line 80 of file FlatFileExecStreamImpl.h.

Referenced by execute(), and prepare().

bool FlatFileExecStreamImpl::done [private]

Definition at line 81 of file FlatFileExecStreamImpl.h.

Referenced by execute(), and open().

VectorOfUint FlatFileExecStreamImpl::fieldSizes [private]

Definition at line 82 of file FlatFileExecStreamImpl.h.

Referenced by describeStream(), and handleTuple().

std::string FlatFileExecStreamImpl::describeResult [private]

Definition at line 83 of file FlatFileExecStreamImpl.h.

Referenced by describeStream().

SegPageLock FlatFileExecStreamImpl::bufferLock [private]

Definition at line 85 of file FlatFileExecStreamImpl.h.

Referenced by open(), and prepare().

SegmentAccessor FlatFileExecStreamImpl::scratchAccessor [private]

Definition at line 86 of file FlatFileExecStreamImpl.h.

Referenced by prepare().

uint FlatFileExecStreamImpl::nRowsOutput [private]

Definition at line 89 of file FlatFileExecStreamImpl.h.

Referenced by execute(), handleTuple(), and open().

uint FlatFileExecStreamImpl::nRowErrors [private]

Definition at line 89 of file FlatFileExecStreamImpl.h.

Referenced by execute(), and open().

std::string FlatFileExecStreamImpl::reason [private]

Definition at line 90 of file FlatFileExecStreamImpl.h.

Referenced by logError(), and open().

TupleDescriptor FlatFileExecStreamImpl::errorDesc [private]

Definition at line 91 of file FlatFileExecStreamImpl.h.

Referenced by logError().

TupleData FlatFileExecStreamImpl::errorTuple [private]

Definition at line 92 of file FlatFileExecStreamImpl.h.

Referenced by logError().

SharedExecStreamBufAccessor SingleOutputExecStream::pOutAccessor [protected, inherited]

Definition at line 56 of file SingleOutputExecStream.h.

Referenced by LcsClusterAppendExecStream::compress(), ExternalSortExecStreamImpl::execute(), LcsRowScanExecStream::execute(), LbmUnionExecStream::execute(), LbmNormalizerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), FtrsTableWriterExecStream::execute(), BTreeSortExecStream::execute(), BTreeSearchUniqueExecStream::execute(), BTreeScanExecStream::execute(), BTreePrefetchSearchExecStream::execute(), BTreeInsertExecStream::execute(), execute(), ValuesExecStream::execute(), UncollectExecStream::execute(), SortedAggExecStream::execute(), SegBufferReaderExecStream::execute(), ScratchBufferExecStream::execute(), ReshapeExecStream::execute(), MockResourceExecStream::execute(), MockProducerExecStream::execute(), MergeExecStream::execute(), DoubleBufferExecStream::execute(), CorrelationJoinExecStream::execute(), CopyExecStream::execute(), CollectExecStream::execute(), CartesianJoinExecStream::execute(), BernoulliSamplingExecStream::execute(), BarrierExecStream::execute(), CalcExecStream::execute(), LbmGeneratorExecStream::flushEntry(), MockProducerExecStream::getProducedRowCount(), ExternalSortExecStreamImpl::getResourceRequirements(), BTreeSearchExecStream::innerFetchLoop(), LbmUnionExecStream::open(), LbmChopperExecStream::open(), LbmBitOpExecStream::open(), SingleOutputExecStream::open(), SegBufferReaderExecStream::open(), SegBufferExecStream::open(), ScratchBufferExecStream::open(), DoubleBufferExecStream::open(), CollectExecStream::open(), SegBufferExecStream::openBufferForRead(), ConduitExecStream::precheckConduitBuffers(), LcsRowScanExecStream::prepare(), LcsRowScanBaseExecStream::prepare(), LcsClusterAppendExecStream::prepare(), LbmUnionExecStream::prepare(), LbmGeneratorExecStream::prepare(), LhxJoinExecStream::prepare(), LhxAggExecStream::prepare(), FtrsTableWriterExecStream::prepare(), prepare(), UncollectExecStream::prepare(), SortedAggExecStream::prepare(), SingleOutputExecStream::prepare(), ReshapeExecStream::prepare(), MockResourceExecStream::prepare(), MockProducerExecStream::prepare(), MergeExecStream::prepare(), CorrelationJoinExecStream::prepare(), ConduitExecStream::prepare(), CollectExecStream::prepare(), CartesianJoinExecStream::prepare(), BernoulliSamplingExecStream::prepare(), BarrierExecStream::prepare(), CalcExecStream::prepare(), SortedAggExecStream::produce(), LbmBitOpExecStream::producePendingOutput(), LbmUnionExecStream::produceTuple(), LbmNormalizerExecStream::produceTuple(), LbmMinusExecStream::produceTuple(), LbmChopperExecStream::produceTuple(), LbmBitOpExecStream::produceTuple(), LbmBitOpExecStream::readInput(), LbmMinusExecStream::readMinuendInput(), and SingleOutputExecStream::setOutputBufAccessors().

bool ExecStream::isOpen [protected, inherited]

Whether this stream is currently open.

Note that this is not quite the opposite of the inherited ClosableObject::needsClose, since a stream needs to be closed before destruction if it has been prepared but never opened.

Definition at line 61 of file ExecStream.h.

Referenced by ExecStream::closeImpl(), ExecStream::ExecStream(), and ExecStream::open().

ExecStreamGraph* ExecStream::pGraph [protected, inherited]

Dataflow graph containing this stream.

Note that we don't use a weak_ptr for this because it needs to be accessed frequently during execution, and the extra locking overhead would be frivolous.

Definition at line 68 of file ExecStream.h.

Referenced by ExecStream::checkAbort(), ExecStream::ExecStream(), CorrelationJoinExecStream::execute(), ExecStream::getGraph(), JavaTransformExecStream::open(), SingleInputExecStream::open(), MergeExecStream::open(), ExecStream::open(), CorrelationJoinExecStream::open(), ConfluenceExecStream::open(), ExecStream::prepare(), CartesianJoinExecStream::prepare(), and LbmMinusExecStream::restartSubtrahends().

ExecStreamId ExecStream::id [protected, inherited]

Identifier for this stream; local to its containing graph.

Definition at line 73 of file ExecStream.h.

Referenced by ExecStream::getStreamId().

std::string ExecStream::name [protected, inherited]

Name of stream, as known by optimizer.

Reimplemented from TraceSource.

Definition at line 78 of file ExecStream.h.

Referenced by ExecStream::ExecStream(), ExecStream::getName(), open(), and ExecStream::setName().

SharedDynamicParamManager ExecStream::pDynamicParamManager [protected, inherited]

The dynamic parameter manager available to this stream.

(Obtained at prepare() time. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf ExecStreamGraph::mergeFrom())

Definition at line 85 of file ExecStream.h.

Referenced by CorrelationJoinExecStream::close(), SegBufferReaderExecStream::closeImpl(), LbmUnionExecStream::execute(), LbmSplicerExecStream::execute(), LbmGeneratorExecStream::execute(), LbmChopperExecStream::execute(), CorrelationJoinExecStream::execute(), BarrierExecStream::execute(), LcsClusterReplaceExecStream::open(), LbmUnionExecStream::open(), LbmSplicerExecStream::open(), LbmGeneratorExecStream::open(), LbmBitOpExecStream::open(), BTreeSearchExecStream::open(), BTreeInsertExecStream::open(), SegBufferWriterExecStream::open(), SegBufferReaderExecStream::open(), NestedLoopJoinExecStream::open(), CorrelationJoinExecStream::open(), ExecStream::prepare(), CalcExecStream::prepare(), NestedLoopJoinExecStream::processLeftInput(), LbmSearchExecStream::reachedTupleLimit(), ReshapeExecStream::readDynamicParams(), SegBufferWriterExecStream::readReaderRefCount(), BTreeSearchExecStream::readSearchKey(), BTreeSearchExecStream::readUpperBoundKey(), LbmSearchExecStream::setAdditionalKeys(), and LbmBitOpExecStream::writeStartRidParamValue().

SharedLogicalTxn ExecStream::pTxn [protected, inherited]

The transaction enclosing the stream.

Obtained at open() time, but not released at close() time, to allow TableWriters to replay a txn. Keep a shared pointer in case the stream is reassigned to another graph for execution; cf. ExecStreamGraph::mergeFrom().

Definition at line 94 of file ExecStream.h.

Referenced by FtrsTableWriterExecStream::commitSavepoint(), FtrsTableWriterExecStream::createSavepoint(), FtrsTableWriterExecStream::open(), ExecStream::open(), and FtrsTableWriterExecStream::rollbackSavepoint().

ExecStreamResourceQuantity ExecStream::resourceAllocation [protected, inherited]

Resource quantities currently allocated to this stream.

Definition at line 100 of file ExecStream.h.

Referenced by ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pQuotaAccessor [protected, inherited]

CacheAccessor used for quota tracking.

Definition at line 105 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

SharedCacheAccessor ExecStream::pScratchQuotaAccessor [protected, inherited]

CacheAccessor used for scratch page quota tracking.

Definition at line 110 of file ExecStream.h.

Referenced by ExecStream::open(), ExecStream::prepare(), and ExecStream::setResourceAllocation().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:32 2009 for Fennel by doxygen 1.5.1