00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterReader.h"
00024 #include "fennel/lucidera/colstore/LcsColumnReader.h"
00025 #include "fennel/btree/BTreeWriter.h"
00026 #include "fennel/tuple/TupleAccessor.h"
00027 #include "fennel/segment/SegPageEntryIterImpl.h"
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $");
00030
00031 LcsClusterReader::LcsClusterReader(
00032 BTreeDescriptor const &treeDescriptor,
00033 CircularBuffer<LcsRidRun> *pRidRuns)
00034 :
00035 LcsClusterAccessBase(treeDescriptor),
00036 SegPageEntryIterSource<LcsRid>(),
00037 ridRunIter(pRidRuns),
00038 prefetchQueue(4000)
00039 {
00040 bTreeReader = SharedBTreeReader(new BTreeReader(treeDescriptor));
00041 if (pRidRuns == NULL) {
00042 noPrefetch = true;
00043 } else {
00044 noPrefetch = false;
00045 prefetchQueue.setPrefetchSource(*this);
00046 }
00047 }
00048
00049 LcsClusterReader::~LcsClusterReader()
00050 {
00051 }
00052
00053 void LcsClusterReader::setRootPageId(PageId rootPageId)
00054 {
00055 bTreeReader->setRootPageId(rootPageId);
00056 }
00057
00058 LcsClusterNode const &LcsClusterReader::readClusterPage()
00059 {
00060 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00061 clusterPageId = readClusterPageId();
00062
00063
00064 bTreeRid = readRid();
00065 clusterLock.lockShared(clusterPageId);
00066 LcsClusterNode const &node = clusterLock.getNodeForRead();
00067 assert(bTreeRid == node.firstRID);
00068 return node;
00069 }
00070
00071 bool LcsClusterReader::getFirstClusterPageForRead(
00072 PConstLcsClusterNode &pBlock)
00073 {
00074 bool found;
00075
00076 found = bTreeReader->searchFirst();
00077 if (!found) {
00078 return false;
00079 }
00080
00081 LcsClusterNode const &node = readClusterPage();
00082 pBlock = &node;
00083 return true;
00084 }
00085
00086 bool LcsClusterReader::getNextClusterPageForRead(PConstLcsClusterNode &pBlock)
00087 {
00088 bool found;
00089
00090 found = bTreeReader->searchNext();
00091 if (!found) {
00092 return false;
00093 }
00094
00095 LcsClusterNode const &node = readClusterPage();
00096 pBlock = &node;
00097 return true;
00098 }
00099
00100 void LcsClusterReader::initColumnReaders(
00101 uint nClusterColsInit,
00102 TupleProjection const &clusterProj)
00103 {
00104 nClusterCols = nClusterColsInit;
00105 nColsToRead = clusterProj.size();
00106 clusterCols.reset(new LcsColumnReader[nColsToRead]);
00107 for (uint i = 0; i < nColsToRead; i++) {
00108 clusterCols[i].init(this, clusterProj[i]);
00109 }
00110 }
00111
00112 void LcsClusterReader::open()
00113 {
00114 pLeaf = NULL;
00115 pRangeBatches = NULL;
00116 ridRunIter.reset();
00117
00118
00119 nextBTreeEntry.first = PageId(0);
00120 nextBTreeEntry.second = LcsRid(MAXU);
00121
00122 nextPrefetchEntry.first = PageId(0);
00123 nextPrefetchEntry.second = LcsRid(MAXU);
00124
00125 dumbPrefetch = false;
00126 nextRid = LcsRid(0);
00127 }
00128
00129 void LcsClusterReader::close()
00130 {
00131 bTreeReader->endSearch();
00132 unlockClusterPage();
00133 }
00134
00135 bool LcsClusterReader::position(LcsRid rid)
00136 {
00137 bool found;
00138
00139 currRid = rid;
00140 if (pLeaf) {
00141
00142
00143
00144 found = positionInBlock(rid);
00145 if (found) {
00146 return true;
00147 }
00148 } else {
00149 if (noPrefetch) {
00150 if (!bTreeReader->searchFirst()) {
00151 bTreeReader->endSearch();
00152 }
00153 } else {
00154
00155 prefetchQueue.mapRange(segmentAccessor, NULL_PAGE_ID);
00156 }
00157 }
00158
00159 if (noPrefetch) {
00160 found = searchForRid(rid);
00161 if (!found) {
00162 return false;
00163 }
00164 moveToBlock(readClusterPageId());
00165 } else {
00166 found = moveToBlockWithRid(rid);
00167 if (!found) {
00168 return false;
00169 }
00170 }
00171
00172 found = positionInBlock(rid);
00173
00174 if (!found) {
00175 return false;
00176 }
00177
00178 return true;
00179 }
00180
00181 bool LcsClusterReader::searchForRid(LcsRid rid)
00182 {
00183 bTreeTupleData[0].pData = (PConstBuffer) &rid;
00184
00185 bTreeReader->searchForKey(
00186 bTreeTupleData, DUP_SEEK_BEGIN, false);
00187 if (bTreeReader->isSingular()) {
00188 return false;
00189 }
00190 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00191
00192 LcsRid key = readRid();
00193 assert(key <= rid);
00194 return true;
00195 }
00196
00197 void LcsClusterReader::moveToBlock(PageId clusterPageId)
00198 {
00199
00200
00201
00202 clusterLock.lockShared(clusterPageId);
00203 LcsClusterNode const &page = clusterLock.getNodeForRead();
00204 pLHdr = &page;
00205 setUpBlock();
00206 }
00207
00208 bool LcsClusterReader::moveToBlockWithRid(LcsRid rid)
00209 {
00210 PageId clusterPageId;
00211
00212
00213
00214
00215 for (;;) {
00216 std::pair<PageId, LcsRid> &prefetchEntry =
00217 (nextPrefetchEntry.second == LcsRid(MAXU)) ?
00218 *prefetchQueue : nextPrefetchEntry;
00219
00220 clusterPageId = prefetchEntry.first;
00221 if (clusterPageId == NULL_PAGE_ID) {
00222 return false;
00223 }
00224
00225 LcsRid prefetchRid = prefetchEntry.second;
00226 assert(prefetchRid <= rid);
00227
00228
00229
00230
00231 ++prefetchQueue;
00232 nextPrefetchEntry = *prefetchQueue;
00233 if (nextPrefetchEntry.first == NULL_PAGE_ID ||
00234 rid < nextPrefetchEntry.second)
00235 {
00236 break;
00237 } else {
00238 continue;
00239 }
00240 }
00241
00242
00243 clusterLock.lockShared(clusterPageId);
00244 LcsClusterNode const &page = clusterLock.getNodeForRead();
00245 pLHdr = &page;
00246 setUpBlock();
00247 return true;
00248 }
00249
00250 bool LcsClusterReader::positionInBlock(LcsRid rid)
00251 {
00252
00253
00254 while (rid >= getRangeEndRid()
00255 && pRangeBatches + nClusterCols < pBatches + pLHdr->nBatch) {
00256 rangeStartRid += pRangeBatches->nRow;
00257
00258 pRangeBatches += nClusterCols;
00259
00260
00261
00262 rangeEndRid = rangeStartRid + pRangeBatches->nRow;
00263 }
00264
00265
00266 if (rid < getRangeEndRid()) {
00267 assert(rid >= rangeStartRid);
00268 positionInRange(opaqueToInt(rid) - opaqueToInt(rangeStartRid));
00269 return true;
00270 } else {
00271 return false;
00272 }
00273 }
00274
00275 void LcsClusterReader::setUpBlock()
00276 {
00277
00278 pLeaf = (PBuffer) pLHdr;
00279 setHdrOffsets(pLHdr);
00280
00281 assert(pLHdr->nBatch > 0);
00282
00283 pBatches = (PLcsBatchDir) (pLeaf + pLHdr->oBatch);
00284 rangeStartRid = pLHdr->firstRID;
00285
00286
00287 pRangeBatches = pBatches;
00288
00289 nRangePos = 0;
00290
00291
00292 rangeEndRid = rangeStartRid + pRangeBatches->nRow;
00293 }
00294
00295 bool LcsClusterReader::advance(uint nRids)
00296 {
00297 uint newPos = nRangePos + nRids;
00298
00299 if (newPos < pRangeBatches->nRow) {
00300 nRangePos = newPos;
00301 return true;
00302 } else {
00303 return false;
00304 }
00305 }
00306
00307 PageId LcsClusterReader::getNextPageForPrefetch(LcsRid &rid, bool &found)
00308 {
00309 found = true;
00310 for (;;) {
00311 if (dumbPrefetch) {
00312 rid = currRid;
00313 } else {
00314 rid = getFetchRids(ridRunIter, nextRid, false);
00315 }
00316
00317
00318
00319
00320 if (rid == LcsRid(MAXU)) {
00321 if (ridRunIter.done()) {
00322 rid = LcsRid(0);
00323 return NULL_PAGE_ID;
00324 } else {
00325
00326
00327
00328 dumbPrefetch = true;
00329 continue;
00330 }
00331 }
00332
00333 if (!(nextBTreeEntry.second == LcsRid(MAXU) &&
00334 nextBTreeEntry.first == PageId(0)))
00335 {
00336
00337
00338 if (nextBTreeEntry.first == NULL_PAGE_ID) {
00339 rid = LcsRid(0);
00340 return NULL_PAGE_ID;
00341 }
00342
00343
00344
00345
00346
00347 if (rid < nextBTreeEntry.second) {
00348 if (dumbPrefetch) {
00349 rid = nextBTreeEntry.second;
00350 PageId pageId = nextBTreeEntry.first;
00351 getNextBTreeEntry();
00352 return pageId;
00353 }
00354 nextRid = nextBTreeEntry.second;
00355 continue;
00356 } else {
00357
00358
00359 break;
00360 }
00361 } else {
00362
00363 break;
00364 }
00365 }
00366
00367 bool rc = searchForRid(rid);
00368 if (!rc) {
00369 return NULL_PAGE_ID;
00370 }
00371 rid = readRid();
00372 PageId pageId = readClusterPageId();
00373 getNextBTreeEntry();
00374 return pageId;
00375 }
00376
00377 void LcsClusterReader::getNextBTreeEntry()
00378 {
00379 bool rc = bTreeReader->searchNext();
00380 if (!rc) {
00381
00382 nextBTreeEntry.first = NULL_PAGE_ID;
00383 } else {
00384 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00385 nextBTreeEntry.first = readClusterPageId();
00386 nextBTreeEntry.second = readRid();
00387 }
00388 }
00389
00390 LcsRid LcsClusterReader::getFetchRids(
00391 CircularBufferIter<LcsRidRun> &ridRunIter,
00392 LcsRid &nextRid,
00393 bool remove)
00394 {
00395 for (;;) {
00396 if (ridRunIter.end()) {
00397 return LcsRid(MAXU);
00398 }
00399
00400 LcsRidRun &currRidRun = *ridRunIter;
00401 if (nextRid < currRidRun.startRid) {
00402 nextRid = currRidRun.startRid + 1;
00403 return currRidRun.startRid;
00404 } else if (
00405 (currRidRun.nRids == RecordNum(MAXU) &&
00406 nextRid >= currRidRun.startRid) ||
00407 (nextRid < currRidRun.startRid + currRidRun.nRids))
00408 {
00409 return nextRid++;
00410 } else {
00411 ++ridRunIter;
00412 if (remove) {
00413 ridRunIter.removeFront();
00414 }
00415 }
00416 }
00417 }
00418
00419 void LcsClusterReader::catchUp(uint parentBufPos, LcsRid parentNextRid)
00420 {
00421 if (parentNextRid - 1 > nextRid) {
00422 nextRid = parentNextRid - 1;
00423 }
00424 if (parentBufPos > ridRunIter.getCurrPos()) {
00425 ridRunIter.setCurrPos(parentBufPos);
00426 }
00427 }
00428
00429 RecordNum LcsClusterReader::getNumRows()
00430 {
00431
00432 if (bTreeReader->searchLast() == false) {
00433 bTreeReader->endSearch();
00434 return RecordNum(0);
00435 }
00436 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00437 LcsClusterNode const &node = readClusterPage();
00438
00439
00440 RecordNum nRows = RecordNum(opaqueToInt(node.firstRID));
00441 PLcsBatchDir pBatch = (PLcsBatchDir) ((PBuffer) &node + node.oBatch);
00442 for (uint i = 0; i < node.nBatch; i += nClusterCols) {
00443 nRows += pBatch[i].nRow;
00444 }
00445
00446 bTreeReader->endSearch();
00447 return nRows;
00448 }
00449
00450 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $");
00451
00452