00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/tuple/StandardTypeDescriptor.h"
00024 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/common/SearchEndpoint.h"
00027 #include <math.h>
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $");
00030
// Out-of-class definition of the static member
// LcsRowScanExecStreamParams::defaultSystemSamplingClumps, initialized to 10.
// NOTE(review): presumably the default number of clumps used by system
// sampling when the caller does not supply one -- confirm against the header.
00031 int32_t LcsRowScanExecStreamParams::defaultSystemSamplingClumps = 10;
00032
00033 LcsRowScanExecStream::LcsRowScanExecStream()
00034 :
00035 LcsRowScanBaseExecStream(),
00036 ridRunIter(&ridRuns)
00037 {
00038 ridRuns.resize(4000);
00039 }
00040
00041 void LcsRowScanExecStream::prepareResidualFilters(
00042 LcsRowScanExecStreamParams const ¶ms)
00043 {
00044 nFilters = params.residualFilterCols.size();
00045
00046
00047
00048
00049 VectorOfUint valueCols;
00050 uint j, k = 0;
00051 for (uint i = 0; i < nFilters; i++) {
00052 for (j = 0; j < params.outputProj.size(); j++) {
00053 if (params.outputProj[j] == params.residualFilterCols[i]) {
00054 valueCols.push_back(j);
00055 break;
00056 }
00057 }
00058
00059 if (j >= params.outputProj.size()) {
00060 valueCols.push_back(params.outputProj.size() + k);
00061 k++;
00062 }
00063 }
00064
00065
00066
00067
00068 uint valueClus;
00069 uint clusterPos;
00070 uint clusterStart = 0;
00071 uint realClusterStart = 0;
00072
00073 filters.reset(new PLcsResidualColumnFilters[nFilters]);
00074
00075 for (uint i = 0; i < nClusters; i++) {
00076 uint clusterEnd = clusterStart +
00077 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00078
00079 for (uint j = 0; j < nFilters; j++) {
00080 if (params.residualFilterCols[j] >= clusterStart &&
00081 params.residualFilterCols[j] <= clusterEnd)
00082 {
00083 valueClus = i;
00084
00085
00086
00087
00088 for (uint k = 0; k < projMap.size(); k++) {
00089 if (projMap[k] == valueCols[j]) {
00090 clusterPos = k - realClusterStart -
00091 nonClusterCols.size();
00092
00093 LcsResidualColumnFilters &filter =
00094 pClusters[valueClus]->
00095 clusterCols[clusterPos].
00096 getFilters();
00097
00098 filters[j] = &filter;
00099
00100 filter.hasResidualFilters = true;
00101
00102 filter.readerKeyProj.push_back(valueCols[j]);
00103 filter.inputKeyDesc.projectFrom(
00104 projDescriptor,
00105 filter.readerKeyProj);
00106 filter.attrAccessor.compute(
00107 filter.inputKeyDesc[0]);
00108
00109 filter.lowerBoundProj.push_back(1);
00110 filter.upperBoundProj.push_back(3);
00111 filter.readerKeyData.computeAndAllocate(
00112 filter.inputKeyDesc);
00113
00114 break;
00115 }
00116 }
00117
00118 }
00119 }
00120
00121 clusterStart = clusterEnd + 1;
00122 realClusterStart += pClusters[i]->nColsToRead;
00123 }
00124 }
00125
00126 void LcsRowScanExecStream::prepare(LcsRowScanExecStreamParams const ¶ms)
00127 {
00128 LcsRowScanBaseExecStream::prepare(params);
00129
00130 isFullScan = params.isFullScan;
00131 hasExtraFilter = params.hasExtraFilter;
00132
00133
00134 ridTupleData.compute(inAccessors[0]->getTupleDesc());
00135
00136
00137 TupleDescriptor inputDesc = inAccessors[0]->getTupleDesc();
00138 assert(inputDesc.size() == 3);
00139 StandardTypeDescriptorFactory stdTypeFactory;
00140 TupleAttributeDescriptor expectedRidDesc(
00141 stdTypeFactory.newDataType(STANDARD_TYPE_RECORDNUM));
00142 assert(inputDesc[0] == expectedRidDesc);
00143
00144 assert(hasExtraFilter == (inAccessors.size() > 1));
00145
00146 if (hasExtraFilter) {
00147 prepareResidualFilters(params);
00148 } else {
00149 nFilters = 0;
00150 }
00151
00152
00153
00154
00155 for (uint i = 0; i < params.outputProj.size(); i++) {
00156 outputProj.push_back(i);
00157 }
00158
00159 pOutAccessor->setTupleShape(pOutAccessor->getTupleDesc());
00160 outputTupleData.computeAndAllocate(projDescriptor);
00161
00162
00163
00164
00165
00166
00167 projOutputTupleData.compute(pOutAccessor->getTupleDesc());
00168
00169 attrAccessors.resize(projDescriptor.size());
00170 for (uint i = 0; i < projDescriptor.size(); ++i) {
00171 attrAccessors[i].compute(projDescriptor[i]);
00172 }
00173
00174
00175 samplingMode = params.samplingMode;
00176
00177 if (samplingMode != SAMPLING_OFF) {
00178 samplingRate = params.samplingRate;
00179 rowCount = params.samplingRowCount;
00180
00181 if (samplingMode == SAMPLING_BERNOULLI) {
00182 isSamplingRepeatable = params.samplingIsRepeatable;
00183 repeatableSeed = params.samplingRepeatableSeed;
00184 samplingClumps = -1;
00185
00186 samplingRng.reset(new BernoulliRng(samplingRate));
00187 } else {
00188 assert(isFullScan);
00189
00190 samplingClumps = params.samplingClumps;
00191 assert(samplingClumps > 0);
00192
00193 isSamplingRepeatable = false;
00194 }
00195 }
00196 }
00197
00198 void LcsRowScanExecStream::open(bool restart)
00199 {
00200 LcsRowScanBaseExecStream::open(restart);
00201 producePending = false;
00202 tupleFound = false;
00203 nRidsRead = 0;
00204 ridRunsBuilt = false;
00205 currRidRun.startRid = LcsRid(MAXU);
00206 currRidRun.nRids = 0;
00207 ridRuns.clear();
00208 ridRunIter.reset();
00209
00210 if (isFullScan) {
00211 inputRid = LcsRid(0);
00212 readDeletedRid = true;
00213 deletedRidEos = false;
00214 }
00215 nextRid = LcsRid(0);
00216 ridReader.init(inAccessors[0], ridTupleData);
00217
00218
00219
00220
00221
00222
00223
00224
00225 if (!restart) {
00226 iFilterToInitialize = 0;
00227 } else if (iFilterToInitialize < nFilters) {
00228 if (!filters[iFilterToInitialize]->filterDataInitialized) {
00229 filters[iFilterToInitialize]->filterData.clear();
00230 }
00231 }
00232
00233 if (samplingMode == SAMPLING_BERNOULLI) {
00234 if (isSamplingRepeatable) {
00235 samplingRng->reseed(repeatableSeed);
00236 } else if (!restart) {
00237 samplingRng->reseed(static_cast<uint32_t>(time(0)));
00238 }
00239 } else if (samplingMode == SAMPLING_SYSTEM) {
00240 clumpSize = 0;
00241 clumpDistance = 0;
00242 clumpPos = 0;
00243 numClumpsBuilt = 0;
00244
00245 initializeSystemSampling();
00246 }
00247 }
00248
00249 void LcsRowScanExecStream::getResourceRequirements(
00250 ExecStreamResourceQuantity &minQuantity,
00251 ExecStreamResourceQuantity &optQuantity)
00252 {
00253 LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity);
00254 }
00255
00256 bool LcsRowScanExecStream::initializeFiltersIfNeeded()
00257 {
00258
00259
00260
00261 for (; iFilterToInitialize < nFilters; iFilterToInitialize++) {
00262 SharedExecStreamBufAccessor &pInAccessor =
00263 inAccessors[iFilterToInitialize + 1];
00264 TupleAccessor &inputAccessor =
00265 pInAccessor->getConsumptionTupleAccessor();
00266
00267 if (pInAccessor->getState() != EXECBUF_EOS) {
00268 PLcsResidualColumnFilters filter = filters[iFilterToInitialize];
00269
00270 while (pInAccessor->demandData()) {
00271 SharedLcsResidualFilter filterData(new LcsResidualFilter);
00272
00273 pInAccessor->accessConsumptionTuple();
00274
00275
00276
00277
00278 filterData->boundData.compute(pInAccessor->getTupleDesc());
00279 filterData->boundBuf.reset(
00280 new FixedBuffer[inputAccessor.getCurrentByteCount()]);
00281
00282 memcpy(
00283 filterData->boundBuf.get(),
00284 pInAccessor->getConsumptionStart(),
00285 inputAccessor.getCurrentByteCount());
00286
00287
00288
00289
00290
00291
00292 PConstBuffer tmpBuf;
00293 tmpBuf = inputAccessor.getCurrentTupleBuf();
00294 inputAccessor.setCurrentTupleBuf(filterData->boundBuf.get());
00295 inputAccessor.unmarshal(filterData->boundData);
00296 inputAccessor.setCurrentTupleBuf(tmpBuf);
00297
00298
00299
00300
00301 filterData->lowerBoundDirective =
00302 SearchEndpoint(*filterData->boundData[0].pData);
00303 filterData->upperBoundDirective =
00304 SearchEndpoint(*filterData->boundData[2].pData);
00305
00306 filter->filterData.push_back(filterData);
00307
00308 pInAccessor->consumeTuple();
00309 }
00310
00311 if (pInAccessor->getState() != EXECBUF_EOS) {
00312 return false;
00313 }
00314 }
00315 filters[iFilterToInitialize]->filterDataInitialized = true;
00316 }
00317 return true;
00318 }
00319
00320
00321 void LcsRowScanExecStream::initializeSystemSampling()
00322 {
00323 clumpPos = 0;
00324 clumpSkipPos = 0;
00325
00326 FENNEL_TRACE(TRACE_FINE, "rowCount = " << rowCount);
00327 FENNEL_TRACE(
00328 TRACE_FINE, "samplingRate = " << static_cast<double>(samplingRate));
00329
00330 if (rowCount <= 0) {
00331
00332 clumpSize = 1;
00333 clumpDistance = 0;
00334 numClumps = 0;
00335 return;
00336 }
00337
00338
00339
00340 numClumps = samplingClumps;
00341
00342
00343 int64_t sampleSize =
00344 static_cast<uint64_t>(
00345 round(
00346 static_cast<double>(rowCount) *
00347 static_cast<double>(samplingRate)));
00348 if (sampleSize < numClumps) {
00349
00350
00351 sampleSize = numClumps;
00352 }
00353
00354 if (sampleSize > rowCount) {
00355
00356
00357 sampleSize = rowCount;
00358 numClumps = 1;
00359 }
00360
00361 FENNEL_TRACE(TRACE_FINE, "sampleSize = " << sampleSize);
00362
00363 clumpSize =
00364 static_cast<uint64_t>(
00365 round(
00366 static_cast<double>(sampleSize) /
00367 static_cast<double>(numClumps)));
00368 assert(sampleSize >= clumpSize);
00369 assert(clumpSize >= 1);
00370
00371 FENNEL_TRACE(TRACE_FINE, "clumpSize = " << clumpSize);
00372
00373 if (numClumps > 1) {
00374
00375 clumpDistance =
00376 static_cast<uint64_t>(
00377 round(
00378 static_cast<double>(rowCount - sampleSize) /
00379 static_cast<double>(numClumps - 1)));
00380
00381
00382
00383 uint64_t rowsRequired =
00384 (clumpSize + clumpDistance) * (numClumps - 1) + clumpSize;
00385 if (rowsRequired > rowCount && clumpDistance > 0) {
00386 clumpDistance--;
00387 }
00388 } else {
00389
00390 clumpDistance = (rowCount - sampleSize);
00391 }
00392
00393 FENNEL_TRACE(TRACE_FINE, "clumpDistance = " << clumpDistance);
00394 }
00395
00396
// Produces up to quantum.nTuplesMax output tuples.
//
// Returns:
//   EXECRC_BUF_UNDERFLOW   - residual filter inputs are not fully read yet
//   EXECRC_BUF_OVERFLOW    - the output buffer rejected a tuple; the tuple
//                            stays pending (producePending remains true) and
//                            is retried on the next call
//   EXECRC_EOS             - no more rids to fetch, or (full scan only) a rid
//                            could not be positioned in some cluster
//   EXECRC_QUANTUM_EXPIRED - quantum.nTuplesMax iterations completed
//   any non-EXECRC_YIELD code returned by fillRidRunBuffer() is passed
//   through unchanged.
00397 ExecStreamResult LcsRowScanExecStream::execute(ExecStreamQuantum const &quantum)
00398 {
// All residual filters must be loaded before any row can be evaluated.
00399 if (!initializeFiltersIfNeeded()) {
00400 return EXECRC_BUF_UNDERFLOW;
00401 }
00402
00403 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00404 uint iClu;
00405 bool passedFilter;
00406
// Keep fetching rids until one yields a pending result. The loop is skipped
// entirely when a previous call left a tuple pending after a buffer overflow.
00407 while (!producePending) {
00408
// Top up the rid-run buffer while runs remain to be built and more than 100
// slots are free.
00409 if (!ridRunsBuilt && ridRuns.nFreeSpace() > 100) {
00410 ExecStreamResult rc = fillRidRunBuffer();
00411 if (rc != EXECRC_YIELD) {
00412 return rc;
00413 }
00414 }
00415
00416
00417
// Next rid to fetch from the rid runs; LcsRid(MAXU) signals that there are
// no more rids.
// NOTE(review): nextRid is presumably updated by getFetchRids -- confirm.
00418 LcsRid rid =
00419 LcsClusterReader::getFetchRids(ridRunIter, nextRid, true);
00420 if (rid == LcsRid(MAXU)) {
00421 assert(ridRunIter.done());
00422 pOutAccessor->markEOS();
00423 return EXECRC_EOS;
00424 }
00425
// Running offset into the projected tuple: number of columns filled so far
// by non-cluster columns and by the clusters already read.
00426 uint prevClusterEnd = 0;
00427
00428 outputTupleData.resetBuffer();
00429
00430
// Fill in non-cluster columns. The rid pseudo-column is the only kind
// supported; anything else trips a permanent assertion.
00431 for (uint j = 0; j < nonClusterCols.size(); j++) {
00432 if (nonClusterCols[j] == LCS_RID_COLUMN_ID) {
00433 memcpy(
00434 const_cast<PBuffer>(outputTupleData[projMap[j]].pData),
00435 (PBuffer) &rid, sizeof(LcsRid));
00436 prevClusterEnd++;
00437 } else {
00438 permAssert(false);
00439 }
00440 }
00441
00442
// Read this rid's column values from each cluster in turn. The loop exits
// early either because the rid could not be positioned (passedFilter stays
// true, iClu < nClusters) or because a residual filter rejected the row
// (passedFilter becomes false).
00443 for (iClu = 0, passedFilter = true; iClu < nClusters; iClu++) {
00444 SharedLcsClusterReader &pScan = pClusters[iClu];
00445
00446
00447 pScan->catchUp(ridRunIter.getCurrPos(), nextRid);
00448
00449
00450
00451
// Reposition when the reader has no current range or the rid lies at or
// beyond the end of the current range.
00452 if (!pScan->isPositioned() || rid >= pScan->getRangeEndRid()) {
00453 bool rc = pScan->position(rid);
00454
00455
00456
// Positioning failed: stop reading clusters for this rid.
00457 if (rc == false)
00458 break;
00459
00460 assert(rid >= pScan->getRangeStartRid()
00461 && rid < pScan->getRangeEndRid());
00462
00463
00464 syncColumns(pScan);
00465 } else {
00466
// The rid is inside the current range but past its first rid.
00467 assert(rid > pScan->getRangeStartRid());
00468
00469
00470
00471
// Move forward by the distance from the reader's current rid.
00472 pScan->advanceWithinBatch(
00473 opaqueToInt(rid - pScan->getCurrentRid()));
00474 }
00475
// readColVals reports whether the row passed this cluster's filters.
00476 passedFilter =
00477 readColVals(
00478 pScan,
00479 outputTupleData,
00480 prevClusterEnd);
00481 if (!passedFilter) {
00482 break;
00483 }
00484 prevClusterEnd += pScan->nColsToRead;
00485 }
00486
// Row rejected by a filter: move on to the next rid without producing.
00487 if (!passedFilter) {
00488 continue;
00489 }
// A tuple exists only if every cluster was read successfully.
00490 if (iClu == nClusters) {
00491 tupleFound = true;
00492 }
00493 producePending = true;
00494 }
00495
00496
// Project the internal tuple down to the output columns and emit it. On
// overflow, producePending and tupleFound are left set so that the next call
// re-attempts the same tuple.
00497 projOutputTupleData.projectFrom(outputTupleData, outputProj);
00498 if (tupleFound) {
00499 if (!pOutAccessor->produceTuple(projOutputTupleData)) {
00500 return EXECRC_BUF_OVERFLOW;
00501 }
00502 }
00503 producePending = false;
00504
00505 if (isFullScan) {
00506
// In a full scan, failing to position on a rid ends the scan.
00507 if (!tupleFound) {
00508 pOutAccessor->markEOS();
00509 return EXECRC_EOS;
00510 }
00511 }
00512
00513 tupleFound = false;
00514 nRidsRead++;
00515 }
00516
00517 return EXECRC_QUANTUM_EXPIRED;
00518 }
00519
// Builds runs of consecutive rids and appends them to ridRuns until the
// buffer has no space left or all runs have been built (ridRunsBuilt).
//
// Non-full scan: each rid read from ridReader is a single rid to fetch.
// Full scan: ridReader supplies the rids held in deletedRid (NOTE(review):
// presumably the rids of deleted rows -- confirm); those rids are skipped and
// the ranges between them become runs, optionally thinned by system or
// Bernoulli sampling.
//
// Returns EXECRC_YIELD on success; any other non-EOS code returned by
// ridReader is passed back unchanged.
00520 ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer()
00521 {
00522 ExecStreamResult rc;
00523 RecordNum nRows;
00524
// Note: each `continue` below jumps to the loop condition at the bottom.
00525 do {
00526 if (!isFullScan) {
// Read the next rid to fetch; end of input means all runs are built.
00527 rc = ridReader.readRidAndAdvance(inputRid);
00528 if (rc == EXECRC_EOS) {
00529 ridRunsBuilt = true;
00530 break;
00531 }
00532 if (rc != EXECRC_YIELD) {
00533 return rc;
00534 }
00535 nRows = 1;
00536
00537 } else {
// Fetch the next deletedRid when the previous one has been used up.
00538 if (!deletedRidEos && readDeletedRid) {
00539 rc = ridReader.readRidAndAdvance(deletedRid);
00540 if (rc == EXECRC_EOS) {
00541 deletedRidEos = true;
// Without sampling, the remaining rows form one final open-ended run, built
// further below. With system sampling and zero clumps there is nothing to
// return, so stop right away.
00542 if (samplingMode == SAMPLING_OFF) {
00543 ridRunsBuilt = true;
00544 } else if (samplingMode == SAMPLING_SYSTEM &&
00545 numClumps == 0)
00546 {
00547 ridRunsBuilt = true;
00548 break;
00549 }
00550 } else if (rc != EXECRC_YIELD) {
00551 return rc;
00552 } else {
00553 readDeletedRid = false;
00554 }
00555 }
00556
// The current rid equals deletedRid: step over it and read the next one.
00557 if (!deletedRidEos && inputRid == deletedRid) {
00558 inputRid++;
00559 readDeletedRid = true;
00560 continue;
00561 } else {
// Rows available from inputRid: unbounded (MAXU) once the deletedRid input
// is exhausted, otherwise up to (not including) deletedRid.
00562 if (deletedRidEos) {
00563 nRows = MAXU;
00564 } else {
00565 nRows = opaqueToInt(deletedRid - inputRid);
00566 }
00567 }
00568 }
00569
00570 if (samplingMode != SAMPLING_OFF) {
00571 if (samplingMode == SAMPLING_SYSTEM) {
// clumpSkipPos > 0 means rows between two clumps still have to be skipped
// before the next clump can start.
00572 if (clumpSkipPos > 0) {
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585 if (deletedRidEos) {
00586
// No more deletedRid values ahead: skip the whole remaining gap at once.
00587 inputRid += clumpSkipPos;
00588 clumpSkipPos = 0;
00589 } else if (!readDeletedRid) {
00590 if (deletedRid > inputRid + clumpSkipPos) {
00591
// The gap ends before deletedRid: finish the skip and recompute the rows
// available up to deletedRid.
00592 inputRid += clumpSkipPos;
00593 clumpSkipPos = 0;
00594 nRows = opaqueToInt(deletedRid - inputRid);
00595 } else {
00596
// The gap reaches deletedRid: skip up to it, keep the remainder of the gap
// for the next iteration.
00597 clumpSkipPos -= opaqueToInt(deletedRid - inputRid);
00598 inputRid = deletedRid;
00599 continue;
00600 }
00601 } else {
00602
// Otherwise skip a single row and re-evaluate.
00603 clumpSkipPos--;
00604 inputRid++;
00605 continue;
00606 }
00607 }
00608
00609 if (nRows >= clumpSize - clumpPos) {
00610
00611
// Enough rows to complete the current clump: take just the remainder, start
// a new gap, and finish once the configured number of clumps is built.
00612 nRows = clumpSize - clumpPos;
00613 clumpPos = 0;
00614 clumpSkipPos = clumpDistance;
00615 if (++numClumpsBuilt == numClumps) {
00616 ridRunsBuilt = true;
00617 }
00618 } else {
00619
// Clump only partially filled; remember how far into it we are.
00620 clumpPos += nRows;
00621 }
00622 } else {
00623
// Remaining sampling mode (rng-based): stop once inputRid reaches rowCount,
// and let samplingRng decide whether each individual row is included.
00624 if (opaqueToInt(inputRid) >= opaqueToInt(rowCount)) {
00625 ridRunsBuilt = true;
00626 break;
00627 }
00628 if (!samplingRng->nextValue()) {
00629 inputRid++;
00630 continue;
00631 }
00632 nRows = 1;
00633 }
00634 }
00635
// Merge [inputRid, inputRid + nRows) into the run under construction.
// startRid == LcsRid(MAXU) marks "no current run".
00636 if (currRidRun.startRid == LcsRid(MAXU)) {
00637 currRidRun.startRid = inputRid;
00638 currRidRun.nRids = nRows;
00639 } else if (currRidRun.startRid + currRidRun.nRids == inputRid) {
00640
00641
// Contiguous with the current run: extend it. An unbounded extension makes
// the run unbounded rather than adding to the count.
00642 if (nRows == RecordNum(MAXU)) {
00643 currRidRun.nRids = MAXU;
00644 } else {
00645 currRidRun.nRids += nRows;
00646 }
00647 } else {
00648
// Not contiguous: emit the finished run and start a new one.
00649 ridRuns.push_back(currRidRun);
00650
00651
00652 currRidRun.startRid = inputRid;
00653 currRidRun.nRids = nRows;
00654 }
00655
// In a full scan the position advances past the rows just added; otherwise
// inputRid is overwritten by the next read.
00656 if (isFullScan) {
00657 inputRid += nRows;
00658 }
00659 } while (ridRuns.spaceAvailable() && !ridRunsBuilt);
00660
00661
// Flush the run still under construction once everything has been built.
00662 if (ridRunsBuilt && currRidRun.startRid != LcsRid(MAXU)) {
00663 ridRuns.push_back(currRidRun);
00664 }
00665
00666 if (ridRunsBuilt) {
00667 ridRuns.setReadOnly();
00668 }
00669 return EXECRC_YIELD;
00670 }
00671
00672 void LcsRowScanExecStream::closeImpl()
00673 {
00674 LcsRowScanBaseExecStream::closeImpl();
00675
00676 for (uint i = 0; i < nFilters; i++) {
00677 filters[i]->filterData.clear();
00678 }
00679 }
00680
00681 void LcsRowScanExecStream::buildOutputProj(
00682 TupleProjection &outputProj,
00683 LcsRowScanBaseExecStreamParams const ¶ms)
00684 {
00685 LcsRowScanExecStreamParams const &rowScanParams =
00686 dynamic_cast<const LcsRowScanExecStreamParams&>(params);
00687
00688
00689
00690
00691 for (uint i = 0; i < rowScanParams.outputProj.size(); i++) {
00692 outputProj.push_back(rowScanParams.outputProj[i]);
00693 }
00694 for (uint i = 0; i < rowScanParams.residualFilterCols.size(); i++) {
00695 uint j;
00696 for (j = 0; j < rowScanParams.outputProj.size(); j++) {
00697 if (rowScanParams.outputProj[j] ==
00698 rowScanParams.residualFilterCols[i])
00699 {
00700 break;
00701 }
00702 }
00703
00704 if (j >= rowScanParams.outputProj.size()) {
00705 outputProj.push_back(rowScanParams.residualFilterCols[i]);
00706 }
00707 }
00708 }
00709
00710
00711 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $");
00712
00713