00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/ExecStreamFactory.h"
00026 #include "fennel/farrago/JavaSinkExecStream.h"
00027 #include "fennel/farrago/JavaTransformExecStream.h"
00028 #include "fennel/farrago/CmdInterpreter.h"
00029 #include "fennel/ftrs/BTreePrefetchSearchExecStream.h"
00030 #include "fennel/ftrs/BTreeScanExecStream.h"
00031 #include "fennel/ftrs/BTreeSearchExecStream.h"
00032 #include "fennel/ftrs/BTreeSearchUniqueExecStream.h"
00033 #include "fennel/ftrs/FtrsTableWriterExecStream.h"
00034 #include "fennel/ftrs/BTreeSortExecStream.h"
00035 #include "fennel/exec/MergeExecStream.h"
00036 #include "fennel/exec/SegBufferExecStream.h"
00037 #include "fennel/exec/SegBufferReaderExecStream.h"
00038 #include "fennel/exec/SegBufferWriterExecStream.h"
00039 #include "fennel/exec/SplitterExecStream.h"
00040 #include "fennel/exec/BarrierExecStream.h"
00041 #include "fennel/exec/ValuesExecStream.h"
00042 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00043 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00044 #include "fennel/exec/CartesianJoinExecStream.h"
00045 #include "fennel/exec/SortedAggExecStream.h"
00046 #include "fennel/exec/MockProducerExecStream.h"
00047 #include "fennel/exec/ReshapeExecStream.h"
00048 #include "fennel/exec/NestedLoopJoinExecStream.h"
00049 #include "fennel/exec/BernoulliSamplingExecStream.h"
00050 #include "fennel/calculator/CalcExecStream.h"
00051 #include "fennel/exec/CollectExecStream.h"
00052 #include "fennel/exec/UncollectExecStream.h"
00053 #include "fennel/exec/CorrelationJoinExecStream.h"
00054 #include "fennel/db/Database.h"
00055 #include "fennel/db/CheckpointThread.h"
00056 #include "fennel/tuple/TupleDescriptor.h"
00057 #include "fennel/tuple/TupleAccessor.h"
00058 #include "fennel/cache/QuotaCacheAccessor.h"
00059 #include "fennel/segment/SegmentFactory.h"
00060 #include "fennel/sorter/ExternalSortExecStream.h"
00061 #include "fennel/flatfile/FlatFileExecStream.h"
00062 #include "fennel/hashexe/LhxJoinExecStream.h"
00063 #include "fennel/hashexe/LhxAggExecStream.h"
00064
00065 FENNEL_BEGIN_CPPFILE(
00066 "$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $");
00067
00068 ExecStreamFactory::ExecStreamFactory(
00069 SharedDatabase pDatabaseInit,
00070 SharedFtrsTableWriterFactory pTableWriterFactoryInit,
00071 CmdInterpreter::StreamGraphHandle *pStreamGraphHandleInit)
00072 {
00073 pDatabase = pDatabaseInit;
00074 pTableWriterFactory = pTableWriterFactoryInit;
00075 pStreamGraphHandle = pStreamGraphHandleInit;
00076 pGraphEmbryo = NULL;
00077 }
00078
00079 SharedDatabase ExecStreamFactory::getDatabase()
00080 {
00081 return pDatabase;
00082 }
00083
00084 void ExecStreamFactory::setGraphEmbryo(
00085 ExecStreamGraphEmbryo &graphEmbryo)
00086 {
00087 pGraphEmbryo = &graphEmbryo;
00088 }
00089
00090 void ExecStreamFactory::setScratchAccessor(
00091 SegmentAccessor &scratchAccessorInit)
00092 {
00093 scratchAccessor = scratchAccessorInit;
00094 }
00095
00096 void ExecStreamFactory::addSubFactory(
00097 SharedExecStreamSubFactory pSubFactory)
00098 {
00099 subFactories.push_back(pSubFactory);
00100 }
00101
00102 ExecStreamEmbryo const &ExecStreamFactory::visitStream(
00103 ProxyExecutionStreamDef &streamDef)
00104 {
00105 bool created = false;
00106
00107
00108 std::vector<SharedExecStreamSubFactory>::iterator ppSubFactory;
00109 for (ppSubFactory = subFactories.begin();
00110 ppSubFactory != subFactories.end(); ++ppSubFactory)
00111 {
00112 ExecStreamSubFactory &subFactory = **ppSubFactory;
00113 created = subFactory.createStream(
00114 *this,
00115 streamDef,
00116 embryo);
00117 if (created) {
00118 break;
00119 }
00120 }
00121
00122 if (!created) {
00123
00124 invokeVisit(streamDef);
00125 }
00126 embryo.getStream()->setName(streamDef.getName());
00127 return embryo;
00128 }
00129
00130 void ExecStreamFactory::invokeVisit(
00131 ProxyExecutionStreamDef &streamDef)
00132 {
00133 FemVisitor::visitTbl.accept(*this,streamDef);
00134 }
00135
00136
00137
00138
00139
00140
00141
00142 void ExecStreamFactory::visit(ProxyBarrierStreamDef &streamDef)
00143 {
00144 BarrierExecStreamParams params;
00145 readTupleStreamParams(params, streamDef);
00146 params.returnMode = streamDef.getReturnMode();
00147 readBarrierDynamicParams(params, streamDef);
00148 embryo.init(new BarrierExecStream(), params);
00149 }
00150
00151 void ExecStreamFactory::readBarrierDynamicParams(
00152 BarrierExecStreamParams ¶ms,
00153 ProxyBarrierStreamDef &streamDef)
00154 {
00155 SharedProxyDynamicParameter dynamicParam = streamDef.getDynamicParameter();
00156 for (; dynamicParam; ++dynamicParam) {
00157 DynamicParamId p = (DynamicParamId) dynamicParam->getParameterId();
00158 params.parameterIds.push_back(p);
00159 }
00160 }
00161
00162 void ExecStreamFactory::visit(ProxyBufferingTupleStreamDef &streamDef)
00163 {
00164 SegBufferExecStreamParams params;
00165 readTupleStreamParams(params, streamDef);
00166 params.multipass = streamDef.isMultipass();
00167 if (!streamDef.isInMemory()) {
00168 params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00169 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00170 }
00171 embryo.init(new SegBufferExecStream(), params);
00172 }
00173
00174 void ExecStreamFactory::visit(ProxyBufferWriterStreamDef &streamDef)
00175 {
00176 SegBufferWriterExecStreamParams params;
00177 readExecStreamParams(params, streamDef);
00178 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00179 if (!streamDef.isInMemory()) {
00180 params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00181 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00182 }
00183 assert(streamDef.isMultipass());
00184 params.readerRefCountParamId =
00185 readDynamicParamId(streamDef.getReaderRefCountParamId());
00186 embryo.init(new SegBufferWriterExecStream(), params);
00187 }
00188
00189 void ExecStreamFactory::visit(ProxyBufferReaderStreamDef &streamDef)
00190 {
00191 SegBufferReaderExecStreamParams params;
00192 readTupleStreamParams(params, streamDef);
00193 if (!streamDef.isInMemory()) {
00194 params.scratchAccessor.pSegment = pDatabase->getTempSegment();
00195 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor;
00196 }
00197 assert(streamDef.isMultipass());
00198 params.readerRefCountParamId =
00199 readDynamicParamId(streamDef.getReaderRefCountParamId());
00200 embryo.init(new SegBufferReaderExecStream(), params);
00201 }
00202
00203 void ExecStreamFactory::visit(ProxyCartesianProductStreamDef &streamDef)
00204 {
00205 CartesianJoinExecStreamParams params;
00206 readTupleStreamParams(params, streamDef);
00207 params.leftOuter = streamDef.isLeftOuter();
00208 embryo.init(new CartesianJoinExecStream(), params);
00209 }
00210
00211 void ExecStreamFactory::visit(ProxyIndexLoaderDef &streamDef)
00212 {
00213 BTreeInsertExecStreamParams params;
00214 readTupleStreamParams(params, streamDef);
00215 readBTreeStreamParams(params, streamDef);
00216 params.distinctness = streamDef.getDistinctness();
00217 params.monotonic = streamDef.isMonotonic();
00218 embryo.init(new BTreeInsertExecStream(), params);
00219 }
00220
00221 void ExecStreamFactory::visit(ProxyIndexScanDef &streamDef)
00222 {
00223 BTreeScanExecStreamParams params;
00224 readBTreeReadStreamParams(params, streamDef);
00225 embryo.init(
00226 new BTreeScanExecStream(),
00227 params);
00228 }
00229
00230 void ExecStreamFactory::visit(ProxyIndexSearchDef &streamDef)
00231 {
00232 assert(!(streamDef.isUniqueKey() && streamDef.isPrefetch()));
00233 if (streamDef.isPrefetch()) {
00234 BTreePrefetchSearchExecStreamParams params;
00235 initBTreePrefetchSearchParams(params, streamDef);
00236 embryo.init(
00237 new BTreePrefetchSearchExecStream(),
00238 params);
00239 } else {
00240 BTreeSearchExecStreamParams params;
00241 readBTreeSearchStreamParams(params, streamDef);
00242 embryo.init(
00243 streamDef.isUniqueKey()
00244 ? new BTreeSearchUniqueExecStream() : new BTreeSearchExecStream(),
00245 params);
00246 }
00247 }
00248
00249 void ExecStreamFactory::initBTreePrefetchSearchParams(
00250 BTreePrefetchSearchExecStreamParams ¶ms,
00251 ProxyIndexSearchDef &streamDef)
00252 {
00253 readBTreeSearchStreamParams(params, streamDef);
00254
00255
00256 createPrivateScratchSegment(params);
00257 }
00258
00259 void ExecStreamFactory::visit(ProxyJavaSinkStreamDef &streamDef)
00260 {
00261 JavaSinkExecStreamParams params;
00262 readExecStreamParams(params, streamDef);
00263 params.pStreamGraphHandle = pStreamGraphHandle;
00264 params.javaFennelPipeTupleIterId = streamDef.getStreamId();
00265 embryo.init(new JavaSinkExecStream(), params);
00266 }
00267
00268 void ExecStreamFactory::visit(ProxyJavaTransformStreamDef &streamDef)
00269 {
00270 JavaTransformExecStreamParams params;
00271
00272 readExecStreamParams(params, streamDef);
00273
00274 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00275
00276 params.pStreamGraphHandle = pStreamGraphHandle;
00277 params.javaClassName = streamDef.getJavaClassName();
00278 embryo.init(new JavaTransformExecStream(), params);
00279 }
00280
00281 void ExecStreamFactory::visit(ProxyMergeStreamDef &streamDef)
00282 {
00283 MergeExecStreamParams params;
00284 readTupleStreamParams(params, streamDef);
00285 if (!streamDef.isSequential()) {
00286 params.isParallel = true;
00287 }
00288
00289 assert(!streamDef.isPrePullInputs());
00290 embryo.init(new MergeExecStream(), params);
00291 }
00292
00293 void ExecStreamFactory::visit(ProxyMockTupleStreamDef &streamDef)
00294 {
00295 MockProducerExecStreamParams params;
00296 readTupleStreamParams(params, streamDef);
00297 params.nRows = streamDef.getRowCount();
00298 embryo.init(new MockProducerExecStream(), params);
00299 }
00300
00301 void ExecStreamFactory::visit(ProxyTableDeleterDef &streamDef)
00302 {
00303 FtrsTableWriterExecStreamParams params;
00304 params.actionType = FtrsTableWriter::ACTION_DELETE;
00305 readTableWriterStreamParams(params, streamDef);
00306 embryo.init(new FtrsTableWriterExecStream(), params);
00307 }
00308
00309 void ExecStreamFactory::visit(ProxyTableInserterDef &streamDef)
00310 {
00311 FtrsTableWriterExecStreamParams params;
00312 params.actionType = FtrsTableWriter::ACTION_INSERT;
00313 readTableWriterStreamParams(params, streamDef);
00314 embryo.init(new FtrsTableWriterExecStream(), params);
00315 }
00316
00317 void ExecStreamFactory::visit(ProxyTableUpdaterDef &streamDef)
00318 {
00319 FtrsTableWriterExecStreamParams params;
00320 params.actionType = FtrsTableWriter::ACTION_UPDATE;
00321 SharedProxyTupleProjection pUpdateProj = streamDef.getUpdateProj();
00322 CmdInterpreter::readTupleProjection(
00323 params.updateProj,
00324 pUpdateProj);
00325 readTableWriterStreamParams(params, streamDef);
00326 embryo.init(new FtrsTableWriterExecStream(), params);
00327 }
00328
00329 void ExecStreamFactory::visit(ProxySortedAggStreamDef &streamDef)
00330 {
00331 SortedAggExecStreamParams params;
00332 readAggStreamParams(params, streamDef);
00333 embryo.init(new SortedAggExecStream(), params);
00334 }
00335
00336 void ExecStreamFactory::implementSortWithBTree(ProxySortingStreamDef &streamDef)
00337 {
00338 BTreeSortExecStreamParams params;
00339 readTupleStreamParams(params,streamDef);
00340 params.distinctness = streamDef.getDistinctness();
00341 params.monotonic = false;
00342 params.pSegment = pDatabase->getTempSegment();
00343 params.rootPageId = NULL_PAGE_ID;
00344 params.segmentId = Database::TEMP_SEGMENT_ID;
00345 params.pageOwnerId = ANON_PAGE_OWNER_ID;
00346 params.pRootMap = NULL;
00347 params.rootPageIdParamId = DynamicParamId(0);
00348 CmdInterpreter::readTupleProjection(
00349 params.keyProj,
00350 streamDef.getKeyProj());
00351
00352
00353 params.tupleDesc = params.outputTupleDesc;
00354 embryo.init(new BTreeSortExecStream(), params);
00355 }
00356
00357 void ExecStreamFactory::visit(ProxySplitterStreamDef &streamDef)
00358 {
00359 SplitterExecStreamParams params;
00360 readExecStreamParams(params, streamDef);
00361 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00362 embryo.init(new SplitterExecStream(), params);
00363 }
00364
00365
00366 void ExecStreamFactory::visit(ProxyValuesStreamDef &streamDef)
00367 {
00368 ValuesExecStreamParams params;
00369 readTupleStreamParams(params, streamDef);
00370
00371
00372 jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod(
00373 streamDef.jObject, ProxyValuesStreamDef::meth_getTupleBytesBase64);
00374
00375
00376 jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod(
00377 JniUtil::classRhBase64,
00378 JniUtil::methBase64Decode,
00379 tupleBytesBase64);
00380
00381
00382 params.bufSize = streamDef.pEnv->GetArrayLength(jbytes);
00383 params.pTupleBuffer.reset(new FixedBuffer[params.bufSize]);
00384 streamDef.pEnv->GetByteArrayRegion(
00385 jbytes, 0, params.bufSize,
00386 reinterpret_cast<jbyte *>(params.pTupleBuffer.get()));
00387
00388 embryo.init(new ValuesExecStream(), params);
00389 }
00390
00391 void ExecStreamFactory::visit(ProxyReshapeStreamDef &streamDef)
00392 {
00393 ReshapeExecStreamParams params;
00394 readTupleStreamParams(params, streamDef);
00395
00396 params.compOp = streamDef.getCompareOp();
00397 if (params.compOp != COMP_NOOP) {
00398
00399 jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod(
00400 streamDef.jObject,
00401 ProxyReshapeStreamDef::meth_getTupleCompareBytesBase64);
00402
00403
00404 jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod(
00405 JniUtil::classRhBase64,
00406 JniUtil::methBase64Decode,
00407 tupleBytesBase64);
00408
00409
00410 int bufSize = streamDef.pEnv->GetArrayLength(jbytes);
00411 params.pCompTupleBuffer.reset(new FixedBuffer[bufSize]);
00412 streamDef.pEnv->GetByteArrayRegion(
00413 jbytes, 0, bufSize,
00414 reinterpret_cast<jbyte *>(params.pCompTupleBuffer.get()));
00415
00416 CmdInterpreter::readTupleProjection(
00417 params.inputCompareProj, streamDef.getInputCompareProjection());
00418 }
00419
00420 CmdInterpreter::readTupleProjection(
00421 params.outputProj, streamDef.getOutputProjection());
00422
00423 SharedProxyReshapeParameter dynamicParam = streamDef.getReshapeParameter();
00424 for (; dynamicParam; ++dynamicParam) {
00425 int offset = dynamicParam->getCompareOffset();
00426 ReshapeParameter reshapeParam(
00427 DynamicParamId(dynamicParam->getDynamicParamId()),
00428 (offset < 0) ? MAXU : uint(offset),
00429 dynamicParam->isOutputParam());
00430 params.dynamicParameters.push_back(reshapeParam);
00431 }
00432
00433 embryo.init(new ReshapeExecStream(), params);
00434 }
00435
00436 void ExecStreamFactory::visit(ProxyNestedLoopJoinStreamDef &streamDef)
00437 {
00438 NestedLoopJoinExecStreamParams params;
00439 readTupleStreamParams(params, streamDef);
00440 params.leftOuter = streamDef.isLeftOuter();
00441
00442 SharedProxyCorrelation dynamicParam = streamDef.getLeftJoinKey();
00443 for (; dynamicParam; ++dynamicParam) {
00444 NestedLoopJoinKey joinKey(
00445 DynamicParamId(dynamicParam->getId()),
00446 dynamicParam->getOffset());
00447 params.leftJoinKeys.push_back(joinKey);
00448 }
00449
00450 embryo.init(new NestedLoopJoinExecStream(), params);
00451 }
00452
00453 void ExecStreamFactory::visit(ProxyBernoulliSamplingStreamDef &streamDef)
00454 {
00455 BernoulliSamplingExecStreamParams params;
00456 readTupleStreamParams(params, streamDef);
00457
00458 params.samplingRate = streamDef.getSamplingRate();
00459 params.isRepeatable = streamDef.isRepeatable();
00460 params.repeatableSeed = streamDef.getRepeatableSeed();
00461
00462 embryo.init(new BernoulliSamplingExecStream(), params);
00463 }
00464
00465 void ExecStreamFactory::visit(ProxyCalcTupleStreamDef &streamDef)
00466 {
00467 CalcExecStreamParams params;
00468 readTupleStreamParams(params, streamDef);
00469 params.program = streamDef.getProgram();
00470 params.isFilter = streamDef.isFilter();
00471 embryo.init(
00472 new CalcExecStream(),
00473 params);
00474 }
00475
00476 void ExecStreamFactory::visit(ProxyCorrelationJoinStreamDef &streamDef)
00477 {
00478 CorrelationJoinExecStreamParams params;
00479 readTupleStreamParams(params, streamDef);
00480 SharedProxyCorrelation pCorrelation = streamDef.getCorrelations();
00481 for (; pCorrelation; ++pCorrelation) {
00482 Correlation correlation(
00483 DynamicParamId(pCorrelation->getId()),
00484 pCorrelation->getOffset());
00485 params.correlations.push_back(correlation);
00486 }
00487 embryo.init(new CorrelationJoinExecStream(), params);
00488 }
00489
00490 void ExecStreamFactory::visit(ProxyCollectTupleStreamDef &streamDef)
00491 {
00492 CollectExecStreamParams params;
00493 readTupleStreamParams(params, streamDef);
00494 embryo.init(new CollectExecStream(), params);
00495 }
00496
00497 void ExecStreamFactory::visit(ProxyUncollectTupleStreamDef &streamDef)
00498 {
00499 UncollectExecStreamParams params;
00500 readTupleStreamParams(params, streamDef);
00501 embryo.init(new UncollectExecStream(), params);
00502 }
00503
00504 void ExecStreamFactory::visit(ProxySortingStreamDef &streamDef)
00505 {
00506 if (streamDef.getDistinctness() != DUP_ALLOW) {
00507
00508 implementSortWithBTree(streamDef);
00509 return;
00510 }
00511
00512 SharedDatabase pDatabase = getDatabase();
00513
00514 ExternalSortExecStreamParams params;
00515
00516 readTupleStreamParams(params, streamDef);
00517
00518
00519 createPrivateScratchSegment(params);
00520
00521 params.distinctness = streamDef.getDistinctness();
00522 params.pTempSegment = pDatabase->getTempSegment();
00523 params.storeFinalRun = false;
00524 params.estimatedNumRows = streamDef.getEstimatedNumRows();
00525 params.earlyClose = streamDef.isEarlyClose();
00526 CmdInterpreter::readTupleProjection(
00527 params.keyProj,
00528 streamDef.getKeyProj());
00529 params.descendingKeyColumns.resize(params.keyProj.size(), false);
00530 if (streamDef.getDescendingProj()) {
00531 TupleProjection descendingProj;
00532 CmdInterpreter::readTupleProjection(
00533 descendingProj,
00534 streamDef.getDescendingProj());
00535 for (uint i = 0; i < descendingProj.size(); ++i) {
00536 params.descendingKeyColumns[descendingProj[i]] = true;
00537 }
00538 }
00539 embryo.init(
00540 ExternalSortExecStream::newExternalSortExecStream(),
00541 params);
00542 }
00543
00544 char ExecStreamFactory::readCharParam(const std::string &val)
00545 {
00546 assert(val.size() <= 1);
00547 if (val.size() == 0) {
00548 return 0;
00549 }
00550 return val.at(0);
00551 }
00552
00553 void ExecStreamFactory::visit(ProxyFlatFileTupleStreamDef &streamDef)
00554 {
00555 FlatFileExecStreamParams params;
00556 readTupleStreamParams(params, streamDef);
00557
00558 assert(streamDef.getDataFilePath().size() > 0);
00559 params.dataFilePath = streamDef.getDataFilePath();
00560 params.errorFilePath = streamDef.getErrorFilePath();
00561 params.fieldDelim = readCharParam(streamDef.getFieldDelimiter());
00562 params.rowDelim = readCharParam(streamDef.getRowDelimiter());
00563 params.quoteChar = readCharParam(streamDef.getQuoteCharacter());
00564 params.escapeChar = readCharParam(streamDef.getEscapeCharacter());
00565 params.header = streamDef.isHasHeader();
00566 params.lenient = streamDef.isLenient();
00567 params.trim = streamDef.isTrim();
00568 params.mapped = streamDef.isMapped();
00569 readColumnList(streamDef, params.columnNames);
00570
00571 params.numRowsScan = streamDef.getNumRowsScan();
00572 params.calcProgram = streamDef.getCalcProgram();
00573 if (params.numRowsScan > 0 && params.calcProgram.size() > 0) {
00574 params.mode = FLATFILE_MODE_SAMPLE;
00575 } else if (params.numRowsScan > 0) {
00576 params.mode = FLATFILE_MODE_DESCRIBE;
00577 } else if (params.numRowsScan == 0 && params.calcProgram.size() == 0) {
00578 params.mode = FLATFILE_MODE_QUERY_TEXT;
00579 }
00580 embryo.init(FlatFileExecStream::newFlatFileExecStream(), params);
00581 }
00582
00583 void ExecStreamFactory::visit(ProxyLhxJoinStreamDef &streamDef)
00584 {
00585 TupleProjection tmpProj;
00586
00587 LhxJoinExecStreamParams params;
00588 readTupleStreamParams(params, streamDef);
00589
00590
00591
00592
00593 createPrivateScratchSegment(params);
00594
00595
00596
00597
00598 SharedDatabase pDatabase = getDatabase();
00599 params.pTempSegment = pDatabase->getTempSegment();
00600
00601
00602
00603
00604
00605 params.leftInner = streamDef.isLeftInner();
00606 params.leftOuter = streamDef.isLeftOuter();
00607 params.rightInner = streamDef.isRightInner();
00608 params.rightOuter = streamDef.isRightOuter();
00609 params.setopDistinct = streamDef.isSetopDistinct();
00610 params.setopAll = streamDef.isSetopAll();
00611
00612
00613
00614
00615 params.forcePartitionLevel = 0;
00616 params.enableJoinFilter = true;
00617 params.enableSubPartStat = true;
00618 params.enableSwing = true;
00619
00620 CmdInterpreter::readTupleProjection(
00621 params.leftKeyProj, streamDef.getLeftKeyProj());
00622
00623 CmdInterpreter::readTupleProjection(
00624 params.rightKeyProj, streamDef.getRightKeyProj());
00625
00626 CmdInterpreter::readTupleProjection(
00627 params.filterNullKeyProj, streamDef.getFilterNullProj());
00628
00629
00630
00631
00632 params.cndKeys = streamDef.getCndBuildKeys();
00633 params.numRows = streamDef.getNumBuildRows();
00634
00635 embryo.init(new LhxJoinExecStream(), params);
00636 }
00637
00638 void ExecStreamFactory::visit(ProxyLhxAggStreamDef &streamDef)
00639 {
00640 LhxAggExecStreamParams params;
00641 readAggStreamParams(params, streamDef);
00642
00643
00644
00645
00646 createPrivateScratchSegment(params);
00647
00648
00649
00650
00651 SharedDatabase pDatabase = getDatabase();
00652 params.pTempSegment = pDatabase->getTempSegment();
00653
00654
00655
00656
00657 params.cndGroupByKeys = streamDef.getCndGroupByKeys();
00658 params.numRows = streamDef.getNumRows();
00659
00660
00661
00662
00663 params.forcePartitionLevel = 0;
00664
00665
00666
00667
00668
00669
00670
00671
00672 params.enableSubPartStat = false;
00673
00674 embryo.init(new LhxAggExecStream(), params);
00675 }
00676
00677 void ExecStreamFactory::readColumnList(
00678 ProxyFlatFileTupleStreamDef &streamDef,
00679 std::vector<std::string> &names)
00680 {
00681 SharedProxyColumnName pColumnName = streamDef.getColumn();
00682
00683 for (; pColumnName; ++pColumnName) {
00684 names.push_back(pColumnName->getName());
00685 }
00686 }
00687
00688 void ExecStreamFactory::readExecStreamParams(
00689 ExecStreamParams ¶ms,
00690 ProxyExecutionStreamDef &streamDef)
00691 {
00692 createQuotaAccessors(params);
00693 }
00694
00695 void ExecStreamFactory::readTupleDescriptor(
00696 TupleDescriptor& desc,
00697 SharedProxyTupleDescriptor def)
00698 {
00699 assert(def);
00700 CmdInterpreter::readTupleDescriptor(
00701 desc, *def, pDatabase->getTypeFactory());
00702 }
00703
00704 void ExecStreamFactory::readTupleStreamParams(
00705 SingleOutputExecStreamParams ¶ms,
00706 ProxyTupleStreamDef &streamDef)
00707 {
00708 readExecStreamParams(params,streamDef);
00709 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc());
00710 }
00711
00712 void ExecStreamFactory::createPrivateScratchSegment(ExecStreamParams ¶ms)
00713 {
00714
00715 assert(params.pCacheAccessor);
00716
00717 params.scratchAccessor =
00718 pDatabase->getSegmentFactory()->newScratchSegment(
00719 pDatabase->getCache());
00720 SharedQuotaCacheAccessor pSuperQuotaAccessor =
00721 boost::dynamic_pointer_cast<QuotaCacheAccessor>(
00722 params.pCacheAccessor);
00723 params.scratchAccessor.pCacheAccessor.reset(
00724 new QuotaCacheAccessor(
00725 pSuperQuotaAccessor,
00726 params.scratchAccessor.pCacheAccessor,
00727 UINT_MAX));
00728 }
00729
00730 void ExecStreamFactory::createQuotaAccessors(
00731 ExecStreamParams ¶ms)
00732 {
00733 assert(pGraphEmbryo);
00734 pGraphEmbryo->initStreamParams(params);
00735 }
00736
00737 void ExecStreamFactory::readTableWriterStreamParams(
00738 FtrsTableWriterExecStreamParams ¶ms,
00739 ProxyTableWriterDef &streamDef)
00740 {
00741 readTupleStreamParams(params, streamDef);
00742 params.pTableWriterFactory = pTableWriterFactory;
00743 params.tableId = ANON_PAGE_OWNER_ID;
00744 params.pActionMutex = &(pDatabase->getCheckpointThread()->getActionMutex());
00745
00746 SharedProxyIndexWriterDef pIndexWriterDef = streamDef.getIndexWriter();
00747 for (; pIndexWriterDef; ++pIndexWriterDef) {
00748 FtrsTableIndexWriterParams indexParams;
00749
00750 indexParams.pCacheAccessor = params.pCacheAccessor;
00751 indexParams.scratchAccessor = params.scratchAccessor;
00752 readIndexWriterParams(indexParams, *pIndexWriterDef);
00753 SharedProxyTupleProjection pInputProj =
00754 pIndexWriterDef->getInputProj();
00755 if (pInputProj) {
00756 CmdInterpreter::readTupleProjection(
00757 indexParams.inputProj,
00758 pInputProj);
00759 } else {
00760
00761 params.tableId = indexParams.pageOwnerId;
00762 }
00763 params.indexParams.push_back(indexParams);
00764 }
00765 assert(params.tableId != ANON_PAGE_OWNER_ID);
00766 }
00767
00768 void ExecStreamFactory::readBTreeStreamParams(
00769 BTreeExecStreamParams ¶ms,
00770 ProxyIndexAccessorDef &streamDef)
00771 {
00772 assert(params.pCacheAccessor);
00773 readBTreeParams(params, streamDef);
00774 }
00775
00776 void ExecStreamFactory::readBTreeParams(
00777 BTreeParams ¶ms,
00778 ProxyIndexAccessorDef &streamDef)
00779 {
00780 params.rootPageIdParamId =
00781 readDynamicParamId(streamDef.getRootPageIdParamId());
00782 if (params.rootPageIdParamId > DynamicParamId(0) &&
00783 streamDef.getRootPageId() == -1)
00784 {
00785
00786
00787 params.segmentId = Database::TEMP_SEGMENT_ID;
00788 params.pageOwnerId = ANON_PAGE_OWNER_ID;
00789 params.pSegment = pDatabase->getTempSegment();
00790 params.rootPageId = NULL_PAGE_ID;
00791 params.pRootMap = NULL;
00792 } else {
00793 params.segmentId = SegmentId(streamDef.getSegmentId());
00794 params.pageOwnerId = PageOwnerId(streamDef.getIndexId());
00795 assert(VALID_PAGE_OWNER_ID(params.pageOwnerId));
00796
00797
00798
00799 if (streamDef.isReadOnlyCommittedData()) {
00800 params.pSegment =
00801 pDatabase->getSegmentById(
00802 params.segmentId,
00803 pStreamGraphHandle->pReadCommittedSegment);
00804 } else {
00805 params.pSegment =
00806 pDatabase->getSegmentById(
00807 params.segmentId,
00808 pStreamGraphHandle->pSegment);
00809 }
00810 if (streamDef.getRootPageId() != -1) {
00811 params.rootPageId = PageId(streamDef.getRootPageId());
00812 params.pRootMap = NULL;
00813 } else {
00814 params.rootPageId = NULL_PAGE_ID;
00815 if (params.rootPageIdParamId == DynamicParamId(0)) {
00816 params.pRootMap = pStreamGraphHandle;
00817 }
00818 }
00819 }
00820 readTupleDescriptor(params.tupleDesc, streamDef.getTupleDesc());
00821 CmdInterpreter::readTupleProjection(
00822 params.keyProj,
00823 streamDef.getKeyProj());
00824
00825 }
00826
00827 DynamicParamId ExecStreamFactory::readDynamicParamId(const int val)
00828 {
00829
00830 uint (id) = (val < 0) ? 0 : (uint) val;
00831 return (DynamicParamId) id;
00832 }
00833
00834 void ExecStreamFactory::readBTreeReadStreamParams(
00835 BTreeReadExecStreamParams ¶ms,
00836 ProxyIndexScanDef &streamDef)
00837 {
00838 readTupleStreamParams(params, streamDef);
00839 readBTreeStreamParams(params, streamDef);
00840 CmdInterpreter::readTupleProjection(
00841 params.outputProj,
00842 streamDef.getOutputProj());
00843 }
00844
00845 void ExecStreamFactory::readIndexWriterParams(
00846 FtrsTableIndexWriterParams ¶ms,
00847 ProxyIndexWriterDef &indexWriterDef)
00848 {
00849 readBTreeStreamParams(params, indexWriterDef);
00850 params.distinctness = indexWriterDef.getDistinctness();
00851 params.updateInPlace = indexWriterDef.isUpdateInPlace();
00852 }
00853
00854 void ExecStreamFactory::readBTreeSearchStreamParams(
00855 BTreeSearchExecStreamParams ¶ms,
00856 ProxyIndexSearchDef &streamDef)
00857 {
00858 readBTreeReadStreamParams(params, streamDef);
00859 params.outerJoin = streamDef.isOuterJoin();
00860 if (streamDef.getInputKeyProj()) {
00861 CmdInterpreter::readTupleProjection(
00862 params.inputKeyProj,
00863 streamDef.getInputKeyProj());
00864 }
00865 if (streamDef.getInputJoinProj()) {
00866 CmdInterpreter::readTupleProjection(
00867 params.inputJoinProj,
00868 streamDef.getInputJoinProj());
00869 }
00870 if (streamDef.getInputDirectiveProj()) {
00871 CmdInterpreter::readTupleProjection(
00872 params.inputDirectiveProj,
00873 streamDef.getInputDirectiveProj());
00874 }
00875
00876 SharedProxyCorrelation dynamicParam = streamDef.getSearchKeyParameter();
00877 for (; dynamicParam; ++dynamicParam) {
00878 BTreeSearchKeyParameter searchKeyParam(
00879 DynamicParamId(dynamicParam->getId()),
00880 dynamicParam->getOffset());
00881 params.searchKeyParams.push_back(searchKeyParam);
00882 }
00883 }
00884
00885 void ExecStreamFactory::readAggStreamParams(
00886 SortedAggExecStreamParams ¶ms,
00887 ProxyAggStreamDef &streamDef)
00888 {
00889 readTupleStreamParams(params,streamDef);
00890 SharedProxyAggInvocation pAggInvocation = streamDef.getAggInvocation();
00891 for (; pAggInvocation; ++pAggInvocation) {
00892 AggInvocation aggInvocation;
00893 aggInvocation.aggFunction = pAggInvocation->getFunction();
00894 aggInvocation.iInputAttr =
00895 pAggInvocation->getInputAttributeIndex();
00896 params.aggInvocations.push_back(aggInvocation);
00897 }
00898 params.groupByKeyCount = streamDef.getGroupingPrefixSize();
00899 }
00900
00901 ExecStreamSubFactory::~ExecStreamSubFactory()
00902 {
00903 }
00904
00905 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $");
00906
00907