00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/common/FennelResource.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/tuple/StoredTypeDescriptor.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029
00030 #include "fennel/flatfile/FlatFileExecStreamImpl.h"
00031
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $");
00033
00034 FlatFileExecStream *FlatFileExecStream::newFlatFileExecStream()
00035 {
00036 return new FlatFileExecStreamImpl();
00037 }
00038
00039
00040
00041 const uint FlatFileExecStreamImpl::MAX_ROW_ERROR_TEXT_WIDTH = 4000;
00042
00043 void FlatFileExecStreamImpl::prepare(
00044 FlatFileExecStreamParams const ¶ms)
00045 {
00046 SingleOutputExecStream::prepare(params);
00047
00048 header = params.header;
00049 dataFilePath = params.dataFilePath;
00050 lenient = params.lenient;
00051 trim = params.trim;
00052 mapped = params.mapped;
00053 columnNames = params.columnNames;
00054
00055 dataTuple.compute(pOutAccessor->getTupleDesc());
00056
00057 scratchAccessor = params.scratchAccessor;
00058 bufferLock.accessSegment(scratchAccessor);
00059
00060 mode = params.mode;
00061 rowDesc = readTupleDescriptor(pOutAccessor->getTupleDesc());
00062 rowDesc.setLenient(lenient);
00063 pBuffer.reset(
00064 new FlatFileBuffer(params.dataFilePath),
00065 ClosableObjectDestructor());
00066 pParser.reset(new FlatFileParser(
00067 params.fieldDelim, params.rowDelim,
00068 params.quoteChar, params.escapeChar,
00069 params.trim));
00070
00071 numRowsScan = params.numRowsScan;
00072 textDesc = params.outputTupleDesc;
00073 }
00074
00075 void FlatFileExecStreamImpl::getResourceRequirements(
00076 ExecStreamResourceQuantity &minQuantity,
00077 ExecStreamResourceQuantity &optQuantity)
00078 {
00079 SingleOutputExecStream::getResourceRequirements(minQuantity,optQuantity);
00080 minQuantity.nCachePages += 2;
00081 optQuantity = minQuantity;
00082 }
00083
00084 void FlatFileExecStreamImpl::open(bool restart)
00085 {
00086 if (restart) {
00087 releaseResources();
00088 }
00089 SingleOutputExecStream::open(restart);
00090
00091 if (!restart) {
00092 bufferLock.allocatePage();
00093 uint cbPageSize = bufferLock.getPage().getCache().getPageSize();
00094 pBufferStorage = bufferLock.getPage().getWritableData();
00095 pBuffer->setStorage((char*)pBufferStorage, cbPageSize);
00096 }
00097 pBuffer->open();
00098 pBuffer->read();
00099 next = pBuffer->getReadPtr();
00100 isRowPending = false;
00101 nRowsOutput = nRowErrors = 0;
00102 lastResult.reset();
00103
00104 if (header) {
00105 FlatFileRowDescriptor headerDesc;
00106 for (uint i = 0; i < rowDesc.size(); i++) {
00107 headerDesc.push_back(
00108 FlatFileColumnDescriptor(
00109 FLAT_FILE_MAX_COLUMN_NAME_LEN));
00110 }
00111 headerDesc.setLenient(lenient);
00112 if (mapped) {
00113 headerDesc.setUnbounded();
00114 }
00115 pParser->scanRow(
00116 pBuffer->getReadPtr(), pBuffer->getSize(), headerDesc, lastResult);
00117 pBuffer->setReadPtr(lastResult.next);
00118 if (lastResult.status != FlatFileRowParseResult::NO_STATUS) {
00119 logError(lastResult);
00120 try {
00121 checkRowDelimiter();
00122 } catch (FennelExcn e) {
00123 reason = e.getMessage();
00124 }
00125 throw FennelExcn(
00126 FennelResource::instance().flatfileNoHeader(
00127 dataFilePath, reason));
00128 }
00129
00130
00131
00132
00133 if (mapped) {
00134 if (! lenient) {
00135 throw FennelExcn(
00136 FennelResource::instance()
00137 .flatfileMappedRequiresLenient());
00138 }
00139
00140 pParser->stripQuoting(lastResult, true);
00141 uint nFields = lastResult.getReadCount();
00142 int found = 0;
00143
00144 VectorOfUint columnMap;
00145 columnMap.resize(nFields);
00146 for (uint i = 0; i < nFields; i++) {
00147 char *n = lastResult.getColumn(i);
00148 if (n == NULL) {
00149 columnMap[i] = MAXU;
00150 } else {
00151 std::string name(
00152 n,
00153 lastResult.getColumnSize(i));
00154 columnMap[i] = findField(name);
00155 if (!isMAXU(columnMap[i])) {
00156 found++;
00157 }
00158 }
00159 }
00160 if (found == 0) {
00161 throw FennelExcn(
00162 FennelResource::instance().flatfileNoMappedColumns(
00163 std::string(" "),
00164 std::string(" ")));
00165 }
00166 rowDesc.setMap(columnMap);
00167 }
00168 }
00169
00170 done = false;
00171 }
00172
00173 ExecStreamResult FlatFileExecStreamImpl::execute(
00174 ExecStreamQuantum const &quantum)
00175 {
00176
00177 if (done && !isRowPending) {
00178 pOutAccessor->markEOS();
00179 return EXECRC_EOS;
00180 }
00181
00182 if (pOutAccessor->getState() == EXECBUF_OVERFLOW
00183 || pOutAccessor->getState() == EXECBUF_EOS) {
00184 return EXECRC_BUF_OVERFLOW;
00185 }
00186
00187
00188 for (uint nTuples = 0; nTuples < quantum.nTuplesMax;) {
00189
00190 while (!isRowPending) {
00191
00192
00193 if (nTuples >= quantum.nTuplesMax) {
00194 break;
00195 }
00196
00197 if ((numRowsScan > 0 && numRowsScan == nRowsOutput)
00198 || pBuffer->isDone())
00199 {
00200 done = true;
00201 break;
00202 }
00203 pParser->scanRow(
00204 pBuffer->getReadPtr(),pBuffer->getSize(),rowDesc,lastResult);
00205 nTuples++;
00206
00207 switch (lastResult.status) {
00208 case FlatFileRowParseResult::INCOMPLETE_COLUMN:
00209 if (pBuffer->isFull()) {
00210 lastResult.status = FlatFileRowParseResult::ROW_TOO_LARGE;
00211 } else if (!pBuffer->isComplete()) {
00212 pBuffer->read();
00213 continue;
00214 }
00215 case FlatFileRowParseResult::NO_COLUMN_DELIM:
00216 case FlatFileRowParseResult::TOO_FEW_COLUMNS:
00217 case FlatFileRowParseResult::TOO_MANY_COLUMNS:
00218 logError(lastResult);
00219 nRowErrors++;
00220 pBuffer->setReadPtr(lastResult.next);
00221 continue;
00222 case FlatFileRowParseResult::NO_STATUS:
00223 handleTuple(lastResult, dataTuple);
00224 pBuffer->setReadPtr(lastResult.next);
00225 break;
00226 default:
00227 permAssert(false);
00228 }
00229 }
00230
00231
00232 if (mode == FLATFILE_MODE_DESCRIBE && done && !isRowPending) {
00233 describeStream(dataTuple);
00234 }
00235
00236
00237 if (isRowPending) {
00238 if (!pOutAccessor->produceTuple(dataTuple)) {
00239 return EXECRC_BUF_OVERFLOW;
00240 }
00241 isRowPending = false;
00242 nRowsOutput++;
00243 }
00244
00245
00246 if (done) {
00247 pOutAccessor->markEOS();
00248 return EXECRC_EOS;
00249 }
00250 }
00251 return EXECRC_QUANTUM_EXPIRED;
00252 }
00253
00254 FlatFileRowDescriptor FlatFileExecStreamImpl::readTupleDescriptor(
00255 const TupleDescriptor &tupleDesc)
00256 {
00257 StandardTypeDescriptorFactory typeFactory;
00258 FlatFileRowDescriptor rowDesc;
00259 for (uint i = 0; i < tupleDesc.size(); i++) {
00260 TupleAttributeDescriptor attr = tupleDesc[i];
00261 StandardTypeDescriptorOrdinal ordinal =
00262 StandardTypeDescriptorOrdinal(
00263 attr.pTypeDescriptor->getOrdinal());
00264 if (StandardTypeDescriptor::isTextArray(ordinal)) {
00265 rowDesc.push_back(FlatFileColumnDescriptor(attr.cbStorage));
00266 } else {
00267 rowDesc.push_back(
00268 FlatFileColumnDescriptor(FLAT_FILE_MAX_NON_CHAR_VALUE_LEN));
00269 }
00270 }
00271 if (mode == FLATFILE_MODE_DESCRIBE) {
00272 rowDesc.setUnbounded();
00273 }
00274 return rowDesc;
00275 }
00276
00277 uint FlatFileExecStreamImpl::findField(const std::string &name)
00278 {
00279 for (uint i = 0; i < columnNames.size(); i++) {
00280 if (strcasecmp(name.c_str(), columnNames[i].c_str()) == 0) {
00281 return i;
00282 }
00283 }
00284 return MAXU;
00285 }
00286
00287 void FlatFileExecStreamImpl::handleTuple(
00288 FlatFileRowParseResult &result,
00289 TupleData &tuple)
00290 {
00291 TupleData *pTupleData = &tuple;
00292
00293
00294
00295
00296
00297 if (mode == FLATFILE_MODE_DESCRIBE) {
00298 if (fieldSizes.size() == 0) {
00299 fieldSizes.resize(result.getReadCount(), 0);
00300 }
00301
00302 if ((!lenient) && fieldSizes.size() != result.getReadCount()) {
00303 FlatFileRowParseResult detail = result;
00304 if (detail.getReadCount() > fieldSizes.size()) {
00305 detail.status = FlatFileRowParseResult::TOO_MANY_COLUMNS;
00306 } else {
00307 detail.status = FlatFileRowParseResult::TOO_FEW_COLUMNS;
00308 }
00309 logError(detail);
00310 return;
00311 }
00312 }
00313
00314
00315 pParser->stripQuoting(result, trim);
00316 for (uint i = 0; i < result.getReadCount(); i++) {
00317 if (mode == FLATFILE_MODE_DESCRIBE) {
00318 if (i < fieldSizes.size()) {
00319 fieldSizes[i] = max(fieldSizes[i], result.getColumnSize(i));
00320 }
00321 continue;
00322 }
00323 (*pTupleData)[i].pData = (PConstBuffer) result.getColumn(i);
00324
00325 (*pTupleData)[i].cbData =
00326 std::min(result.getColumnSize(i), textDesc[i].cbStorage);
00327 }
00328
00329 if (mode != FLATFILE_MODE_DESCRIBE) {
00330 isRowPending = true;
00331 } else {
00332
00333
00334 nRowsOutput++;
00335 }
00336 }
00337
00338 void FlatFileExecStreamImpl::describeStream(TupleData &tupleData)
00339 {
00340 if (fieldSizes.size() == 0) {
00341 throw FennelExcn(
00342 FennelResource::instance().flatfileDescribeFailed(dataFilePath));
00343 }
00344
00345 std::ostringstream oss;
00346 for (int i = 0; i < fieldSizes.size(); i++) {
00347 oss << fieldSizes[i];
00348 if (i != fieldSizes.size() - 1) {
00349 oss << " ";
00350 }
00351 }
00352
00353
00354 describeResult = oss.str();
00355 const char *value = describeResult.c_str();
00356 uint cbValue = describeResult.size() * sizeof(char);
00357
00358 assert(tupleData.size() == 1);
00359 tupleData[0].pData = (PConstBuffer) value;
00360 tupleData[0].cbData = cbValue;
00361 isRowPending = true;
00362 }
00363
00364 void FlatFileExecStreamImpl::logError(const FlatFileRowParseResult &result)
00365 {
00366 switch (result.status) {
00367 case FlatFileRowParseResult::INCOMPLETE_COLUMN:
00368 reason = FennelResource::instance().incompleteColumn();
00369 break;
00370 case FlatFileRowParseResult::ROW_TOO_LARGE:
00371 reason = FennelResource::instance().rowTextTooLong();
00372 break;
00373 case FlatFileRowParseResult::NO_COLUMN_DELIM:
00374 reason = FennelResource::instance().noColumnDelimiter();
00375 break;
00376 case FlatFileRowParseResult::TOO_FEW_COLUMNS:
00377 reason = FennelResource::instance().tooFewColumns();
00378 break;
00379 case FlatFileRowParseResult::TOO_MANY_COLUMNS:
00380 reason = FennelResource::instance().tooManyColumns();
00381 break;
00382 default:
00383 permAssert(false);
00384 }
00385 logError(reason, result);
00386 }
00387
00388 void FlatFileExecStreamImpl::logError(
00389 const std::string reason,
00390 const FlatFileRowParseResult &result)
00391 {
00392 this->reason = reason;
00393
00394
00395 if (errorDesc.size() == 0) {
00396
00397 StandardTypeDescriptorFactory typeFactory;
00398 StoredTypeDescriptor const &typeDesc =
00399 typeFactory.newDataType(STANDARD_TYPE_VARCHAR);
00400 bool nullable = true;
00401
00402 errorDesc.push_back(
00403 TupleAttributeDescriptor(
00404 typeDesc,
00405 nullable,
00406 MAX_ROW_ERROR_TEXT_WIDTH));
00407
00408 errorTuple.compute(errorDesc);
00409 }
00410
00411 uint length = result.next - result.current;
00412 length = std::min(length, MAX_ROW_ERROR_TEXT_WIDTH);
00413 errorTuple[0].pData = (PConstBuffer) result.current;
00414 errorTuple[0].cbData = length;
00415
00416 postError(ROW_ERROR, reason, errorDesc, errorTuple, -1);
00417 }
00418
00419 void FlatFileExecStreamImpl::checkRowDelimiter()
00420 {
00421 if (pBuffer->isDone() && lastResult.nRowDelimsRead == 0) {
00422 throw FennelExcn(
00423 FennelResource::instance().noRowDelimiter(dataFilePath));
00424 }
00425 }
00426
00427 void FlatFileExecStreamImpl::closeImpl()
00428 {
00429 releaseResources();
00430 SingleOutputExecStream::closeImpl();
00431 }
00432
00433 void FlatFileExecStreamImpl::releaseResources()
00434 {
00435 if (pBuffer) {
00436 pBuffer->close();
00437 }
00438 }
00439
00440 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $");
00441
00442