ExecStreamFactory.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2003-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/ExecStreamFactory.h"
00026 #include "fennel/farrago/JavaSinkExecStream.h"
00027 #include "fennel/farrago/JavaTransformExecStream.h"
00028 #include "fennel/farrago/CmdInterpreter.h"
00029 #include "fennel/ftrs/BTreePrefetchSearchExecStream.h"
00030 #include "fennel/ftrs/BTreeScanExecStream.h"
00031 #include "fennel/ftrs/BTreeSearchExecStream.h"
00032 #include "fennel/ftrs/BTreeSearchUniqueExecStream.h"
00033 #include "fennel/ftrs/FtrsTableWriterExecStream.h"
00034 #include "fennel/ftrs/BTreeSortExecStream.h"
00035 #include "fennel/exec/MergeExecStream.h"
00036 #include "fennel/exec/SegBufferExecStream.h"
00037 #include "fennel/exec/SegBufferReaderExecStream.h"
00038 #include "fennel/exec/SegBufferWriterExecStream.h"
00039 #include "fennel/exec/SplitterExecStream.h"
00040 #include "fennel/exec/BarrierExecStream.h"
00041 #include "fennel/exec/ValuesExecStream.h"
00042 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00043 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00044 #include "fennel/exec/CartesianJoinExecStream.h"
00045 #include "fennel/exec/SortedAggExecStream.h"
00046 #include "fennel/exec/MockProducerExecStream.h"
00047 #include "fennel/exec/ReshapeExecStream.h"
00048 #include "fennel/exec/NestedLoopJoinExecStream.h"
00049 #include "fennel/exec/BernoulliSamplingExecStream.h"
00050 #include "fennel/calculator/CalcExecStream.h"
00051 #include "fennel/exec/CollectExecStream.h"
00052 #include "fennel/exec/UncollectExecStream.h"
00053 #include "fennel/exec/CorrelationJoinExecStream.h"
00054 #include "fennel/db/Database.h"
00055 #include "fennel/db/CheckpointThread.h"
00056 #include "fennel/tuple/TupleDescriptor.h"
00057 #include "fennel/tuple/TupleAccessor.h"
00058 #include "fennel/cache/QuotaCacheAccessor.h"
00059 #include "fennel/segment/SegmentFactory.h"
00060 #include "fennel/sorter/ExternalSortExecStream.h"
00061 #include "fennel/flatfile/FlatFileExecStream.h"
00062 #include "fennel/hashexe/LhxJoinExecStream.h"
00063 #include "fennel/hashexe/LhxAggExecStream.h"
00064 
00065 FENNEL_BEGIN_CPPFILE(
00066         "$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $");
00067 
00068 ExecStreamFactory::ExecStreamFactory(
00069     SharedDatabase pDatabaseInit,
00070     SharedFtrsTableWriterFactory pTableWriterFactoryInit,
00071     CmdInterpreter::StreamGraphHandle *pStreamGraphHandleInit)
00072 {
00073     pDatabase = pDatabaseInit;
00074     pTableWriterFactory = pTableWriterFactoryInit;
00075     pStreamGraphHandle = pStreamGraphHandleInit;
00076     pGraphEmbryo = NULL;
00077 }
00078 
00079 SharedDatabase ExecStreamFactory::getDatabase()
00080 {
00081     return pDatabase;
00082 }
00083 
00084 void ExecStreamFactory::setGraphEmbryo(
00085     ExecStreamGraphEmbryo &graphEmbryo)
00086 {
00087     pGraphEmbryo = &graphEmbryo;
00088 }
00089 
00090 void ExecStreamFactory::setScratchAccessor(
00091     SegmentAccessor &scratchAccessorInit)
00092 {
00093     scratchAccessor = scratchAccessorInit;
00094 }
00095 
00096 void ExecStreamFactory::addSubFactory(
00097     SharedExecStreamSubFactory pSubFactory)
00098 {
00099     subFactories.push_back(pSubFactory);
00100 }
00101 
00102 ExecStreamEmbryo const &ExecStreamFactory::visitStream(
00103     ProxyExecutionStreamDef &streamDef)
00104 {
00105     bool created = false;
00106 
00107     // first give sub-factories a shot
00108     std::vector<SharedExecStreamSubFactory>::iterator ppSubFactory;
00109     for (ppSubFactory = subFactories.begin();
00110          ppSubFactory != subFactories.end(); ++ppSubFactory)
00111     {
00112         ExecStreamSubFactory &subFactory = **ppSubFactory;
00113         created = subFactory.createStream(
00114             *this,
00115             streamDef,
00116             embryo);
00117         if (created) {
00118             break;
00119         }
00120     }
00121 
00122     if (!created) {
00123         // dispatch based on polymorphic stream type
00124         invokeVisit(streamDef);
00125     }
00126     embryo.getStream()->setName(streamDef.getName());
00127     return embryo;
00128 }
00129 
00130 void ExecStreamFactory::invokeVisit(
00131     ProxyExecutionStreamDef &streamDef)
00132 {
00133     FemVisitor::visitTbl.accept(*this,streamDef);
00134 }
00135 
00136 // NOTE:  if you are adding a new stream implementation, be careful to follow
00137 // the pattern set by the existing methods:
00138 // (1) declare params on stack
00139 // (2) assign values to params
00140 // (3) call embryo.init(new YourVerySpecialExecStream(), params)
00141 
00142 void ExecStreamFactory::visit(ProxyBarrierStreamDef &streamDef)
00143 {
00144     BarrierExecStreamParams params;
00145     readTupleStreamParams(params, streamDef);
00146     params.returnMode = streamDef.getReturnMode();
00147     readBarrierDynamicParams(params, streamDef);
00148     embryo.init(new BarrierExecStream(), params);
00149 }
00150 
00151 void ExecStreamFactory::readBarrierDynamicParams(
00152     BarrierExecStreamParams &params,
00153     ProxyBarrierStreamDef &streamDef)
00154 {
00155     SharedProxyDynamicParameter dynamicParam = streamDef.getDynamicParameter();
00156     for (; dynamicParam; ++dynamicParam) {
00157         DynamicParamId p = (DynamicParamId) dynamicParam->getParameterId();
00158         params.parameterIds.push_back(p);
00159     }
00160 }
00161 
00162 void ExecStreamFactory::visit(ProxyBufferingTupleStreamDef &streamDef)
00163 {
00164     SegBufferExecStreamParams params;
00165     readTupleStreamParams(params, streamDef);
00166     params.multipass = streamDef.isMultipass();
00167     if (!streamDef.isInMemory()) {
00168         params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00169         params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00170     }
00171     embryo.init(new SegBufferExecStream(), params);
00172 }
00173 
00174 void ExecStreamFactory::visit(ProxyBufferWriterStreamDef &streamDef)
00175 {
00176     SegBufferWriterExecStreamParams params;
00177     readExecStreamParams(params, streamDef);
00178     readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00179     if (!streamDef.isInMemory()) {
00180         params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00181         params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00182     }
00183     assert(streamDef.isMultipass());
00184     params.readerRefCountParamId =
00185         readDynamicParamId(streamDef.getReaderRefCountParamId());
00186     embryo.init(new SegBufferWriterExecStream(), params);
00187 }
00188 
00189 void ExecStreamFactory::visit(ProxyBufferReaderStreamDef &streamDef)
00190 {
00191     SegBufferReaderExecStreamParams params;
00192     readTupleStreamParams(params, streamDef);
00193     if (!streamDef.isInMemory()) {
00194         params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00195         params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00196     }
00197     assert(streamDef.isMultipass());
00198     params.readerRefCountParamId =
00199         readDynamicParamId(streamDef.getReaderRefCountParamId());
00200     embryo.init(new SegBufferReaderExecStream(), params);
00201 }
00202 
00203 void ExecStreamFactory::visit(ProxyCartesianProductStreamDef &streamDef)
00204 {
00205     CartesianJoinExecStreamParams params;
00206     readTupleStreamParams(params, streamDef);
00207     params.leftOuter = streamDef.isLeftOuter();
00208     embryo.init(new CartesianJoinExecStream(), params);
00209 }
00210 
00211 void ExecStreamFactory::visit(ProxyIndexLoaderDef &streamDef)
00212 {
00213     BTreeInsertExecStreamParams params;
00214     readTupleStreamParams(params, streamDef);
00215     readBTreeStreamParams(params, streamDef);
00216     params.distinctness = streamDef.getDistinctness();
00217     params.monotonic = streamDef.isMonotonic();
00218     embryo.init(new BTreeInsertExecStream(), params);
00219 }
00220 
00221 void ExecStreamFactory::visit(ProxyIndexScanDef &streamDef)
00222 {
00223     BTreeScanExecStreamParams params;
00224     readBTreeReadStreamParams(params, streamDef);
00225     embryo.init(
00226         new BTreeScanExecStream(),
00227         params);
00228 }
00229 
00230 void ExecStreamFactory::visit(ProxyIndexSearchDef &streamDef)
00231 {
00232     assert(!(streamDef.isUniqueKey() && streamDef.isPrefetch()));
00233     if (streamDef.isPrefetch()) {
00234         BTreePrefetchSearchExecStreamParams params;
00235         initBTreePrefetchSearchParams(params, streamDef);
00236         embryo.init(
00237             new BTreePrefetchSearchExecStream(),
00238             params);
00239     } else {
00240         BTreeSearchExecStreamParams params;
00241         readBTreeSearchStreamParams(params, streamDef);
00242         embryo.init(
00243             streamDef.isUniqueKey()
00244             ? new BTreeSearchUniqueExecStream() : new BTreeSearchExecStream(),
00245             params);
00246     }
00247 }
00248 
00249 void ExecStreamFactory::initBTreePrefetchSearchParams(
00250     BTreePrefetchSearchExecStreamParams &params,
00251     ProxyIndexSearchDef &streamDef)
00252 {
00253     readBTreeSearchStreamParams(params, streamDef);
00254     // Need a private scratch segment because scratch pages are
00255     // deallocated when the stream is closed.
00256     createPrivateScratchSegment(params);
00257 }
00258 
00259 void ExecStreamFactory::visit(ProxyJavaSinkStreamDef &streamDef)
00260 {
00261     JavaSinkExecStreamParams params;
00262     readExecStreamParams(params, streamDef);
00263     params.pStreamGraphHandle = pStreamGraphHandle;
00264     params.javaFennelPipeTupleIterId = streamDef.getStreamId();
00265     embryo.init(new JavaSinkExecStream(), params);
00266 }
00267 
00268 void ExecStreamFactory::visit(ProxyJavaTransformStreamDef &streamDef)
00269 {
00270     JavaTransformExecStreamParams params;
00271 
00272     readExecStreamParams(params, streamDef);
00273 
00274     readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00275 
00276     params.pStreamGraphHandle = pStreamGraphHandle;
00277     params.javaClassName = streamDef.getJavaClassName();
00278     embryo.init(new JavaTransformExecStream(), params);
00279 }
00280 
00281 void ExecStreamFactory::visit(ProxyMergeStreamDef &streamDef)
00282 {
00283     MergeExecStreamParams params;
00284     readTupleStreamParams(params, streamDef);
00285     if (!streamDef.isSequential()) {
00286         params.isParallel = true;
00287     }
00288     // prePullInputs parameter isn't actually supported yet
00289     assert(!streamDef.isPrePullInputs());
00290     embryo.init(new MergeExecStream(), params);
00291 }
00292 
00293 void ExecStreamFactory::visit(ProxyMockTupleStreamDef &streamDef)
00294 {
00295     MockProducerExecStreamParams params;
00296     readTupleStreamParams(params, streamDef);
00297     params.nRows = streamDef.getRowCount();
00298     embryo.init(new MockProducerExecStream(), params);
00299 }
00300 
00301 void ExecStreamFactory::visit(ProxyTableDeleterDef &streamDef)
00302 {
00303     FtrsTableWriterExecStreamParams params;
00304     params.actionType = FtrsTableWriter::ACTION_DELETE;
00305     readTableWriterStreamParams(params, streamDef);
00306     embryo.init(new FtrsTableWriterExecStream(), params);
00307 }
00308 
00309 void ExecStreamFactory::visit(ProxyTableInserterDef &streamDef)
00310 {
00311     FtrsTableWriterExecStreamParams params;
00312     params.actionType = FtrsTableWriter::ACTION_INSERT;
00313     readTableWriterStreamParams(params, streamDef);
00314     embryo.init(new FtrsTableWriterExecStream(), params);
00315 }
00316 
00317 void ExecStreamFactory::visit(ProxyTableUpdaterDef &streamDef)
00318 {
00319     FtrsTableWriterExecStreamParams params;
00320     params.actionType = FtrsTableWriter::ACTION_UPDATE;
00321     SharedProxyTupleProjection pUpdateProj = streamDef.getUpdateProj();
00322     CmdInterpreter::readTupleProjection(
00323         params.updateProj,
00324         pUpdateProj);
00325     readTableWriterStreamParams(params, streamDef);
00326     embryo.init(new FtrsTableWriterExecStream(), params);
00327 }
00328 
00329 void ExecStreamFactory::visit(ProxySortedAggStreamDef &streamDef)
00330 {
00331     SortedAggExecStreamParams params;
00332     readAggStreamParams(params, streamDef);
00333     embryo.init(new SortedAggExecStream(), params);
00334 }
00335 
00336 void ExecStreamFactory::implementSortWithBTree(ProxySortingStreamDef &streamDef)
00337 {
00338     BTreeSortExecStreamParams params;
00339     readTupleStreamParams(params,streamDef);
00340     params.distinctness = streamDef.getDistinctness();
00341     params.monotonic = false;
00342     params.pSegment = pDatabase->getTempSegment();
00343     params.rootPageId = NULL_PAGE_ID;
00344     params.segmentId = Database::TEMP_SEGMENT_ID;
00345     params.pageOwnerId = ANON_PAGE_OWNER_ID;
00346     params.pRootMap = NULL;
00347     params.rootPageIdParamId = DynamicParamId(0);
00348     CmdInterpreter::readTupleProjection(
00349         params.keyProj,
00350         streamDef.getKeyProj());
00351     // TODO jvs 3-Dec-2006:  pass along streamDef.getDescendingProj() once
00352     // btree can deal with it
00353     params.tupleDesc = params.outputTupleDesc;
00354     embryo.init(new BTreeSortExecStream(), params);
00355 }
00356 
00357 void ExecStreamFactory::visit(ProxySplitterStreamDef &streamDef)
00358 {
00359     SplitterExecStreamParams params;
00360     readExecStreamParams(params, streamDef);
00361     readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00362     embryo.init(new SplitterExecStream(), params);
00363 }
00364 
00365 
00366 void ExecStreamFactory::visit(ProxyValuesStreamDef &streamDef)
00367 {
00368     ValuesExecStreamParams params;
00369     readTupleStreamParams(params, streamDef);
00370 
00371     // Get the Java String object so that we can pass it to the decoder.
00372     jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod(
00373         streamDef.jObject, ProxyValuesStreamDef::meth_getTupleBytesBase64);
00374 
00375     // Call back into Java again to perform the decode.
00376     jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod(
00377         JniUtil::classRhBase64,
00378         JniUtil::methBase64Decode,
00379         tupleBytesBase64);
00380 
00381     // Copy the bytes from Java to our tuple buffer.
00382     params.bufSize = streamDef.pEnv->GetArrayLength(jbytes);
00383     params.pTupleBuffer.reset(new FixedBuffer[params.bufSize]);
00384     streamDef.pEnv->GetByteArrayRegion(
00385         jbytes, 0, params.bufSize,
00386         reinterpret_cast<jbyte *>(params.pTupleBuffer.get()));
00387 
00388     embryo.init(new ValuesExecStream(), params);
00389 }
00390 
00391 void ExecStreamFactory::visit(ProxyReshapeStreamDef &streamDef)
00392 {
00393     ReshapeExecStreamParams params;
00394     readTupleStreamParams(params, streamDef);
00395 
00396     params.compOp = streamDef.getCompareOp();
00397     if (params.compOp != COMP_NOOP) {
00398         // Get the Java String object so that we can pass it to the decoder.
00399         jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod(
00400             streamDef.jObject,
00401             ProxyReshapeStreamDef::meth_getTupleCompareBytesBase64);
00402 
00403         // Call back into Java again to perform the decode.
00404         jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod(
00405             JniUtil::classRhBase64,
00406             JniUtil::methBase64Decode,
00407             tupleBytesBase64);
00408 
00409         // Copy the bytes from Java to our tuple buffer.
00410         int bufSize = streamDef.pEnv->GetArrayLength(jbytes);
00411         params.pCompTupleBuffer.reset(new FixedBuffer[bufSize]);
00412         streamDef.pEnv->GetByteArrayRegion(
00413             jbytes, 0, bufSize,
00414             reinterpret_cast<jbyte *>(params.pCompTupleBuffer.get()));
00415 
00416         CmdInterpreter::readTupleProjection(
00417             params.inputCompareProj, streamDef.getInputCompareProjection());
00418     }
00419 
00420     CmdInterpreter::readTupleProjection(
00421         params.outputProj, streamDef.getOutputProjection());
00422 
00423     SharedProxyReshapeParameter dynamicParam = streamDef.getReshapeParameter();
00424     for (; dynamicParam; ++dynamicParam) {
00425         int offset = dynamicParam->getCompareOffset();
00426         ReshapeParameter reshapeParam(
00427             DynamicParamId(dynamicParam->getDynamicParamId()),
00428             (offset < 0) ? MAXU : uint(offset),
00429             dynamicParam->isOutputParam());
00430         params.dynamicParameters.push_back(reshapeParam);
00431     }
00432 
00433     embryo.init(new ReshapeExecStream(), params);
00434 }
00435 
00436 void ExecStreamFactory::visit(ProxyNestedLoopJoinStreamDef &streamDef)
00437 {
00438     NestedLoopJoinExecStreamParams params;
00439     readTupleStreamParams(params, streamDef);
00440     params.leftOuter = streamDef.isLeftOuter();
00441 
00442     SharedProxyCorrelation dynamicParam = streamDef.getLeftJoinKey();
00443     for (; dynamicParam; ++dynamicParam) {
00444         NestedLoopJoinKey joinKey(
00445             DynamicParamId(dynamicParam->getId()),
00446             dynamicParam->getOffset());
00447         params.leftJoinKeys.push_back(joinKey);
00448     }
00449 
00450     embryo.init(new NestedLoopJoinExecStream(), params);
00451 }
00452 
00453 void ExecStreamFactory::visit(ProxyBernoulliSamplingStreamDef &streamDef)
00454 {
00455     BernoulliSamplingExecStreamParams params;
00456     readTupleStreamParams(params, streamDef);
00457 
00458     params.samplingRate = streamDef.getSamplingRate();
00459     params.isRepeatable = streamDef.isRepeatable();
00460     params.repeatableSeed = streamDef.getRepeatableSeed();
00461 
00462     embryo.init(new BernoulliSamplingExecStream(), params);
00463 }
00464 
00465 void ExecStreamFactory::visit(ProxyCalcTupleStreamDef &streamDef)
00466 {
00467     CalcExecStreamParams params;
00468     readTupleStreamParams(params, streamDef);
00469     params.program = streamDef.getProgram();
00470     params.isFilter = streamDef.isFilter();
00471     embryo.init(
00472         new CalcExecStream(),
00473         params);
00474 }
00475 
00476 void ExecStreamFactory::visit(ProxyCorrelationJoinStreamDef &streamDef)
00477 {
00478     CorrelationJoinExecStreamParams params;
00479     readTupleStreamParams(params, streamDef);
00480     SharedProxyCorrelation pCorrelation = streamDef.getCorrelations();
00481     for (; pCorrelation; ++pCorrelation) {
00482         Correlation correlation(
00483             DynamicParamId(pCorrelation->getId()),
00484             pCorrelation->getOffset());
00485         params.correlations.push_back(correlation);
00486     }
00487     embryo.init(new CorrelationJoinExecStream(), params);
00488 }
00489 
00490 void ExecStreamFactory::visit(ProxyCollectTupleStreamDef &streamDef)
00491 {
00492     CollectExecStreamParams params;
00493     readTupleStreamParams(params, streamDef);
00494     embryo.init(new CollectExecStream(), params);
00495 }
00496 
00497 void ExecStreamFactory::visit(ProxyUncollectTupleStreamDef &streamDef)
00498 {
00499     UncollectExecStreamParams params;
00500     readTupleStreamParams(params, streamDef);
00501     embryo.init(new UncollectExecStream(), params);
00502 }
00503 
00504 void ExecStreamFactory::visit(ProxySortingStreamDef &streamDef)
00505 {
00506     if (streamDef.getDistinctness() != DUP_ALLOW) {
00507         // can't handle it; fall back to BTree-based sort
00508         implementSortWithBTree(streamDef);
00509         return;
00510     }
00511 
00512     SharedDatabase pDatabase = getDatabase();
00513 
00514     ExternalSortExecStreamParams params;
00515 
00516     readTupleStreamParams(params, streamDef);
00517 
00518     // ExternalSortStream requires a private ScratchSegment.
00519     createPrivateScratchSegment(params);
00520 
00521     params.distinctness = streamDef.getDistinctness();
00522     params.pTempSegment = pDatabase->getTempSegment();
00523     params.storeFinalRun = false;
00524     params.estimatedNumRows = streamDef.getEstimatedNumRows();
00525     params.earlyClose = streamDef.isEarlyClose();
00526     CmdInterpreter::readTupleProjection(
00527         params.keyProj,
00528         streamDef.getKeyProj());
00529     params.descendingKeyColumns.resize(params.keyProj.size(), false);
00530     if (streamDef.getDescendingProj()) {
00531         TupleProjection descendingProj;
00532         CmdInterpreter::readTupleProjection(
00533             descendingProj,
00534             streamDef.getDescendingProj());
00535         for (uint i = 0; i < descendingProj.size(); ++i) {
00536             params.descendingKeyColumns[descendingProj[i]] = true;
00537         }
00538     }
00539     embryo.init(
00540         ExternalSortExecStream::newExternalSortExecStream(),
00541         params);
00542 }
00543 
00544 char ExecStreamFactory::readCharParam(const std::string &val)
00545 {
00546     assert(val.size() <= 1);
00547     if (val.size() == 0) {
00548         return 0;
00549     }
00550     return val.at(0);
00551 }
00552 
00553 void ExecStreamFactory::visit(ProxyFlatFileTupleStreamDef &streamDef)
00554 {
00555     FlatFileExecStreamParams params;
00556     readTupleStreamParams(params, streamDef);
00557 
00558     assert(streamDef.getDataFilePath().size() > 0);
00559     params.dataFilePath = streamDef.getDataFilePath();
00560     params.errorFilePath = streamDef.getErrorFilePath();
00561     params.fieldDelim = readCharParam(streamDef.getFieldDelimiter());
00562     params.rowDelim = readCharParam(streamDef.getRowDelimiter());
00563     params.quoteChar = readCharParam(streamDef.getQuoteCharacter());
00564     params.escapeChar = readCharParam(streamDef.getEscapeCharacter());
00565     params.header = streamDef.isHasHeader();
00566     params.lenient = streamDef.isLenient();
00567     params.trim = streamDef.isTrim();
00568     params.mapped = streamDef.isMapped();
00569     readColumnList(streamDef, params.columnNames);
00570 
00571     params.numRowsScan = streamDef.getNumRowsScan();
00572     params.calcProgram = streamDef.getCalcProgram();
00573     if (params.numRowsScan > 0 && params.calcProgram.size() > 0) {
00574         params.mode = FLATFILE_MODE_SAMPLE;
00575     } else if (params.numRowsScan > 0) {
00576         params.mode = FLATFILE_MODE_DESCRIBE;
00577     } else if (params.numRowsScan == 0 && params.calcProgram.size() == 0) {
00578         params.mode = FLATFILE_MODE_QUERY_TEXT;
00579     }
00580     embryo.init(FlatFileExecStream::newFlatFileExecStream(), params);
00581 }
00582 
00583 void ExecStreamFactory::visit(ProxyLhxJoinStreamDef &streamDef)
00584 {
00585     TupleProjection tmpProj;
00586 
00587     LhxJoinExecStreamParams params;
00588     readTupleStreamParams(params, streamDef);
00589 
00590     /*
00591      * LhxJoinExecStream requires a private ScratchSegment.
00592      */
00593     createPrivateScratchSegment(params);
00594 
00595     /*
00596      * External segment to store partitions.
00597      */
00598     SharedDatabase pDatabase = getDatabase();
00599     params.pTempSegment = pDatabase->getTempSegment();
00600 
00601     /*
00602      * These fields are currently not used by the optimizer. We know that
00603      * optimizer only supports inner equi hash join.
00604      */
00605     params.leftInner     = streamDef.isLeftInner();
00606     params.leftOuter     = streamDef.isLeftOuter();
00607     params.rightInner    = streamDef.isRightInner();
00608     params.rightOuter    = streamDef.isRightOuter();
00609     params.setopDistinct = streamDef.isSetopDistinct();
00610     params.setopAll      = streamDef.isSetopAll();
00611 
00612     /*
00613      * Set forcePartitionLevel to 0 to turn off force partitioning.
00614      */
00615     params.forcePartitionLevel = 0;
00616     params.enableJoinFilter    = true;
00617     params.enableSubPartStat   = true;
00618     params.enableSwing         = true;
00619 
00620     CmdInterpreter::readTupleProjection(
00621         params.leftKeyProj, streamDef.getLeftKeyProj());
00622 
00623     CmdInterpreter::readTupleProjection(
00624         params.rightKeyProj, streamDef.getRightKeyProj());
00625 
00626     CmdInterpreter::readTupleProjection(
00627         params.filterNullKeyProj, streamDef.getFilterNullProj());
00628 
00629     /*
00630      * The optimizer currently estimates these two values.
00631      */
00632     params.cndKeys = streamDef.getCndBuildKeys();
00633     params.numRows = streamDef.getNumBuildRows();
00634 
00635     embryo.init(new LhxJoinExecStream(), params);
00636 }
00637 
00638 void ExecStreamFactory::visit(ProxyLhxAggStreamDef &streamDef)
00639 {
00640     LhxAggExecStreamParams params;
00641     readAggStreamParams(params, streamDef);
00642 
00643     /*
00644      * LhxAggExecStream requires a private ScratchSegment.
00645      */
00646     createPrivateScratchSegment(params);
00647 
00648     /*
00649      * External segment to store partitions.
00650      */
00651     SharedDatabase pDatabase = getDatabase();
00652     params.pTempSegment = pDatabase->getTempSegment();
00653 
00654     /*
00655      * The optimizer currently estimates these two values.
00656      */
00657     params.cndGroupByKeys = streamDef.getCndGroupByKeys();
00658     params.numRows = streamDef.getNumRows();
00659 
00660     /*
00661      * Set forcePartitionLevel to 0 to turn off force partitioning.
00662      */
00663     params.forcePartitionLevel = 0;
00664 
00665     /*
00666      * NOTE:
00667      * Hash aggregation partitions partially aggregated results to disk.
00668      * The stat currently keeps track of the tuple count before
00669      * aggregation, so it is not very accurate. Disable sub partition stats
00670      * for now.
00671      */
00672     params.enableSubPartStat = false;
00673 
00674     embryo.init(new LhxAggExecStream(), params);
00675 }
00676 
00677 void ExecStreamFactory::readColumnList(
00678     ProxyFlatFileTupleStreamDef &streamDef,
00679     std::vector<std::string> &names)
00680 {
00681     SharedProxyColumnName pColumnName = streamDef.getColumn();
00682 
00683     for (; pColumnName; ++pColumnName) {
00684         names.push_back(pColumnName->getName());
00685     }
00686 }
00687 
00688 void ExecStreamFactory::readExecStreamParams(
00689     ExecStreamParams &params,
00690     ProxyExecutionStreamDef &streamDef)
00691 {
00692     createQuotaAccessors(params);
00693 }
00694 
00695 void ExecStreamFactory::readTupleDescriptor(
00696     TupleDescriptor& desc,
00697     SharedProxyTupleDescriptor def)
00698 {
00699     assert(def);
00700     CmdInterpreter::readTupleDescriptor(
00701         desc, *def, pDatabase->getTypeFactory());
00702 }
00703 
00704 void ExecStreamFactory::readTupleStreamParams(
00705     SingleOutputExecStreamParams &params,
00706     ProxyTupleStreamDef &streamDef)
00707 {
00708     readExecStreamParams(params,streamDef);
00709     readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00710 }
00711 
00712 void ExecStreamFactory::createPrivateScratchSegment(ExecStreamParams &params)
00713 {
00714     // Make sure global scratch segment was already set up.
00715     assert(params.pCacheAccessor);
00716 
00717     params.scratchAccessor =
00718         pDatabase->getSegmentFactory()->newScratchSegment(
00719             pDatabase->getCache());
00720     SharedQuotaCacheAccessor pSuperQuotaAccessor =
00721         boost::dynamic_pointer_cast<QuotaCacheAccessor>(
00722             params.pCacheAccessor);
00723     params.scratchAccessor.pCacheAccessor.reset(
00724         new QuotaCacheAccessor(
00725             pSuperQuotaAccessor,
00726             params.scratchAccessor.pCacheAccessor,
00727             UINT_MAX));
00728 }
00729 
00730 void ExecStreamFactory::createQuotaAccessors(
00731     ExecStreamParams &params)
00732 {
00733     assert(pGraphEmbryo);
00734     pGraphEmbryo->initStreamParams(params);
00735 }
00736 
00737 void ExecStreamFactory::readTableWriterStreamParams(
00738     FtrsTableWriterExecStreamParams &params,
00739     ProxyTableWriterDef &streamDef)
00740 {
00741     readTupleStreamParams(params, streamDef);
00742     params.pTableWriterFactory = pTableWriterFactory;
00743     params.tableId = ANON_PAGE_OWNER_ID;
00744     params.pActionMutex = &(pDatabase->getCheckpointThread()->getActionMutex());
00745 
00746     SharedProxyIndexWriterDef pIndexWriterDef = streamDef.getIndexWriter();
00747     for (; pIndexWriterDef; ++pIndexWriterDef) {
00748         FtrsTableIndexWriterParams indexParams;
00749         // all index writers share some common attributes
00750         indexParams.pCacheAccessor = params.pCacheAccessor;
00751         indexParams.scratchAccessor = params.scratchAccessor;
00752         readIndexWriterParams(indexParams, *pIndexWriterDef);
00753         SharedProxyTupleProjection pInputProj =
00754             pIndexWriterDef->getInputProj();
00755         if (pInputProj) {
00756             CmdInterpreter::readTupleProjection(
00757                 indexParams.inputProj,
00758                 pInputProj);
00759         } else {
00760             // this is the clustered index; use it as a table ID
00761             params.tableId = indexParams.pageOwnerId;
00762         }
00763         params.indexParams.push_back(indexParams);
00764     }
00765     assert(params.tableId != ANON_PAGE_OWNER_ID);
00766 }
00767 
00768 void ExecStreamFactory::readBTreeStreamParams(
00769     BTreeExecStreamParams &params,
00770     ProxyIndexAccessorDef &streamDef)
00771 {
00772     assert(params.pCacheAccessor);
00773     readBTreeParams(params, streamDef);
00774 }
00775 
00776 void ExecStreamFactory::readBTreeParams(
00777     BTreeParams &params,
00778     ProxyIndexAccessorDef &streamDef)
00779 {
00780     params.rootPageIdParamId =
00781         readDynamicParamId(streamDef.getRootPageIdParamId());
00782     if (params.rootPageIdParamId > DynamicParamId(0) &&
00783         streamDef.getRootPageId() == -1)
00784     {
00785         // In the case where the btree is dynamically created during
00786         // runtime, the btree will be created in the temp segment
00787         params.segmentId = Database::TEMP_SEGMENT_ID;
00788         params.pageOwnerId = ANON_PAGE_OWNER_ID;
00789         params.pSegment = pDatabase->getTempSegment();
00790         params.rootPageId = NULL_PAGE_ID;
00791         params.pRootMap = NULL;
00792     } else {
00793         params.segmentId = SegmentId(streamDef.getSegmentId());
00794         params.pageOwnerId = PageOwnerId(streamDef.getIndexId());
00795         assert(VALID_PAGE_OWNER_ID(params.pageOwnerId));
00796         // Set the btree to read from the appropriate segment, depending
00797         // on whether or not the reader needs to see uncommitted data
00798         // created upstream in the stream graph.
00799         if (streamDef.isReadOnlyCommittedData()) {
00800             params.pSegment =
00801                 pDatabase->getSegmentById(
00802                     params.segmentId,
00803                     pStreamGraphHandle->pReadCommittedSegment);
00804         } else {
00805             params.pSegment =
00806                 pDatabase->getSegmentById(
00807                     params.segmentId,
00808                     pStreamGraphHandle->pSegment);
00809         }
00810         if (streamDef.getRootPageId() != -1) {
00811             params.rootPageId = PageId(streamDef.getRootPageId());
00812             params.pRootMap = NULL;
00813         } else {
00814             params.rootPageId = NULL_PAGE_ID;
00815             if (params.rootPageIdParamId == DynamicParamId(0)) {
00816                 params.pRootMap = pStreamGraphHandle;
00817             }
00818         }
00819     }
00820     readTupleDescriptor(params.tupleDesc, streamDef.getTupleDesc());
00821     CmdInterpreter::readTupleProjection(
00822         params.keyProj,
00823         streamDef.getKeyProj());
00824 
00825 }
00826 
00827 DynamicParamId ExecStreamFactory::readDynamicParamId(const int val)
00828 {
00829     // NOTE: zero is a special code for no parameter id
00830     uint (id) = (val < 0) ? 0 : (uint) val;
00831     return (DynamicParamId) id;
00832 }
00833 
00834 void ExecStreamFactory::readBTreeReadStreamParams(
00835     BTreeReadExecStreamParams &params,
00836     ProxyIndexScanDef &streamDef)
00837 {
00838     readTupleStreamParams(params, streamDef);
00839     readBTreeStreamParams(params, streamDef);
00840     CmdInterpreter::readTupleProjection(
00841         params.outputProj,
00842         streamDef.getOutputProj());
00843 }
00844 
00845 void ExecStreamFactory::readIndexWriterParams(
00846     FtrsTableIndexWriterParams &params,
00847     ProxyIndexWriterDef &indexWriterDef)
00848 {
00849     readBTreeStreamParams(params, indexWriterDef);
00850     params.distinctness = indexWriterDef.getDistinctness();
00851     params.updateInPlace = indexWriterDef.isUpdateInPlace();
00852 }
00853 
00854 void ExecStreamFactory::readBTreeSearchStreamParams(
00855     BTreeSearchExecStreamParams &params,
00856     ProxyIndexSearchDef &streamDef)
00857 {
00858     readBTreeReadStreamParams(params, streamDef);
00859     params.outerJoin = streamDef.isOuterJoin();
00860     if (streamDef.getInputKeyProj()) {
00861         CmdInterpreter::readTupleProjection(
00862             params.inputKeyProj,
00863             streamDef.getInputKeyProj());
00864     }
00865     if (streamDef.getInputJoinProj()) {
00866         CmdInterpreter::readTupleProjection(
00867             params.inputJoinProj,
00868             streamDef.getInputJoinProj());
00869     }
00870     if (streamDef.getInputDirectiveProj()) {
00871         CmdInterpreter::readTupleProjection(
00872             params.inputDirectiveProj,
00873             streamDef.getInputDirectiveProj());
00874     }
00875 
00876     SharedProxyCorrelation dynamicParam = streamDef.getSearchKeyParameter();
00877     for (; dynamicParam; ++dynamicParam) {
00878         BTreeSearchKeyParameter searchKeyParam(
00879             DynamicParamId(dynamicParam->getId()),
00880             dynamicParam->getOffset());
00881         params.searchKeyParams.push_back(searchKeyParam);
00882     }
00883 }
00884 
00885 void ExecStreamFactory::readAggStreamParams(
00886     SortedAggExecStreamParams &params,
00887     ProxyAggStreamDef &streamDef)
00888 {
00889     readTupleStreamParams(params,streamDef);
00890     SharedProxyAggInvocation pAggInvocation = streamDef.getAggInvocation();
00891     for (; pAggInvocation; ++pAggInvocation) {
00892         AggInvocation aggInvocation;
00893         aggInvocation.aggFunction = pAggInvocation->getFunction();
00894         aggInvocation.iInputAttr =
00895             pAggInvocation->getInputAttributeIndex();
00896         params.aggInvocations.push_back(aggInvocation);
00897     }
00898     params.groupByKeyCount = streamDef.getGroupingPrefixSize();
00899 }
00900 
00901 ExecStreamSubFactory::~ExecStreamSubFactory()
00902 {
00903 }
00904 
00905 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $");
00906 
00907 // End ExecStreamFactory.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1