LbmGeneratorExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/bitmap/LbmGeneratorExecStream.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 #include "fennel/tuple/UnalignedAttributeAccessor.h"
00026 
00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $");
00028 
00029 void LbmGeneratorExecStream::prepare(LbmGeneratorExecStreamParams const &params)
00030 {
00031     BTreeExecStream::prepare(params);
00032     LcsRowScanBaseExecStream::prepare(params);
00033 
00034     insertRowCountParamId = params.insertRowCountParamId;
00035     assert(opaqueToInt(insertRowCountParamId) > 0);
00036 
00037     createIndex = params.createIndex;
00038     parameterIds.resize(nClusters);
00039     for (uint i = 0; i < nClusters; i++) {
00040         parameterIds[i] = params.lcsClusterScanDefs[i].rootPageIdParamId;
00041     }
00042 
00043     scratchLock.accessSegment(scratchAccessor);
00044     scratchPageSize = scratchAccessor.pSegment->getUsablePageSize();
00045 
00046     // setup input tuple -- numRowsToLoad, startRid
00047     assert(inAccessors[0]->getTupleDesc().size() == 2);
00048     inputTuple.compute(inAccessors[0]->getTupleDesc());
00049 
00050     // setup tuple used to store key values read from clusters
00051     bitmapTuple.computeAndAllocate(pOutAccessor->getTupleDesc());
00052 
00053     // setup output tuple
00054     assert(treeDescriptor.tupleDescriptor == pOutAccessor->getTupleDesc());
00055     bitmapTupleDesc = treeDescriptor.tupleDescriptor;
00056 
00057     attrAccessors.resize(bitmapTupleDesc.size());
00058     for (int i = 0; i < bitmapTupleDesc.size(); ++i) {
00059         attrAccessors[i].compute(bitmapTupleDesc[i]);
00060     }
00061 
00062     nIdxKeys = treeDescriptor.keyProjection.size() - 1;
00063 
00064     // determine min and max size of bitmap entries
00065 
00066     LbmEntry::getSizeBounds(
00067         bitmapTupleDesc,
00068         treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(),
00069         minBitmapSize,
00070         maxBitmapSize);
00071 
00072     ridRuns.resize(1);
00073 }
00074 
00075 void LbmGeneratorExecStream::open(bool restart)
00076 {
00077     BTreeExecStream::open(restart);
00078     LcsRowScanBaseExecStream::open(restart);
00079     nBitmapEntries = 0;
00080     flushIdx = 0;
00081     nBitmapBuffers = 0;
00082     nScratchPagesAllocated = 0;
00083     producePending = LBM_NOFLUSH_PENDING;
00084     skipRead = false;
00085     rowCount = 0;
00086     batchRead = false;
00087     doneReading = false;
00088     revertToSingletons = false;
00089     ridRuns.clear();
00090     if (!restart) {
00091         pDynamicParamManager->createParam(
00092             insertRowCountParamId, inAccessors[0]->getTupleDesc()[0]);
00093 
00094         // set the rootPageIds of the clusters, if there are dynamic parameters
00095         // corresponding to them
00096         if (parameterIds.size() > 0) {
00097             for (uint i = 0; i < nClusters; i++) {
00098                 if (opaqueToInt(parameterIds[i]) > 0) {
00099                     pClusters[i]->setRootPageId(
00100                         *reinterpret_cast<PageId const *>(
00101                             pDynamicParamManager->getParam(parameterIds[i]).
00102                                 getDatum().pData));
00103                 }
00104             }
00105         }
00106     }
00107 }
00108 
00109 void LbmGeneratorExecStream::getResourceRequirements(
00110     ExecStreamResourceQuantity &minQuantity,
00111     ExecStreamResourceQuantity &optQuantity,
00112     ExecStreamResourceSettingType &optType)
00113 {
00114     BTreeExecStream::getResourceRequirements(minQuantity, optQuantity);
00115     LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity);
00116     numMiscScratchPages = minQuantity.nCachePages;
00117 
00118     // need a minimum of one scratch pages for constructing LbmEntry's
00119     minQuantity.nCachePages += 1;
00120 
00121     // If this is a multi-key index, then we're only creating singleton
00122     // LbmEntry's and therefore, don't need multiple scratch pages.
00123     // Otherwise, we ideally want to set the number of scratch pages
00124     // based on the max number of distinct values in compressed batches.
00125     // Since we don't have that information, we'll use an "estimate" of
00126     // 10 pages.
00127     if (nIdxKeys > 1) {
00128         optQuantity.nCachePages += 1;
00129         optType = EXEC_RESOURCE_ACCURATE;
00130     } else {
00131         optQuantity.nCachePages += 11;
00132         optType = EXEC_RESOURCE_ESTIMATE;
00133     }
00134 }
00135 
00136 void LbmGeneratorExecStream::setResourceAllocation(
00137     ExecStreamResourceQuantity &quantity)
00138 {
00139     BTreeExecStream::setResourceAllocation(quantity);
00140     LcsRowScanBaseExecStream::setResourceAllocation(quantity);
00141 
00142     maxNumScratchPages = quantity.nCachePages - numMiscScratchPages;
00143 }
00144 
00145 ExecStreamResult LbmGeneratorExecStream::execute(
00146     ExecStreamQuantum const &quantum)
00147 {
00148     // read the start rid and num of rows to load
00149 
00150     if (inAccessors[0]->getState() == EXECBUF_EOS) {
00151         if (doneReading) {
00152             pOutAccessor->markEOS();
00153             return EXECRC_EOS;
00154         }
00155 
00156     } else {
00157         if (!inAccessors[0]->demandData()) {
00158             return EXECRC_BUF_UNDERFLOW;
00159         }
00160 
00161         inAccessors[0]->unmarshalTuple(inputTuple);
00162 
00163         // in the case of create index, the number of rows affected
00164         // is returned as 0, since the statement is a DDL
00165         LcsRidRun ridRun;
00166         if (createIndex) {
00167             numRowsToLoad = 0;
00168             ridRun.nRids = RecordNum(MAXU);
00169             startRid = LcsRid(0);
00170         } else {
00171             numRowsToLoad =
00172                 *reinterpret_cast<RecordNum const *> (inputTuple[0].pData);
00173             ridRun.nRids = numRowsToLoad;
00174             startRid =
00175                 *reinterpret_cast<LcsRid const *> (inputTuple[1].pData);
00176         }
00177         currRid = startRid;
00178 
00179         // Setup the prefetch rid run to a single run spanning the range
00180         // of rows to be inserted into the index
00181         ridRun.startRid = startRid;
00182         ridRuns.push_back(ridRun);
00183 
00184         // set number of rows to load in a dynamic parameter that
00185         // splicer will later read
00186         pDynamicParamManager->writeParam(insertRowCountParamId, inputTuple[0]);
00187 
00188         inAccessors[0]->consumeTuple();
00189 
00190         // special case where there are no rows -- don't bother reading
00191         // from the table because we may end up reading deleted rows
00192         if (!createIndex && numRowsToLoad == 0) {
00193             doneReading = true;
00194             return EXECRC_BUF_UNDERFLOW;
00195         }
00196 
00197         // position to the starting rid
00198         for (uint iClu = 0; iClu < nClusters; iClu++) {
00199             SharedLcsClusterReader &pScan = pClusters[iClu];
00200             if (!pScan->position(startRid)) {
00201                 // empty table
00202                 doneReading = true;
00203                 return EXECRC_BUF_UNDERFLOW;
00204             }
00205             syncColumns(pScan);
00206         }
00207 
00208         // initialize bitmap table to a single entry, assuming we're
00209         // starting with singleton bitmaps
00210         bool rc = initBitmapTable(1);
00211         assert(rc);
00212     }
00213 
00214     // take care of any pending flushes first
00215 
00216     switch (producePending) {
00217     case LBM_ENTRYFLUSH_PENDING:
00218         // outputTuple already contains the pending tuple to be flushed
00219         if (!pOutAccessor->produceTuple(outputTuple)) {
00220             return EXECRC_BUF_OVERFLOW;
00221         }
00222         bitmapTable[flushStart].inuse = false;
00223         break;
00224     case LBM_TABLEFLUSH_PENDING:
00225         if (!flushTable(flushStart)) {
00226             return EXECRC_BUF_OVERFLOW;
00227         }
00228         break;
00229     default:
00230         break;
00231     }
00232     producePending = LBM_NOFLUSH_PENDING;
00233 
00234     ExecStreamResult rc;
00235 
00236     if (nIdxKeys == 1) {
00237         rc = generateSingleKeyBitmaps(quantum);
00238     } else {
00239         rc = generateMultiKeyBitmaps(quantum);
00240     }
00241 
00242     switch (rc) {
00243     case EXECRC_BUF_OVERFLOW:
00244     case EXECRC_QUANTUM_EXPIRED:
00245         return rc;
00246     case EXECRC_EOS:
00247         // no more rows to process
00248         if (!createIndex) {
00249             // It's possible for the row count to be larger if we're building
00250             // an index on a replaced column.
00251             assert(rowCount >= numRowsToLoad);
00252         }
00253         doneReading = true;
00254         return EXECRC_BUF_UNDERFLOW;
00255     default:
00256         permAssert(false);
00257     }
00258 }
00259 
00260 ExecStreamResult LbmGeneratorExecStream::generateSingleKeyBitmaps(
00261     ExecStreamQuantum const &quantum)
00262 {
00263     // read from the current batch until either the end of the batch
00264     // is reached, or there is an overflow in a write to the output stream
00265     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00266         if (!revertToSingletons &&
00267             pClusters[0]->clusterCols[0].batchIsCompressed())
00268         {
00269             if (!generateBitmaps()) {
00270                 return EXECRC_BUF_OVERFLOW;
00271             }
00272         } else if (!batchRead) {
00273             if (!generateSingletons()) {
00274                 return EXECRC_BUF_OVERFLOW;
00275             }
00276         }
00277 
00278         // move to the next batch
00279         batchRead = false;
00280         revertToSingletons = false;
00281         SharedLcsClusterReader &pScan = pClusters[0];
00282         if (!pScan->nextRange()) {
00283             return EXECRC_EOS;
00284         }
00285         pScan->clusterCols[0].sync();
00286     }
00287     return EXECRC_QUANTUM_EXPIRED;
00288 }
00289 
00290 ExecStreamResult LbmGeneratorExecStream::generateMultiKeyBitmaps(
00291     ExecStreamQuantum const &quantum)
00292 {
00293     // read through all rows until the end of the table has been reached,
00294     // or there is an overflow in a write to the output stream
00295     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00296         uint prevClusterEnd = 0;
00297         if (!skipRead) {
00298             // reset buffer before loading new values, in case previous
00299             // row contained nulls
00300             bitmapTuple.resetBuffer();
00301             for (uint iClu = 0; iClu < nClusters; iClu++) {
00302                 SharedLcsClusterReader &pScan = pClusters[iClu];
00303 
00304                 if (currRid >= pScan->getRangeEndRid()) {
00305                     // move to the next batch if this particular cluster
00306                     // reader has reached the end of its batch
00307                     if (!pScan->nextRange()) {
00308                         assert(
00309                             iClu == 0 &&
00310                                 (nClusters == 1 || !pClusters[1]->nextRange()));
00311                         return EXECRC_EOS;
00312                     }
00313                     assert(
00314                         currRid >= pScan->getRangeStartRid() &&
00315                         currRid < pScan->getRangeEndRid());
00316                     syncColumns(pScan);
00317                 } else {
00318                     assert(currRid >= pScan->getRangeStartRid());
00319                     pScan->advanceWithinBatch(
00320                         opaqueToInt(currRid - pScan->getCurrentRid()));
00321                 }
00322                 readColVals(
00323                     pScan,
00324                     bitmapTuple,
00325                     prevClusterEnd);
00326                 prevClusterEnd += pScan->nColsToRead;
00327             }
00328         }
00329 
00330         createSingletonBitmapEntry();
00331         if (!flushEntry(0)) {
00332             return EXECRC_BUF_OVERFLOW;
00333         }
00334     }
00335     return EXECRC_QUANTUM_EXPIRED;
00336 }
00337 
00338 void LbmGeneratorExecStream::createSingletonBitmapEntry()
00339 {
00340     // create the singleton bitmap entry and then flush it out
00341     // right away; should never fail trying to create a singleton
00342     // entry
00343     initRidAndBitmap(bitmapTuple, &currRid);
00344     bool rc = addRidToBitmap(0, bitmapTuple, currRid);
00345     assert(rc);
00346     skipRead = false;
00347     ++currRid;
00348     ++rowCount;
00349 }
00350 
00351 void LbmGeneratorExecStream::closeImpl()
00352 {
00353     BTreeExecStream::closeImpl();
00354     LcsRowScanBaseExecStream::closeImpl();
00355     keyCodes.clear();
00356     bitmapTable.clear();
00357     scratchPages.clear();
00358 
00359     if (scratchAccessor.pSegment) {
00360         scratchAccessor.pSegment->deallocatePageRange(
00361             NULL_PAGE_ID, NULL_PAGE_ID);
00362     }
00363 }
00364 
00365 bool LbmGeneratorExecStream::generateBitmaps()
00366 {
00367     // in the single key case, the column reader is always the first
00368     // one, in the first cluster reader
00369     LcsColumnReader &colReader = pClusters[0]->clusterCols[0];
00370     uint nDistinctVals = colReader.getBatchValCount();
00371 
00372     // only read rows beginning at startRid
00373     uint nRows = pClusters[0]->getRangeRowsLeft();
00374 
00375     // if first time through, setup the keycode array and read the batch
00376     if (!batchRead) {
00377         uint nRead;
00378 
00379         // if there's insufficient buffer space, revert to generating
00380         // singletons for this batch
00381         if (!initBitmapTable(nDistinctVals)) {
00382             revertToSingletons = true;
00383             return generateSingletons();
00384         }
00385 
00386         keyCodes.resize(nRows);
00387         colReader.readCompressedBatch(nRows, &keyCodes[0], &nRead);
00388         assert(nRows == nRead);
00389 
00390         batchRead = true;
00391         currBatch = 0;
00392     }
00393 
00394     // resume reading batch values based on where we last left off;
00395     // if the value has been read but not yet processed, skip the read
00396     for (uint i = currBatch; i < nRows; i++) {
00397         if (!skipRead) {
00398             PBuffer curValue = colReader.getBatchValue(keyCodes[i]);
00399             // reset buffer before loading new value, in case previous
00400             // row had nulls
00401             bitmapTuple.resetBuffer();
00402 
00403             attrAccessors[0].loadValue(bitmapTuple[0], curValue);
00404             initRidAndBitmap(bitmapTuple, &currRid);
00405         }
00406         if (!addRidToBitmap(keyCodes[i], bitmapTuple, currRid)) {
00407             currBatch = i;
00408             skipRead = true;
00409             return false;
00410         }
00411         ++currRid;
00412         ++rowCount;
00413         skipRead = false;
00414     }
00415 
00416     // flush out table since the next batch will have a different set
00417     // of keycodes
00418     if (!flushTable(0)) {
00419         // set currBatch to avoid re-reading column values above
00420         // when return back into this method
00421         currBatch = nRows;
00422         return false;
00423     }
00424 
00425     return true;
00426 }
00427 
00428 bool LbmGeneratorExecStream::generateSingletons()
00429 {
00430     SharedLcsClusterReader &pScan = pClusters[0];
00431 
00432     do {
00433         // if we've already read the row but haven't processed it yet,
00434         // skip the read
00435         if (!skipRead) {
00436             uint prevClusterEnd = 0;
00437 
00438             // reset buffer before loading new values, in case previous
00439             // row contained nulls
00440             bitmapTuple.resetBuffer();
00441 
00442             for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00443                 PBuffer curValue =
00444                     pScan->clusterCols[iCluCol].getCurrentValue();
00445                 uint idx = projMap[prevClusterEnd + iCluCol];
00446 
00447                 attrAccessors[idx].loadValue(bitmapTuple[idx], curValue);
00448             }
00449             prevClusterEnd += pScan->nColsToRead;
00450         }
00451 
00452         createSingletonBitmapEntry();
00453         if (!flushEntry(0)) {
00454             // advance now so the next time we come in here, we'll
00455             // be correctly positioned on the next rid
00456             if (!advanceReader(pScan)) {
00457                 // if we're at the end of the batch, avoid coming
00458                 // back in here until the new batch is read
00459                 batchRead = true;
00460             }
00461             return false;
00462         }
00463 
00464         // advance to the next rid; if at the end of the batch,
00465         // return to caller; else, continue reading from current
00466         // batch
00467         if (!advanceReader(pScan)) {
00468             return true;
00469         }
00470     } while (true);
00471 }
00472 
00473 bool LbmGeneratorExecStream::advanceReader(SharedLcsClusterReader &pScan)
00474 {
00475     if (!pScan->advance(1)) {
00476         return false;
00477     }
00478     syncColumns(pScan);
00479     return true;
00480 }
00481 
00482 bool LbmGeneratorExecStream::initBitmapTable(uint nEntries)
00483 {
00484     // compute the size of the bitmap buffers, based on the number
00485     // of scratch pages available and the number of distinct values
00486     // in the batch
00487     uint nBufsPerPage = (uint) ceil((double) nEntries / maxNumScratchPages);
00488     uint currSize = scratchPageSize / nBufsPerPage;
00489 
00490     if (currSize < minBitmapSize) {
00491         currSize = minBitmapSize;
00492         nBufsPerPage = scratchPageSize / currSize;
00493     } else if (currSize > maxBitmapSize) {
00494         currSize = maxBitmapSize;
00495         nBufsPerPage = scratchPageSize / currSize;
00496     }
00497 
00498     // If there are less than 8 buffers, then there cannot be more keys than
00499     // buffers.  That's because we need to avoid flushing those buffers that
00500     // potentially overlap in the last byte with the upcoming rids being
00501     // processed.
00502     uint nBuffers = nBufsPerPage * maxNumScratchPages;
00503     if (nBuffers < 8 && nEntries > nBuffers) {
00504         return false;
00505     }
00506 
00507     if (nEntries > nBitmapEntries) {
00508         // resize bitmap table to accomodate new batch, which has more
00509         // distinct values
00510         bitmapTable.resize(nEntries);
00511         for (uint i = nBitmapEntries; i < nEntries; i++) {
00512             bitmapTable[i].pBitmap = SharedLbmEntry(new LbmEntry());
00513         }
00514     }
00515 
00516     if (nEntries != nBitmapEntries) {
00517         // divide up the buffers across the bitmap table; if there are
00518         // not enough buffers, the bitmap entries at the end of the table
00519         // won't have assigned buffers yet; no need to re-divide the buffers
00520         // if the previous bitmap table had the same number of entries
00521 
00522         uint nPages = (uint) ceil((double) nEntries / nBufsPerPage);
00523         if (nPages > maxNumScratchPages) {
00524             nPages = maxNumScratchPages;
00525         }
00526         while (nPages > nScratchPagesAllocated) {
00527             scratchLock.allocatePage();
00528             PBuffer newPage = scratchLock.getPage().getWritableData();
00529             scratchPages.push_back(newPage);
00530             ++nScratchPagesAllocated;
00531         }
00532         uint idx = 0;
00533         for (uint i = 0; i < nPages; i++) {
00534             uint offset = 0;
00535             for (uint j = 0; j < nBufsPerPage; j++) {
00536                 idx = i * nBufsPerPage + j;
00537                 if (idx == nEntries) {
00538                     break;
00539                 }
00540                 bitmapTable[idx].bufferPtr = scratchPages[i] + offset;
00541                 offset += currSize;
00542             }
00543             if (idx == nEntries) {
00544                 break;
00545             }
00546         }
00547         // entries without assigned buffers
00548         for (uint i = idx + 1; i < nEntries; i++) {
00549             bitmapTable[i].bufferPtr = NULL;
00550         }
00551     }
00552 
00553     for (uint i = 0; i < nEntries; i++) {
00554         bitmapTable[i].inuse = false;
00555     }
00556     flushIdx = 0;
00557     nBitmapEntries = nEntries;
00558     entrySize = currSize;
00559 
00560     return true;
00561 }
00562 
00563 void LbmGeneratorExecStream::initRidAndBitmap(
00564     TupleData &bitmapTuple, LcsRid* pCurrRid)
00565 {
00566     bitmapTuple[nIdxKeys].pData = (PConstBuffer) pCurrRid;
00567     bitmapTuple[nIdxKeys + 1].pData = NULL;
00568     bitmapTuple[nIdxKeys + 2].pData = NULL;
00569 }
00570 
00571 bool LbmGeneratorExecStream::addRidToBitmap(
00572     uint keycode, TupleData &initBitmap, LcsRid rid)
00573 {
00574     assert(keycode <= nBitmapEntries);
00575 
00576     if (bitmapTable[keycode].inuse) {
00577         assert(bitmapTable[keycode].bufferPtr);
00578 
00579         bool maxedOut = !bitmapTable[keycode].pBitmap->setRID(rid);
00580         if (maxedOut) {
00581             if (!flushEntry(keycode)) {
00582                 return false;
00583             }
00584             assert(!bitmapTable[keycode].inuse);
00585             bitmapTable[keycode].inuse = true;
00586             // buffer should now have enough space; create a singleton
00587             // entry
00588             bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap);
00589         }
00590     } else {
00591         if (!bitmapTable[keycode].bufferPtr) {
00592             // no assigned buffer yet; get a buffer by flushing
00593             // out an existing entry
00594             PBuffer bufPtr = flushBuffer(rid);
00595             if (!bufPtr) {
00596                 return false;
00597             }
00598             bitmapTable[keycode].bufferPtr = bufPtr;
00599         }
00600         // now that we have a buffer, initialize the entry
00601         bitmapTable[keycode].pBitmap->init(
00602             bitmapTable[keycode].bufferPtr, NULL, entrySize, bitmapTupleDesc);
00603         bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap);
00604         bitmapTable[keycode].inuse = true;
00605     }
00606 
00607     return true;
00608 }
00609 
00610 PBuffer LbmGeneratorExecStream::flushBuffer(LcsRid addRid)
00611 {
00612     // need to flush a buffer out and return that buffer for use; for now,
00613     // cycle through the entries in round robin order in determining which
00614     // to flush
00615     //
00616     // NOTE zfong 6-Jan-2006: We may want to change this to a more
00617     // sophisticated scheme where we flush on a LRU basis
00618     PBuffer retPtr;
00619     uint nAttempts = 0;
00620     do {
00621         ++nAttempts;
00622         if (nAttempts > nBitmapEntries) {
00623             // we should always have enough buffers so we can flush at least
00624             // one existing entry
00625             permAssert(false);
00626         }
00627         if (bitmapTable[flushIdx].bufferPtr) {
00628             retPtr = bitmapTable[flushIdx].bufferPtr;
00629             if (bitmapTable[flushIdx].inuse) {
00630                 // skip over entries whose rid range overlaps with the rid
00631                 // that will be added next, since we potentially may need
00632                 // to add that rid (or one that follows and is within the
00633                 // same rid range) into that entry
00634                 if (bitmapTable[flushIdx].pBitmap->inRange(addRid)) {
00635                     flushIdx = ++flushIdx % nBitmapEntries;
00636                     continue;
00637                 }
00638                 if (!flushEntry(flushIdx)) {
00639                     return NULL;
00640                 }
00641             }
00642             bitmapTable[flushIdx].bufferPtr = NULL;
00643             break;
00644         }
00645         flushIdx = ++flushIdx % nBitmapEntries;
00646     } while (true);
00647 
00648     flushIdx = ++flushIdx % nBitmapEntries;
00649     return retPtr;
00650 }
00651 
00652 bool LbmGeneratorExecStream::flushTable(uint start)
00653 {
00654     assert(start <= nBitmapEntries);
00655     for (uint i = start; i < nBitmapEntries; i++) {
00656         if (bitmapTable[i].inuse) {
00657             if (!flushEntry(i)) {
00658                 producePending = LBM_TABLEFLUSH_PENDING;
00659                 return false;
00660             }
00661         }
00662     }
00663 
00664     return true;
00665 }
00666 
00667 bool LbmGeneratorExecStream::flushEntry(uint keycode)
00668 {
00669     assert(bitmapTable[keycode].inuse && bitmapTable[keycode].bufferPtr);
00670 
00671     // retrieve the generated bitmap entry and write it to the output
00672     // stream
00673 
00674     outputTuple = bitmapTable[keycode].pBitmap->produceEntryTuple();
00675 
00676     FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(outputTuple));
00677 
00678     if (!pOutAccessor->produceTuple(outputTuple)) {
00679         flushStart = keycode;
00680         producePending = LBM_ENTRYFLUSH_PENDING;
00681         return false;
00682     }
00683 
00684     // entry no longer is associated with an entry tuple
00685     bitmapTable[keycode].inuse = false;
00686 
00687     return true;
00688 }
00689 
00690 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $");
00691 
00692 // End LbmGeneratorExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1