00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/bitmap/LbmGeneratorExecStream.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 #include "fennel/tuple/UnalignedAttributeAccessor.h"
00026
00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $");
00028
00029 void LbmGeneratorExecStream::prepare(LbmGeneratorExecStreamParams const ¶ms)
00030 {
00031 BTreeExecStream::prepare(params);
00032 LcsRowScanBaseExecStream::prepare(params);
00033
00034 insertRowCountParamId = params.insertRowCountParamId;
00035 assert(opaqueToInt(insertRowCountParamId) > 0);
00036
00037 createIndex = params.createIndex;
00038 parameterIds.resize(nClusters);
00039 for (uint i = 0; i < nClusters; i++) {
00040 parameterIds[i] = params.lcsClusterScanDefs[i].rootPageIdParamId;
00041 }
00042
00043 scratchLock.accessSegment(scratchAccessor);
00044 scratchPageSize = scratchAccessor.pSegment->getUsablePageSize();
00045
00046
00047 assert(inAccessors[0]->getTupleDesc().size() == 2);
00048 inputTuple.compute(inAccessors[0]->getTupleDesc());
00049
00050
00051 bitmapTuple.computeAndAllocate(pOutAccessor->getTupleDesc());
00052
00053
00054 assert(treeDescriptor.tupleDescriptor == pOutAccessor->getTupleDesc());
00055 bitmapTupleDesc = treeDescriptor.tupleDescriptor;
00056
00057 attrAccessors.resize(bitmapTupleDesc.size());
00058 for (int i = 0; i < bitmapTupleDesc.size(); ++i) {
00059 attrAccessors[i].compute(bitmapTupleDesc[i]);
00060 }
00061
00062 nIdxKeys = treeDescriptor.keyProjection.size() - 1;
00063
00064
00065
00066 LbmEntry::getSizeBounds(
00067 bitmapTupleDesc,
00068 treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(),
00069 minBitmapSize,
00070 maxBitmapSize);
00071
00072 ridRuns.resize(1);
00073 }
00074
00075 void LbmGeneratorExecStream::open(bool restart)
00076 {
00077 BTreeExecStream::open(restart);
00078 LcsRowScanBaseExecStream::open(restart);
00079 nBitmapEntries = 0;
00080 flushIdx = 0;
00081 nBitmapBuffers = 0;
00082 nScratchPagesAllocated = 0;
00083 producePending = LBM_NOFLUSH_PENDING;
00084 skipRead = false;
00085 rowCount = 0;
00086 batchRead = false;
00087 doneReading = false;
00088 revertToSingletons = false;
00089 ridRuns.clear();
00090 if (!restart) {
00091 pDynamicParamManager->createParam(
00092 insertRowCountParamId, inAccessors[0]->getTupleDesc()[0]);
00093
00094
00095
00096 if (parameterIds.size() > 0) {
00097 for (uint i = 0; i < nClusters; i++) {
00098 if (opaqueToInt(parameterIds[i]) > 0) {
00099 pClusters[i]->setRootPageId(
00100 *reinterpret_cast<PageId const *>(
00101 pDynamicParamManager->getParam(parameterIds[i]).
00102 getDatum().pData));
00103 }
00104 }
00105 }
00106 }
00107 }
00108
00109 void LbmGeneratorExecStream::getResourceRequirements(
00110 ExecStreamResourceQuantity &minQuantity,
00111 ExecStreamResourceQuantity &optQuantity,
00112 ExecStreamResourceSettingType &optType)
00113 {
00114 BTreeExecStream::getResourceRequirements(minQuantity, optQuantity);
00115 LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity);
00116 numMiscScratchPages = minQuantity.nCachePages;
00117
00118
00119 minQuantity.nCachePages += 1;
00120
00121
00122
00123
00124
00125
00126
00127 if (nIdxKeys > 1) {
00128 optQuantity.nCachePages += 1;
00129 optType = EXEC_RESOURCE_ACCURATE;
00130 } else {
00131 optQuantity.nCachePages += 11;
00132 optType = EXEC_RESOURCE_ESTIMATE;
00133 }
00134 }
00135
00136 void LbmGeneratorExecStream::setResourceAllocation(
00137 ExecStreamResourceQuantity &quantity)
00138 {
00139 BTreeExecStream::setResourceAllocation(quantity);
00140 LcsRowScanBaseExecStream::setResourceAllocation(quantity);
00141
00142 maxNumScratchPages = quantity.nCachePages - numMiscScratchPages;
00143 }
00144
00145 ExecStreamResult LbmGeneratorExecStream::execute(
00146 ExecStreamQuantum const &quantum)
00147 {
00148
00149
00150 if (inAccessors[0]->getState() == EXECBUF_EOS) {
00151 if (doneReading) {
00152 pOutAccessor->markEOS();
00153 return EXECRC_EOS;
00154 }
00155
00156 } else {
00157 if (!inAccessors[0]->demandData()) {
00158 return EXECRC_BUF_UNDERFLOW;
00159 }
00160
00161 inAccessors[0]->unmarshalTuple(inputTuple);
00162
00163
00164
00165 LcsRidRun ridRun;
00166 if (createIndex) {
00167 numRowsToLoad = 0;
00168 ridRun.nRids = RecordNum(MAXU);
00169 startRid = LcsRid(0);
00170 } else {
00171 numRowsToLoad =
00172 *reinterpret_cast<RecordNum const *> (inputTuple[0].pData);
00173 ridRun.nRids = numRowsToLoad;
00174 startRid =
00175 *reinterpret_cast<LcsRid const *> (inputTuple[1].pData);
00176 }
00177 currRid = startRid;
00178
00179
00180
00181 ridRun.startRid = startRid;
00182 ridRuns.push_back(ridRun);
00183
00184
00185
00186 pDynamicParamManager->writeParam(insertRowCountParamId, inputTuple[0]);
00187
00188 inAccessors[0]->consumeTuple();
00189
00190
00191
00192 if (!createIndex && numRowsToLoad == 0) {
00193 doneReading = true;
00194 return EXECRC_BUF_UNDERFLOW;
00195 }
00196
00197
00198 for (uint iClu = 0; iClu < nClusters; iClu++) {
00199 SharedLcsClusterReader &pScan = pClusters[iClu];
00200 if (!pScan->position(startRid)) {
00201
00202 doneReading = true;
00203 return EXECRC_BUF_UNDERFLOW;
00204 }
00205 syncColumns(pScan);
00206 }
00207
00208
00209
00210 bool rc = initBitmapTable(1);
00211 assert(rc);
00212 }
00213
00214
00215
00216 switch (producePending) {
00217 case LBM_ENTRYFLUSH_PENDING:
00218
00219 if (!pOutAccessor->produceTuple(outputTuple)) {
00220 return EXECRC_BUF_OVERFLOW;
00221 }
00222 bitmapTable[flushStart].inuse = false;
00223 break;
00224 case LBM_TABLEFLUSH_PENDING:
00225 if (!flushTable(flushStart)) {
00226 return EXECRC_BUF_OVERFLOW;
00227 }
00228 break;
00229 default:
00230 break;
00231 }
00232 producePending = LBM_NOFLUSH_PENDING;
00233
00234 ExecStreamResult rc;
00235
00236 if (nIdxKeys == 1) {
00237 rc = generateSingleKeyBitmaps(quantum);
00238 } else {
00239 rc = generateMultiKeyBitmaps(quantum);
00240 }
00241
00242 switch (rc) {
00243 case EXECRC_BUF_OVERFLOW:
00244 case EXECRC_QUANTUM_EXPIRED:
00245 return rc;
00246 case EXECRC_EOS:
00247
00248 if (!createIndex) {
00249
00250
00251 assert(rowCount >= numRowsToLoad);
00252 }
00253 doneReading = true;
00254 return EXECRC_BUF_UNDERFLOW;
00255 default:
00256 permAssert(false);
00257 }
00258 }
00259
00260 ExecStreamResult LbmGeneratorExecStream::generateSingleKeyBitmaps(
00261 ExecStreamQuantum const &quantum)
00262 {
00263
00264
00265 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00266 if (!revertToSingletons &&
00267 pClusters[0]->clusterCols[0].batchIsCompressed())
00268 {
00269 if (!generateBitmaps()) {
00270 return EXECRC_BUF_OVERFLOW;
00271 }
00272 } else if (!batchRead) {
00273 if (!generateSingletons()) {
00274 return EXECRC_BUF_OVERFLOW;
00275 }
00276 }
00277
00278
00279 batchRead = false;
00280 revertToSingletons = false;
00281 SharedLcsClusterReader &pScan = pClusters[0];
00282 if (!pScan->nextRange()) {
00283 return EXECRC_EOS;
00284 }
00285 pScan->clusterCols[0].sync();
00286 }
00287 return EXECRC_QUANTUM_EXPIRED;
00288 }
00289
00290 ExecStreamResult LbmGeneratorExecStream::generateMultiKeyBitmaps(
00291 ExecStreamQuantum const &quantum)
00292 {
00293
00294
00295 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00296 uint prevClusterEnd = 0;
00297 if (!skipRead) {
00298
00299
00300 bitmapTuple.resetBuffer();
00301 for (uint iClu = 0; iClu < nClusters; iClu++) {
00302 SharedLcsClusterReader &pScan = pClusters[iClu];
00303
00304 if (currRid >= pScan->getRangeEndRid()) {
00305
00306
00307 if (!pScan->nextRange()) {
00308 assert(
00309 iClu == 0 &&
00310 (nClusters == 1 || !pClusters[1]->nextRange()));
00311 return EXECRC_EOS;
00312 }
00313 assert(
00314 currRid >= pScan->getRangeStartRid() &&
00315 currRid < pScan->getRangeEndRid());
00316 syncColumns(pScan);
00317 } else {
00318 assert(currRid >= pScan->getRangeStartRid());
00319 pScan->advanceWithinBatch(
00320 opaqueToInt(currRid - pScan->getCurrentRid()));
00321 }
00322 readColVals(
00323 pScan,
00324 bitmapTuple,
00325 prevClusterEnd);
00326 prevClusterEnd += pScan->nColsToRead;
00327 }
00328 }
00329
00330 createSingletonBitmapEntry();
00331 if (!flushEntry(0)) {
00332 return EXECRC_BUF_OVERFLOW;
00333 }
00334 }
00335 return EXECRC_QUANTUM_EXPIRED;
00336 }
00337
00338 void LbmGeneratorExecStream::createSingletonBitmapEntry()
00339 {
00340
00341
00342
00343 initRidAndBitmap(bitmapTuple, &currRid);
00344 bool rc = addRidToBitmap(0, bitmapTuple, currRid);
00345 assert(rc);
00346 skipRead = false;
00347 ++currRid;
00348 ++rowCount;
00349 }
00350
00351 void LbmGeneratorExecStream::closeImpl()
00352 {
00353 BTreeExecStream::closeImpl();
00354 LcsRowScanBaseExecStream::closeImpl();
00355 keyCodes.clear();
00356 bitmapTable.clear();
00357 scratchPages.clear();
00358
00359 if (scratchAccessor.pSegment) {
00360 scratchAccessor.pSegment->deallocatePageRange(
00361 NULL_PAGE_ID, NULL_PAGE_ID);
00362 }
00363 }
00364
00365 bool LbmGeneratorExecStream::generateBitmaps()
00366 {
00367
00368
00369 LcsColumnReader &colReader = pClusters[0]->clusterCols[0];
00370 uint nDistinctVals = colReader.getBatchValCount();
00371
00372
00373 uint nRows = pClusters[0]->getRangeRowsLeft();
00374
00375
00376 if (!batchRead) {
00377 uint nRead;
00378
00379
00380
00381 if (!initBitmapTable(nDistinctVals)) {
00382 revertToSingletons = true;
00383 return generateSingletons();
00384 }
00385
00386 keyCodes.resize(nRows);
00387 colReader.readCompressedBatch(nRows, &keyCodes[0], &nRead);
00388 assert(nRows == nRead);
00389
00390 batchRead = true;
00391 currBatch = 0;
00392 }
00393
00394
00395
00396 for (uint i = currBatch; i < nRows; i++) {
00397 if (!skipRead) {
00398 PBuffer curValue = colReader.getBatchValue(keyCodes[i]);
00399
00400
00401 bitmapTuple.resetBuffer();
00402
00403 attrAccessors[0].loadValue(bitmapTuple[0], curValue);
00404 initRidAndBitmap(bitmapTuple, &currRid);
00405 }
00406 if (!addRidToBitmap(keyCodes[i], bitmapTuple, currRid)) {
00407 currBatch = i;
00408 skipRead = true;
00409 return false;
00410 }
00411 ++currRid;
00412 ++rowCount;
00413 skipRead = false;
00414 }
00415
00416
00417
00418 if (!flushTable(0)) {
00419
00420
00421 currBatch = nRows;
00422 return false;
00423 }
00424
00425 return true;
00426 }
00427
00428 bool LbmGeneratorExecStream::generateSingletons()
00429 {
00430 SharedLcsClusterReader &pScan = pClusters[0];
00431
00432 do {
00433
00434
00435 if (!skipRead) {
00436 uint prevClusterEnd = 0;
00437
00438
00439
00440 bitmapTuple.resetBuffer();
00441
00442 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00443 PBuffer curValue =
00444 pScan->clusterCols[iCluCol].getCurrentValue();
00445 uint idx = projMap[prevClusterEnd + iCluCol];
00446
00447 attrAccessors[idx].loadValue(bitmapTuple[idx], curValue);
00448 }
00449 prevClusterEnd += pScan->nColsToRead;
00450 }
00451
00452 createSingletonBitmapEntry();
00453 if (!flushEntry(0)) {
00454
00455
00456 if (!advanceReader(pScan)) {
00457
00458
00459 batchRead = true;
00460 }
00461 return false;
00462 }
00463
00464
00465
00466
00467 if (!advanceReader(pScan)) {
00468 return true;
00469 }
00470 } while (true);
00471 }
00472
00473 bool LbmGeneratorExecStream::advanceReader(SharedLcsClusterReader &pScan)
00474 {
00475 if (!pScan->advance(1)) {
00476 return false;
00477 }
00478 syncColumns(pScan);
00479 return true;
00480 }
00481
00482 bool LbmGeneratorExecStream::initBitmapTable(uint nEntries)
00483 {
00484
00485
00486
00487 uint nBufsPerPage = (uint) ceil((double) nEntries / maxNumScratchPages);
00488 uint currSize = scratchPageSize / nBufsPerPage;
00489
00490 if (currSize < minBitmapSize) {
00491 currSize = minBitmapSize;
00492 nBufsPerPage = scratchPageSize / currSize;
00493 } else if (currSize > maxBitmapSize) {
00494 currSize = maxBitmapSize;
00495 nBufsPerPage = scratchPageSize / currSize;
00496 }
00497
00498
00499
00500
00501
00502 uint nBuffers = nBufsPerPage * maxNumScratchPages;
00503 if (nBuffers < 8 && nEntries > nBuffers) {
00504 return false;
00505 }
00506
00507 if (nEntries > nBitmapEntries) {
00508
00509
00510 bitmapTable.resize(nEntries);
00511 for (uint i = nBitmapEntries; i < nEntries; i++) {
00512 bitmapTable[i].pBitmap = SharedLbmEntry(new LbmEntry());
00513 }
00514 }
00515
00516 if (nEntries != nBitmapEntries) {
00517
00518
00519
00520
00521
00522 uint nPages = (uint) ceil((double) nEntries / nBufsPerPage);
00523 if (nPages > maxNumScratchPages) {
00524 nPages = maxNumScratchPages;
00525 }
00526 while (nPages > nScratchPagesAllocated) {
00527 scratchLock.allocatePage();
00528 PBuffer newPage = scratchLock.getPage().getWritableData();
00529 scratchPages.push_back(newPage);
00530 ++nScratchPagesAllocated;
00531 }
00532 uint idx = 0;
00533 for (uint i = 0; i < nPages; i++) {
00534 uint offset = 0;
00535 for (uint j = 0; j < nBufsPerPage; j++) {
00536 idx = i * nBufsPerPage + j;
00537 if (idx == nEntries) {
00538 break;
00539 }
00540 bitmapTable[idx].bufferPtr = scratchPages[i] + offset;
00541 offset += currSize;
00542 }
00543 if (idx == nEntries) {
00544 break;
00545 }
00546 }
00547
00548 for (uint i = idx + 1; i < nEntries; i++) {
00549 bitmapTable[i].bufferPtr = NULL;
00550 }
00551 }
00552
00553 for (uint i = 0; i < nEntries; i++) {
00554 bitmapTable[i].inuse = false;
00555 }
00556 flushIdx = 0;
00557 nBitmapEntries = nEntries;
00558 entrySize = currSize;
00559
00560 return true;
00561 }
00562
00563 void LbmGeneratorExecStream::initRidAndBitmap(
00564 TupleData &bitmapTuple, LcsRid* pCurrRid)
00565 {
00566 bitmapTuple[nIdxKeys].pData = (PConstBuffer) pCurrRid;
00567 bitmapTuple[nIdxKeys + 1].pData = NULL;
00568 bitmapTuple[nIdxKeys + 2].pData = NULL;
00569 }
00570
00571 bool LbmGeneratorExecStream::addRidToBitmap(
00572 uint keycode, TupleData &initBitmap, LcsRid rid)
00573 {
00574 assert(keycode <= nBitmapEntries);
00575
00576 if (bitmapTable[keycode].inuse) {
00577 assert(bitmapTable[keycode].bufferPtr);
00578
00579 bool maxedOut = !bitmapTable[keycode].pBitmap->setRID(rid);
00580 if (maxedOut) {
00581 if (!flushEntry(keycode)) {
00582 return false;
00583 }
00584 assert(!bitmapTable[keycode].inuse);
00585 bitmapTable[keycode].inuse = true;
00586
00587
00588 bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap);
00589 }
00590 } else {
00591 if (!bitmapTable[keycode].bufferPtr) {
00592
00593
00594 PBuffer bufPtr = flushBuffer(rid);
00595 if (!bufPtr) {
00596 return false;
00597 }
00598 bitmapTable[keycode].bufferPtr = bufPtr;
00599 }
00600
00601 bitmapTable[keycode].pBitmap->init(
00602 bitmapTable[keycode].bufferPtr, NULL, entrySize, bitmapTupleDesc);
00603 bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap);
00604 bitmapTable[keycode].inuse = true;
00605 }
00606
00607 return true;
00608 }
00609
00610 PBuffer LbmGeneratorExecStream::flushBuffer(LcsRid addRid)
00611 {
00612
00613
00614
00615
00616
00617
00618 PBuffer retPtr;
00619 uint nAttempts = 0;
00620 do {
00621 ++nAttempts;
00622 if (nAttempts > nBitmapEntries) {
00623
00624
00625 permAssert(false);
00626 }
00627 if (bitmapTable[flushIdx].bufferPtr) {
00628 retPtr = bitmapTable[flushIdx].bufferPtr;
00629 if (bitmapTable[flushIdx].inuse) {
00630
00631
00632
00633
00634 if (bitmapTable[flushIdx].pBitmap->inRange(addRid)) {
00635 flushIdx = ++flushIdx % nBitmapEntries;
00636 continue;
00637 }
00638 if (!flushEntry(flushIdx)) {
00639 return NULL;
00640 }
00641 }
00642 bitmapTable[flushIdx].bufferPtr = NULL;
00643 break;
00644 }
00645 flushIdx = ++flushIdx % nBitmapEntries;
00646 } while (true);
00647
00648 flushIdx = ++flushIdx % nBitmapEntries;
00649 return retPtr;
00650 }
00651
00652 bool LbmGeneratorExecStream::flushTable(uint start)
00653 {
00654 assert(start <= nBitmapEntries);
00655 for (uint i = start; i < nBitmapEntries; i++) {
00656 if (bitmapTable[i].inuse) {
00657 if (!flushEntry(i)) {
00658 producePending = LBM_TABLEFLUSH_PENDING;
00659 return false;
00660 }
00661 }
00662 }
00663
00664 return true;
00665 }
00666
00667 bool LbmGeneratorExecStream::flushEntry(uint keycode)
00668 {
00669 assert(bitmapTable[keycode].inuse && bitmapTable[keycode].bufferPtr);
00670
00671
00672
00673
00674 outputTuple = bitmapTable[keycode].pBitmap->produceEntryTuple();
00675
00676 FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(outputTuple));
00677
00678 if (!pOutAccessor->produceTuple(outputTuple)) {
00679 flushStart = keycode;
00680 producePending = LBM_ENTRYFLUSH_PENDING;
00681 return false;
00682 }
00683
00684
00685 bitmapTable[keycode].inuse = false;
00686
00687 return true;
00688 }
00689
00690 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $");
00691
00692