LcsClusterReader.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterReader.h"
00024 #include "fennel/lucidera/colstore/LcsColumnReader.h"
00025 #include "fennel/btree/BTreeWriter.h"
00026 #include "fennel/tuple/TupleAccessor.h"
00027 #include "fennel/segment/SegPageEntryIterImpl.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $");
00030 
00031 LcsClusterReader::LcsClusterReader(
00032     BTreeDescriptor const &treeDescriptor,
00033     CircularBuffer<LcsRidRun> *pRidRuns)
00034 :
00035     LcsClusterAccessBase(treeDescriptor),
00036     SegPageEntryIterSource<LcsRid>(),
00037     ridRunIter(pRidRuns),
00038     prefetchQueue(4000)
00039 {
00040     bTreeReader = SharedBTreeReader(new BTreeReader(treeDescriptor));
00041     if (pRidRuns == NULL) {
00042         noPrefetch = true;
00043     } else {
00044         noPrefetch = false;
00045         prefetchQueue.setPrefetchSource(*this);
00046     }
00047 }
00048 
00049 LcsClusterReader::~LcsClusterReader()
00050 {
00051 }
00052 
00053 void LcsClusterReader::setRootPageId(PageId rootPageId)
00054 {
00055     bTreeReader->setRootPageId(rootPageId);
00056 }
00057 
00058 LcsClusterNode const &LcsClusterReader::readClusterPage()
00059 {
00060     bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00061     clusterPageId = readClusterPageId();
00062     // REVIEW jvs 27-Dec-2005:  What is bTreeRid used for?  Should probably
00063     // assert that it matches node.firstRID
00064     bTreeRid = readRid();
00065     clusterLock.lockShared(clusterPageId);
00066     LcsClusterNode const &node = clusterLock.getNodeForRead();
00067     assert(bTreeRid == node.firstRID);
00068     return node;
00069 }
00070 
00071 bool LcsClusterReader::getFirstClusterPageForRead(
00072     PConstLcsClusterNode &pBlock)
00073 {
00074     bool found;
00075 
00076     found = bTreeReader->searchFirst();
00077     if (!found) {
00078         return false;
00079     }
00080 
00081     LcsClusterNode const &node = readClusterPage();
00082     pBlock = &node;
00083     return true;
00084 }
00085 
00086 bool LcsClusterReader::getNextClusterPageForRead(PConstLcsClusterNode &pBlock)
00087 {
00088     bool found;
00089 
00090     found = bTreeReader->searchNext();
00091     if (!found) {
00092         return false;
00093     }
00094 
00095     LcsClusterNode const &node = readClusterPage();
00096     pBlock = &node;
00097     return true;
00098 }
00099 
00100 void LcsClusterReader::initColumnReaders(
00101     uint nClusterColsInit,
00102     TupleProjection const &clusterProj)
00103 {
00104     nClusterCols = nClusterColsInit;
00105     nColsToRead = clusterProj.size();
00106     clusterCols.reset(new LcsColumnReader[nColsToRead]);
00107     for (uint i = 0; i < nColsToRead; i++) {
00108         clusterCols[i].init(this, clusterProj[i]);
00109     }
00110 }
00111 
00112 void LcsClusterReader::open()
00113 {
00114     pLeaf = NULL;
00115     pRangeBatches = NULL;
00116     ridRunIter.reset();
00117 
00118     // values of (0, MAXU) indicate that entry has not been filled in yet
00119     nextBTreeEntry.first = PageId(0);
00120     nextBTreeEntry.second = LcsRid(MAXU);
00121 
00122     nextPrefetchEntry.first = PageId(0);
00123     nextPrefetchEntry.second = LcsRid(MAXU);
00124 
00125     dumbPrefetch = false;
00126     nextRid = LcsRid(0);
00127 }
00128 
00129 void LcsClusterReader::close()
00130 {
00131     bTreeReader->endSearch();
00132     unlockClusterPage();
00133 }
00134 
00135 bool LcsClusterReader::position(LcsRid rid)
00136 {
00137     bool found;
00138 
00139     currRid = rid;
00140     if (pLeaf) {
00141         // Scan is already in progress. Try to find the row we want in the
00142         // current block.
00143 
00144         found = positionInBlock(rid);
00145         if (found) {
00146             return true;
00147         }
00148     } else {
00149         if (noPrefetch) {
00150             if (!bTreeReader->searchFirst()) {
00151                 bTreeReader->endSearch();
00152             }
00153         } else {
00154             // Initiate the first set of pre-fetches.
00155             prefetchQueue.mapRange(segmentAccessor, NULL_PAGE_ID);
00156         }
00157     }
00158 
00159     if (noPrefetch) {
00160         found = searchForRid(rid);
00161         if (!found) {
00162             return false;
00163         }
00164         moveToBlock(readClusterPageId());
00165     } else {
00166         found = moveToBlockWithRid(rid);
00167         if (!found) {
00168             return false;
00169         }
00170     }
00171 
00172     found = positionInBlock(rid);
00173     // page ends before "rid"; we must be off the last block
00174     if (!found) {
00175         return false;
00176     }
00177 
00178     return true;
00179 }
00180 
00181 bool LcsClusterReader::searchForRid(LcsRid rid)
00182 {
00183     bTreeTupleData[0].pData = (PConstBuffer) &rid;
00184     // position on greatest lower bound of key
00185     bTreeReader->searchForKey(
00186         bTreeTupleData, DUP_SEEK_BEGIN, false);
00187     if (bTreeReader->isSingular()) {
00188         return false;
00189     }
00190     bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00191 
00192     LcsRid key = readRid();
00193     assert(key <= rid);
00194     return true;
00195 }
00196 
00197 void LcsClusterReader::moveToBlock(PageId clusterPageId)
00198 {
00199     // read the desired cluster page and initialize structures to reflect
00200     // page read
00201 
00202     clusterLock.lockShared(clusterPageId);
00203     LcsClusterNode const &page = clusterLock.getNodeForRead();
00204     pLHdr = &page;
00205     setUpBlock();
00206 }
00207 
00208 bool LcsClusterReader::moveToBlockWithRid(LcsRid rid)
00209 {
00210     PageId clusterPageId;
00211 
00212     // Read entries from the pre-fetch queue until we either find the entry
00213     // within the range of our desired rid, or we exhaust the contents of
00214     // the queue.
00215     for (;;) {
00216         std::pair<PageId, LcsRid> &prefetchEntry =
00217             (nextPrefetchEntry.second == LcsRid(MAXU)) ?
00218             *prefetchQueue : nextPrefetchEntry;
00219 
00220         clusterPageId = prefetchEntry.first;
00221         if (clusterPageId == NULL_PAGE_ID) {
00222             return false;
00223         }
00224 
00225         LcsRid prefetchRid = prefetchEntry.second;
00226         assert(prefetchRid <= rid);
00227 
00228         // Make sure this is the correct entry by checking that the rid
00229         // is smaller than the next entry.  We can end up with non-matching
00230         // entries if dumb pre-fetches are being used.
00231         ++prefetchQueue;
00232         nextPrefetchEntry = *prefetchQueue;
00233         if (nextPrefetchEntry.first == NULL_PAGE_ID ||
00234             rid < nextPrefetchEntry.second)
00235         {
00236             break;
00237         } else {
00238             continue;
00239         }
00240     }
00241 
00242     // Now that we've located the desired page, read it.
00243     clusterLock.lockShared(clusterPageId);
00244     LcsClusterNode const &page = clusterLock.getNodeForRead();
00245     pLHdr = &page;
00246     setUpBlock();
00247     return true;
00248 }
00249 
00250 bool LcsClusterReader::positionInBlock(LcsRid rid)
00251 {
00252     // Go forward through the ranges in the current block until we find the
00253     // right one, or until we hit the end of the block
00254     while (rid >= getRangeEndRid()
00255            && pRangeBatches + nClusterCols < pBatches + pLHdr->nBatch) {
00256         rangeStartRid += pRangeBatches->nRow;
00257 
00258         pRangeBatches += nClusterCols;            // go to start of next range
00259 
00260         // set end rowid based on already available info (for performance
00261         // reasons)
00262         rangeEndRid = rangeStartRid + pRangeBatches->nRow;
00263     }
00264 
00265     // Try to position within current batch
00266     if (rid < getRangeEndRid()) {
00267         assert(rid >= rangeStartRid);
00268         positionInRange(opaqueToInt(rid) - opaqueToInt(rangeStartRid));
00269         return true;
00270     } else {
00271         return false;
00272     }
00273 }
00274 
00275 void LcsClusterReader::setUpBlock()
00276 {
00277     // setup pointers to lastVal, firstVal arrays
00278     pLeaf = (PBuffer) pLHdr;
00279     setHdrOffsets(pLHdr);
00280 
00281     assert(pLHdr->nBatch > 0);
00282 
00283     pBatches = (PLcsBatchDir) (pLeaf + pLHdr->oBatch);
00284     rangeStartRid = pLHdr->firstRID;
00285 
00286     // at first range in block
00287     pRangeBatches = pBatches;
00288     // at first rid in range
00289     nRangePos = 0;
00290 
00291     // set end rowid based on already available info (for performance reasons)
00292     rangeEndRid = rangeStartRid + pRangeBatches->nRow;
00293 }
00294 
00295 bool LcsClusterReader::advance(uint nRids)
00296 {
00297     uint newPos = nRangePos + nRids;
00298 
00299     if (newPos < pRangeBatches->nRow) {
00300         nRangePos = newPos;
00301         return true;
00302     } else {
00303         return false;
00304     }
00305 }
00306 
00307 PageId LcsClusterReader::getNextPageForPrefetch(LcsRid &rid, bool &found)
00308 {
00309     found = true;
00310     for (;;) {
00311         if (dumbPrefetch) {
00312             rid = currRid;
00313         } else {
00314             rid = getFetchRids(ridRunIter, nextRid, false);
00315         }
00316 
00317         // If the rid run buffer is exhausted and there are still rid runs
00318         // to be filled in, we're pre-fetching faster than we can fill up
00319         // the rid run buffers.  So, switch to dumb pre-fetch in that case.
00320         if (rid == LcsRid(MAXU)) {
00321             if (ridRunIter.done()) {
00322                 rid = LcsRid(0);
00323                 return NULL_PAGE_ID;
00324             } else {
00325                 // TODO - Use stricter criteria on when to switch to dumb
00326                 // pre-fetch.  E.g., if there are less than a certain number
00327                 // of pages already pre-fetched.
00328                 dumbPrefetch = true;
00329                 continue;
00330             }
00331         }
00332 
00333         if (!(nextBTreeEntry.second == LcsRid(MAXU) &&
00334             nextBTreeEntry.first == PageId(0)))
00335         {
00336             // If we hit the end of the btree on the last iteration, then
00337             // there are no more pages.
00338             if (nextBTreeEntry.first == NULL_PAGE_ID) {
00339                 rid = LcsRid(0);
00340                 return NULL_PAGE_ID;
00341             }
00342 
00343             // If the next rid to be read is on the same page as the last
00344             // one located, bump up the rid so we move over to the next page.
00345             // In the case of dumb pre-fetch, just use the next page in the
00346             // cluster sequence.
00347             if (rid < nextBTreeEntry.second) {
00348                 if (dumbPrefetch) {
00349                     rid = nextBTreeEntry.second;
00350                     PageId pageId = nextBTreeEntry.first;
00351                     getNextBTreeEntry();
00352                     return pageId;
00353                 }
00354                 nextRid = nextBTreeEntry.second;
00355                 continue;
00356             } else {
00357                 // TODO - optimize by avoiding search from top of btree if
00358                 // desired entry is within a few entries of the current
00359                 break;
00360            }
00361         } else {
00362             // We haven't located any pages yet.  Initiate first btree search.
00363             break;
00364         }
00365     }
00366 
00367     bool rc = searchForRid(rid);
00368     if (!rc) {
00369         return NULL_PAGE_ID;
00370     }
00371     rid = readRid();
00372     PageId pageId = readClusterPageId();
00373     getNextBTreeEntry();
00374     return pageId;
00375 }
00376 
00377 void LcsClusterReader::getNextBTreeEntry()
00378 {
00379     bool rc = bTreeReader->searchNext();
00380     if (!rc) {
00381         // Indicate that there are no more pages
00382         nextBTreeEntry.first = NULL_PAGE_ID;
00383     } else {
00384         bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00385         nextBTreeEntry.first = readClusterPageId();
00386         nextBTreeEntry.second = readRid();
00387     }
00388 }
00389 
00390 LcsRid LcsClusterReader::getFetchRids(
00391     CircularBufferIter<LcsRidRun> &ridRunIter,
00392     LcsRid &nextRid,
00393     bool remove)
00394 {
00395     for (;;) {
00396         if (ridRunIter.end()) {
00397             return LcsRid(MAXU);
00398         }
00399 
00400         LcsRidRun &currRidRun = *ridRunIter;
00401         if (nextRid < currRidRun.startRid) {
00402             nextRid = currRidRun.startRid + 1;
00403             return currRidRun.startRid;
00404         } else if (
00405             (currRidRun.nRids == RecordNum(MAXU) &&
00406                 nextRid >= currRidRun.startRid) ||
00407             (nextRid < currRidRun.startRid + currRidRun.nRids))
00408         {
00409             return nextRid++;
00410         } else {
00411             ++ridRunIter;
00412             if (remove) {
00413                 ridRunIter.removeFront();
00414             }
00415         }
00416     }
00417 }
00418 
00419 void LcsClusterReader::catchUp(uint parentBufPos, LcsRid parentNextRid)
00420 {
00421     if (parentNextRid - 1 > nextRid) {
00422         nextRid = parentNextRid - 1;
00423     }
00424     if (parentBufPos > ridRunIter.getCurrPos()) {
00425         ridRunIter.setCurrPos(parentBufPos);
00426     }
00427 }
00428 
00429 RecordNum LcsClusterReader::getNumRows()
00430 {
00431     // Read the last cluster page
00432     if (bTreeReader->searchLast() == false) {
00433         bTreeReader->endSearch();
00434         return RecordNum(0);
00435     }
00436     bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00437     LcsClusterNode const &node = readClusterPage();
00438 
00439     // Then count the number of rows in each batch on that page
00440     RecordNum nRows = RecordNum(opaqueToInt(node.firstRID));
00441     PLcsBatchDir pBatch = (PLcsBatchDir) ((PBuffer) &node + node.oBatch);
00442     for (uint i = 0; i < node.nBatch; i += nClusterCols) {
00443         nRows += pBatch[i].nRow;
00444     }
00445 
00446     bTreeReader->endSearch();
00447     return nRows;
00448 }
00449 
00450 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $");
00451 
00452 // End LcsClusterReader.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1