FlatFileExecStreamImpl.cpp

/*
// $Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $
// Fennel is a library of data storage and processing components.
// Copyright (C) 2005-2009 The Eigenbase Project
// Copyright (C) 2009-2009 SQLstream, Inc.
// Copyright (C) 2004-2009 LucidEra, Inc.
// Portions Copyright (C) 2004-2009 John V. Sichi
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version approved by The Eigenbase Project.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "fennel/common/CommonPreamble.h"
#include "fennel/common/FennelResource.h"
#include "fennel/exec/ExecStreamBufAccessor.h"
#include "fennel/tuple/StoredTypeDescriptor.h"
#include "fennel/tuple/StandardTypeDescriptor.h"

#include "fennel/flatfile/FlatFileExecStreamImpl.h"

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $");

FlatFileExecStream *FlatFileExecStream::newFlatFileExecStream()
{
    return new FlatFileExecStreamImpl();
}

// NOTE: keep this consistent with the Farrago java file
//   com.lucidera.farrago.namespace.flatfile.FlatFileFennelRel.java
const uint FlatFileExecStreamImpl::MAX_ROW_ERROR_TEXT_WIDTH = 4000;

void FlatFileExecStreamImpl::prepare(
    FlatFileExecStreamParams const &params)
{
    SingleOutputExecStream::prepare(params);

    header = params.header;
    dataFilePath = params.dataFilePath;
    lenient = params.lenient;
    trim = params.trim;
    mapped = params.mapped;
    columnNames = params.columnNames;

    dataTuple.compute(pOutAccessor->getTupleDesc());

    scratchAccessor = params.scratchAccessor;
    bufferLock.accessSegment(scratchAccessor);

    mode = params.mode;
    rowDesc = readTupleDescriptor(pOutAccessor->getTupleDesc());
    rowDesc.setLenient(lenient);
    pBuffer.reset(
        new FlatFileBuffer(params.dataFilePath),
        ClosableObjectDestructor());
    pParser.reset(new FlatFileParser(
                      params.fieldDelim, params.rowDelim,
                      params.quoteChar, params.escapeChar,
                      params.trim));

    numRowsScan = params.numRowsScan;
    textDesc = params.outputTupleDesc;
}

void FlatFileExecStreamImpl::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity)
{
    SingleOutputExecStream::getResourceRequirements(minQuantity, optQuantity);
    minQuantity.nCachePages += 2;
    optQuantity = minQuantity;
}

void FlatFileExecStreamImpl::open(bool restart)
{
    if (restart) {
        releaseResources();
    }
    SingleOutputExecStream::open(restart);

    if (!restart) {
        bufferLock.allocatePage();
        uint cbPageSize = bufferLock.getPage().getCache().getPageSize();
        pBufferStorage = bufferLock.getPage().getWritableData();
        pBuffer->setStorage((char*)pBufferStorage, cbPageSize);
    }
    pBuffer->open();
    pBuffer->read();
    next = pBuffer->getReadPtr();
    isRowPending = false;
    nRowsOutput = nRowErrors = 0;
    lastResult.reset();

    if (header) {
        FlatFileRowDescriptor headerDesc;
        for (uint i = 0; i < rowDesc.size(); i++) {
            headerDesc.push_back(
                FlatFileColumnDescriptor(
                    FLAT_FILE_MAX_COLUMN_NAME_LEN));
        }
        headerDesc.setLenient(lenient);
        if (mapped) {
            headerDesc.setUnbounded();
        }
        pParser->scanRow(
            pBuffer->getReadPtr(), pBuffer->getSize(), headerDesc, lastResult);
        pBuffer->setReadPtr(lastResult.next);
        if (lastResult.status != FlatFileRowParseResult::NO_STATUS) {
            logError(lastResult);
            try {
                checkRowDelimiter();
            } catch (FennelExcn &e) {
                reason = e.getMessage();
            }
            throw FennelExcn(
                FennelResource::instance().flatfileNoHeader(
                    dataFilePath, reason));
        }

        // Generate mapping from text file columns to output columns.
        // Match names in the header with output field names. Names in
        // the header are always trimmed.
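        // For example (illustrative, not taken from actual data): a header
        // row "id,amount,name" matched against output columns {NAME, AMOUNT}
        // would yield columnMap = {MAXU, 1, 0}, since "id" has no match,
        // "amount" maps to output column 1, and "name" maps to column 0.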
        if (mapped) {
            if (!lenient) {
                throw FennelExcn(
                    FennelResource::instance()
                    .flatfileMappedRequiresLenient());
            }

            pParser->stripQuoting(lastResult, true);
            uint nFields = lastResult.getReadCount();
            int found = 0;

            VectorOfUint columnMap;
            columnMap.resize(nFields);
            for (uint i = 0; i < nFields; i++) {
                char *n = lastResult.getColumn(i);
                if (n == NULL) {
                    columnMap[i] = MAXU;
                } else {
                    std::string name(
                        n,
                        lastResult.getColumnSize(i));
                    columnMap[i] = findField(name);
                    if (!isMAXU(columnMap[i])) {
                        found++;
                    }
                }
            }
            if (found == 0) {
                throw FennelExcn(
                    FennelResource::instance().flatfileNoMappedColumns(
                        std::string(" "),
                        std::string(" ")));
            }
            rowDesc.setMap(columnMap);
        }
    }

    done = false;
}

ExecStreamResult FlatFileExecStreamImpl::execute(
    ExecStreamQuantum const &quantum)
{
    // detect whether scan was previously finished
    if (done && !isRowPending) {
        pOutAccessor->markEOS();
        return EXECRC_EOS;
    }
    // detect whether output buffer is capable of accepting more data
    if (pOutAccessor->getState() == EXECBUF_OVERFLOW
        || pOutAccessor->getState() == EXECBUF_EOS) {
        return EXECRC_BUF_OVERFLOW;
    }

    // read up to the number of (good or bad) tuples specified by quantum
    for (uint nTuples = 0; nTuples < quantum.nTuplesMax;) {
        // ready the next row for output
        while (!isRowPending) {
            // check quantum, since this loop doesn't break until a good
            // row is read
            if (nTuples >= quantum.nTuplesMax) {
                break;
            }

            if ((numRowsScan > 0 && numRowsScan == nRowsOutput)
                || pBuffer->isDone())
            {
                done = true;
                break;
            }
            pParser->scanRow(
                pBuffer->getReadPtr(), pBuffer->getSize(), rowDesc, lastResult);
            nTuples++;

            switch (lastResult.status) {
            case FlatFileRowParseResult::INCOMPLETE_COLUMN:
                if (pBuffer->isFull()) {
                    lastResult.status = FlatFileRowParseResult::ROW_TOO_LARGE;
                } else if (!pBuffer->isComplete()) {
                    pBuffer->read();
                    continue;
                }
                // fall through: when the buffer is full or the file has been
                // fully read, the incomplete row is treated as a row error
            case FlatFileRowParseResult::NO_COLUMN_DELIM:
            case FlatFileRowParseResult::TOO_FEW_COLUMNS:
            case FlatFileRowParseResult::TOO_MANY_COLUMNS:
                logError(lastResult);
                nRowErrors++;
                pBuffer->setReadPtr(lastResult.next);
                continue;
            case FlatFileRowParseResult::NO_STATUS:
                handleTuple(lastResult, dataTuple);
                pBuffer->setReadPtr(lastResult.next);
                break;
            default:
                permAssert(false);
            }
        }

        // describe produces one row after it's done reading input
        if (mode == FLATFILE_MODE_DESCRIBE && done && !isRowPending) {
            describeStream(dataTuple);
        }

        // try to output pending rows
        if (isRowPending) {
            if (!pOutAccessor->produceTuple(dataTuple)) {
                return EXECRC_BUF_OVERFLOW;
            }
            isRowPending = false;
            nRowsOutput++;
        }

        // close stream if no more rows are available
        if (done) {
            pOutAccessor->markEOS();
            return EXECRC_EOS;
        }
    }
    return EXECRC_QUANTUM_EXPIRED;
}

FlatFileRowDescriptor FlatFileExecStreamImpl::readTupleDescriptor(
    const TupleDescriptor &tupleDesc)
{
    StandardTypeDescriptorFactory typeFactory;
    FlatFileRowDescriptor rowDesc;
    for (uint i = 0; i < tupleDesc.size(); i++) {
        TupleAttributeDescriptor attr = tupleDesc[i];
        StandardTypeDescriptorOrdinal ordinal =
            StandardTypeDescriptorOrdinal(
                attr.pTypeDescriptor->getOrdinal());
        if (StandardTypeDescriptor::isTextArray(ordinal)) {
            rowDesc.push_back(FlatFileColumnDescriptor(attr.cbStorage));
        } else {
            rowDesc.push_back(
                FlatFileColumnDescriptor(FLAT_FILE_MAX_NON_CHAR_VALUE_LEN));
        }
    }
    if (mode == FLATFILE_MODE_DESCRIBE) {
        rowDesc.setUnbounded();
    }
    return rowDesc;
}

uint FlatFileExecStreamImpl::findField(const std::string &name)
{
    for (uint i = 0; i < columnNames.size(); i++) {
        if (strcasecmp(name.c_str(), columnNames[i].c_str()) == 0) {
            return i;
        }
    }
    return MAXU;
}

void FlatFileExecStreamImpl::handleTuple(
    FlatFileRowParseResult &result,
    TupleData &tuple)
{
    TupleData *pTupleData = &tuple;

    // The describe array is initialized here, because describe requires
    // an unbounded scan, and the number of fields is not known until
    // the scan is in progress. Note that we use the first row to
    // determine how many field sizes to return, which is an imperfect guess.
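    // For example (illustrative): scanning the rows "abc,12" and "de,3456"
    // in describe mode would leave fieldSizes = {3, 4}, which
    // describeStream() later renders as the single output value "3 4".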
    if (mode == FLATFILE_MODE_DESCRIBE) {
        if (fieldSizes.size() == 0) {
            fieldSizes.resize(result.getReadCount(), 0);
        }
        // If not lenient, check for rows with wrong number of columns
        if ((!lenient) && fieldSizes.size() != result.getReadCount()) {
            FlatFileRowParseResult detail = result;
            if (detail.getReadCount() > fieldSizes.size()) {
                detail.status = FlatFileRowParseResult::TOO_MANY_COLUMNS;
            } else {
                detail.status = FlatFileRowParseResult::TOO_FEW_COLUMNS;
            }
            logError(detail);
            return;
        }
    }

    // Prepare values for returning
    pParser->stripQuoting(result, trim);
    for (uint i = 0; i < result.getReadCount(); i++) {
        if (mode == FLATFILE_MODE_DESCRIBE) {
            if (i < fieldSizes.size()) {
                fieldSizes[i] = max(fieldSizes[i], result.getColumnSize(i));
            }
            continue;
        }
        (*pTupleData)[i].pData = (PConstBuffer) result.getColumn(i);
        // quietly truncate long columns
        (*pTupleData)[i].cbData =
            std::min(result.getColumnSize(i), textDesc[i].cbStorage);
    }

    if (mode != FLATFILE_MODE_DESCRIBE) {
        isRowPending = true;
    } else {
        // in describe mode, use this to count how many rows have been
        // read, though this is an abuse of the variable's intended purpose
        nRowsOutput++;
    }
}

void FlatFileExecStreamImpl::describeStream(TupleData &tupleData)
{
    if (fieldSizes.size() == 0) {
        throw FennelExcn(
            FennelResource::instance().flatfileDescribeFailed(dataFilePath));
    }

    std::ostringstream oss;
    for (uint i = 0; i < fieldSizes.size(); i++) {
        oss << fieldSizes[i];
        if (i != fieldSizes.size() - 1) {
            oss << " ";
        }
    }
    // NOTE: the result string is stored as a member of the stream so that
    // its character data remains valid after this function returns
    describeResult = oss.str();
    const char *value = describeResult.c_str();
    uint cbValue = describeResult.size() * sizeof(char);

    assert(tupleData.size() == 1);
    tupleData[0].pData = (PConstBuffer) value;
    tupleData[0].cbData = cbValue;
    isRowPending = true;
}

void FlatFileExecStreamImpl::logError(const FlatFileRowParseResult &result)
{
    switch (result.status) {
    case FlatFileRowParseResult::INCOMPLETE_COLUMN:
        reason = FennelResource::instance().incompleteColumn();
        break;
    case FlatFileRowParseResult::ROW_TOO_LARGE:
        reason = FennelResource::instance().rowTextTooLong();
        break;
    case FlatFileRowParseResult::NO_COLUMN_DELIM:
        reason = FennelResource::instance().noColumnDelimiter();
        break;
    case FlatFileRowParseResult::TOO_FEW_COLUMNS:
        reason = FennelResource::instance().tooFewColumns();
        break;
    case FlatFileRowParseResult::TOO_MANY_COLUMNS:
        reason = FennelResource::instance().tooManyColumns();
        break;
    default:
        permAssert(false);
    }
    logError(reason, result);
}

void FlatFileExecStreamImpl::logError(
    const std::string reason,
    const FlatFileRowParseResult &result)
{
    this->reason = reason;

    // initialize logging objects
    if (errorDesc.size() == 0) {
        // TODO: get project specific type factory
        StandardTypeDescriptorFactory typeFactory;
        StoredTypeDescriptor const &typeDesc =
            typeFactory.newDataType(STANDARD_TYPE_VARCHAR);
        bool nullable = true;

        errorDesc.push_back(
            TupleAttributeDescriptor(
                typeDesc,
                nullable,
                MAX_ROW_ERROR_TEXT_WIDTH));

        errorTuple.compute(errorDesc);
    }

    uint length = result.next - result.current;
    length = std::min(length, MAX_ROW_ERROR_TEXT_WIDTH);
    errorTuple[0].pData = (PConstBuffer) result.current;
    errorTuple[0].cbData = length;

    postError(ROW_ERROR, reason, errorDesc, errorTuple, -1);
}

void FlatFileExecStreamImpl::checkRowDelimiter()
{
    if (pBuffer->isDone() && lastResult.nRowDelimsRead == 0) {
        throw FennelExcn(
            FennelResource::instance().noRowDelimiter(dataFilePath));
    }
}

void FlatFileExecStreamImpl::closeImpl()
{
    releaseResources();
    SingleOutputExecStream::closeImpl();
}

void FlatFileExecStreamImpl::releaseResources()
{
    if (pBuffer) {
        pBuffer->close();
    }
}

FENNEL_END_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $");

// End FlatFileExecStreamImpl.cpp
