LbmEntry.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmEntry.cpp#26 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/bitmap/LbmEntry.h"
00024 #include "fennel/tuple/TuplePrinter.h"
00025 #include "fennel/common/FennelResource.h"
00026 #include <iomanip>
00027 #include <sstream>
00028 #include <boost/scoped_array.hpp>
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmEntry.cpp#26 $");
00031 
00032 
00033 LbmEntry::LbmEntry()
00034 {
00035     scratchBuffer = NULL;
00036 
00037     /*
00038      * Reset buffers and offsets
00039      */
00040     pSegDescStart = NULL;
00041     pSegStart = NULL;
00042 
00043     resetSegment();
00044 }
00045 
00046 
00047 void LbmEntry::init(
00048     PBuffer scratchBufferInit,
00049     PBuffer mergeScratchBufferInit,
00050     uint scratchBufferSizeInit,
00051     TupleDescriptor const &tupleDesc)
00052 {
00053     scratchBuffer = scratchBufferInit;
00054     mergeScratchBuffer = mergeScratchBufferInit;
00055 
00056     /*
00057      * Leave room to write the last segment descriptor which has an optional
00058      * length field of 3 bytes.
00059      */
00060     scratchBufferSize = scratchBufferSizeInit;
00061     scratchBufferUsableSize = scratchBufferSizeInit - LbmZeroLengthExtended;
00062 
00063 #ifdef DEBUG
00064     /*
00065      * Overwrite the buffer with FFs.
00066      */
00067     memset(scratchBuffer, 0xFF, scratchBufferSizeInit);
00068     if (mergeScratchBuffer != NULL) {
00069         memset(mergeScratchBuffer, 0xFF, scratchBufferSizeInit);
00070     }
00071 #endif
00072 
00073     /*
00074      * Set up the entryTuple
00075      */
00076     entryTuple.compute(tupleDesc);
00077     currentEntrySize = 0;
00078 
00079     keyDesc.resize(tupleDesc.size() - 2);
00080     for (uint i = 0; i < keyDesc.size(); i++) {
00081         keyDesc[i] = tupleDesc[i];
00082     }
00083 
00084     bitmapSegSize = tupleDesc[tupleDesc.size() - 1].cbStorage;
00085     maxSegSize = getMaxBitmapSize(bitmapSegSize);
00086 }
00087 
00088 
00089 void LbmEntry::setEntryTuple(TupleData const &indexTuple)
00090 {
00091     /*
00092      * RID field
00093      */
00094     uint RIDField = entryTuple.size() - 3;
00095 
00096     /*
00097      * Next to the last.
00098      */
00099     uint segmentDescField = entryTuple.size() - 2;
00100 
00101     /*
00102      * Last field.
00103      */
00104     uint segmentField = entryTuple.size() - 1;
00105 
00106     /*
00107      * Now copy everything from indexTuple to entryTuple, using scratchBuffer
00108      * to store tuple.
00109      */
00110     currentEntrySize = 0;
00111 
00112     for (int i = 0; i < entryTuple.size(); i ++) {
00113         if (i != segmentField) {
00114             entryTuple[i].pData = scratchBuffer + currentEntrySize;
00115         } else {
00116             /*
00117              * Move the segments to the end of the buffer. Segments grow
00118              * backward in memory.
00119              */
00120             entryTuple[i].pData = scratchBuffer + scratchBufferSize
00121                                   - indexTuple[i].cbData;
00122         }
00123 
00124         entryTuple[i].memCopyFrom(indexTuple[i]);
00125 
00126         if (entryTuple[i].isNull()) {
00127             entryTuple[i].cbData = 0;
00128         }
00129         currentEntrySize += entryTuple[i].cbData;
00130         if (i == RIDField) {
00131             keySize = currentEntrySize;
00132         }
00133     }
00134 
00135     /*
00136      * We only know the available buffer size after getting the
00137      * key values since the column(s) might be of variable size.
00138      *
00139      * The current entry might have been initialized from an existing entry
00140      * in which case, the last bytes reserved for extended zero length might
00141      * have been used. The total length in this case will exceed
00142      * scratchBufferUsableSize but not scratchBufferSize.
00143      */
00144     validateEntrySize();
00145 
00146     /*
00147      * Set up the Segment and Segment descriptor pointers. There are a few
00148      * cases when LBmEntry is initialized, and the buffer setup is different
00149      * for each case.
00150      * 1. LbmGenerator calls init with <key1>...<keyN><actualRID><NULL><NULL>
00151      * 2. LbmSplicer calls init with <Key1>..<keyN><SingletonRID><NULL><NULL>
00152      * 3. LbmSplicer calls init with <Key1>..<keyN><StartRID><value><value>
00153      * 4. LbmSplicer calls init with <Key1>..<keyN><StartRID><NULL><value>
00154      *
00155      * For case 1 and 2, the setup for the variable part(segmentDesc/segment)
00156      * is the same(point to begin and end of the available buffer).
00157      */
00158 
00159     startRID = *((LcsRid *)entryTuple[RIDField].pData);
00160 
00161     if (isSingleton(indexTuple)) {
00162         /*
00163          * Case 1, or 2
00164          */
00165         pSegDescStart = pSegDescEnd = scratchBuffer + keySize;
00166         pSegStart     = pSegEnd     = scratchBuffer + scratchBufferSize;
00167 
00168         resetSegment();
00169     } else {
00170         /*
00171          * Case 3, or 4(single bitmap)
00172          * From a bitmap entry. Check startRID is well-formed.
00173          */
00174         assert((startRID) % LbmOneByteSize == (LcsRid)0);
00175 
00176         uint segLength = entryTuple[segmentField].cbData;
00177         /*
00178          * Check if this is a single bitmap, or a compressed bitmap.
00179          */
00180         if (isSingleBitmap(entryTuple)) {
00181             /*
00182              * Add segment descriptors to single bitmaps.
00183              * It should fit since an on-disk single bitmap always come
00184              * from an in-memory compressed bitmap; so reverting a single bitmap
00185              * to compressed bitmap should always succeed.
00186              */
00187             pSegDescStart = pSegDescEnd = scratchBuffer + keySize;
00188             uint reservedSpace = 0;
00189             bool ret = addSegDesc(reservedSpace, segLength);
00190             assert (ret);
00191         } else {
00192             pSegDescStart = (PBuffer)entryTuple[segmentDescField].pData;
00193             pSegDescEnd = pSegDescStart + entryTuple[segmentDescField].cbData;
00194         }
00195 
00196         pSegStart = scratchBuffer + scratchBufferSize;
00197         pSegEnd = pSegStart - segLength;
00198 
00199         resetSegment();
00200     }
00201 }
00202 
00203 
00204 bool LbmEntry::setRIDNewSegment(LcsRid rid)
00205 {
00206     if (!openNewSegment(rid)) {
00207         return false;
00208     }
00209 
00210     setRIDCurrentSegByte(rid);
00211     return true;
00212 }
00213 
00214 
00215 bool LbmEntry::setRIDAdjacentSegByte(LcsRid rid)
00216 {
00217     assert (!isSingleBitmap());
00218 
00219     if (currSegLength == LbmMaxSegSize) {
00220         /*
00221          * Current segment is full.
00222          * Need New segement and new segment descriptor.
00223          * First close the current segment in this entry. Use the
00224          * closeCurrentSegment() interface which does not encode any zeros
00225          * since the next segment is adjacent.
00226          */
00227         closeCurrentSegment();
00228         return setRIDNewSegment(rid);
00229     }
00230 
00231     if (currentEntrySize + 1 > scratchBufferUsableSize) {
00232         closeCurrentSegment();
00233         return false;
00234     }
00235 
00236     if (pSegStart - pSegEnd == bitmapSegSize) {
00237         closeCurrentSegment();
00238         return false;
00239     }
00240     pSegEnd--;
00241     currSegByte = pSegEnd;
00242     currSegByteStartRID = roundToByteBoundary(rid);
00243     currSegLength ++;
00244 
00245     *currSegByte = (uint8_t)(1 << (opaqueToInt(rid) % LbmOneByteSize));
00246 
00247     currentEntrySize += 1;
00248     /*
00249      * We are growing the current segment. If there is a descriptor for this
00250      * segment, add one more segment byte.
00251      */
00252     if (isSegmentOpen()) {
00253         setSegLength(*currSegDescByte, currSegLength);
00254     }
00255 
00256     return true;
00257 }
00258 
00259 
00260 bool LbmEntry::openNewSegment(LcsRid rid)
00261 {
00262     assert (!isSegmentOpen());
00263     assert (pSegDescEnd);
00264 
00265     if (currentEntrySize + 2 > scratchBufferUsableSize) {
00266         return false;
00267     }
00268 
00269     if (pSegStart - pSegEnd == bitmapSegSize) {
00270         return false;
00271     }
00272     pSegEnd--;
00273     currSegByte = pSegEnd;
00274     *currSegByte = 0;
00275 
00276     currSegDescByte = pSegDescEnd;
00277     pSegDescEnd ++;
00278 
00279     currSegByteStartRID = roundToByteBoundary(rid);
00280     currSegLength = 1;
00281 
00282     /*
00283      * This is a new segment, set number of segment byte to be one.
00284      * Note that the stored length is actually (length - 1). So we
00285      * store value zero here.
00286      */
00287     setSegLength(*currSegDescByte, currSegLength);
00288     currentEntrySize += 2;
00289 
00290     return true;
00291 }
00292 
00293 
00294 void LbmEntry::openLastSegment()
00295 {
00296     assert (!isSegmentOpen());
00297     assert (pSegEnd);
00298 
00299     uint lastZeroRIDs = 0;
00300     uint rowCount = getRowCount(currSegDescByte, lastZeroRIDs);
00301 
00302     // get current segment length
00303     currSegLength = getSegLength(*currSegDescByte);
00304 
00305     // backtrack to exclude the extended zero length bytes
00306     currentEntrySize -= getZeroLengthByteCount(*currSegDescByte);
00307 
00308     pSegDescEnd = currSegDescByte + 1;
00309 
00310     // write the segment length back, and erase the previously stored extended
00311     // zero length bytes in the seg descriptor
00312     setSegLength(*currSegDescByte, currSegLength);
00313 
00314     // still point to the same last segment byte
00315     currSegByte = pSegEnd;
00316 
00317     currSegByteStartRID =
00318         startRID + rowCount - lastZeroRIDs - LbmOneByteSize;
00319 }
00320 
00321 
00322 void LbmEntry::closeCurrentSegment()
00323 {
00324     assert (isSegmentOpen());
00325     assert (currSegLength >= 1 && currSegLength <= LbmMaxSegSize);
00326 
00327     setSegLength(*currSegDescByte, currSegLength);
00328     resetSegment();
00329 }
00330 
00331 
00332 bool LbmEntry::closeCurrentSegment(LcsRid rid)
00333 {
00334     if (!isSegmentOpen()) {
00335         // segment already closed.
00336         return true;
00337     }
00338 
00339     int ridDistance = 0;
00340 
00341     assert(currSegLength >= 1 && currSegLength <= LbmMaxSegSize);
00342 
00343     setSegLength(*currSegDescByte, currSegLength);
00344 
00345     /*
00346      * Count of zero bits in bytes. Each byte represents 8 RIDs.
00347      */
00348     ridDistance =
00349         opaqueToInt(roundToByteBoundary(rid) - currSegByteStartRID)
00350         / LbmOneByteSize - 1;
00351 
00352     if (ridDistance <= 0) {
00353         /*
00354          * Boundary case: request writing the directory even though the rid is
00355          * in the same seg, or in an adjacent segment. In both cases, there is
00356          * no rid gap between the two segments.
00357          */
00358     } else {
00359         uint lengthBytes;
00360         assert(currSegDescByte + 1 == pSegDescEnd);
00361         if (!setZeroLength(ridDistance, currSegDescByte, lengthBytes)) {
00362             /*
00363              * The ridDistance can not be encoded in LbmZeroLengthExtended
00364              * bytes.
00365              *
00366              * In this special case, this segment descriptor will be the
00367              * last one on the current LbmEntry. The descriptor will encode
00368              * that 0 bits follows the current segment. The caller will
00369              * construct a new LbmEntry with rid, and very likely there's a
00370              * gap between the last RID encoded by the current LbmEntry and
00371              * the startRID of this new LbmEntry. This means that RIDs
00372              * might not be densely encoded in an ordered sequence of
00373              * LbmEntries.
00374              */
00375             resetSegment();
00376             return false;
00377         }
00378         currentEntrySize += lengthBytes;
00379         pSegDescEnd += lengthBytes;
00380     }
00381     resetSegment();
00382     return true;
00383 }
00384 
00385 bool LbmEntry::setZeroLength(
00386     uint nZeroBytes, PBuffer pLenDesc, uint &lengthBytes)
00387 {
00388     *(pLenDesc) &= ~LbmZeroLengthMask;
00389     if (nZeroBytes <= LbmZeroLengthCompact) {
00390         /*
00391          * Can encode the zero bits directly in the segment descriptor.
00392          */
00393         *(pLenDesc) |= (uint8_t) (nZeroBytes & LbmZeroLengthMask);
00394         lengthBytes = 0;
00395     } else {
00396         lengthBytes = value2ByteArray(
00397             nZeroBytes, pLenDesc + 1, LbmZeroLengthExtended);
00398 
00399         if (lengthBytes) {
00400             /*
00401              * Caller needs to ensure that the buffer has enough space to
00402              * encode the zero bytes.
00403              */
00404             *(pLenDesc) |=
00405                 (uint8_t) ((lengthBytes + LbmZeroLengthCompact) &
00406                     LbmZeroLengthMask);
00407         } else {
00408             return false;
00409         }
00410     }
00411 
00412     return true;
00413 }
00414 
00415 uint LbmEntry::getRowCount()
00416 {
00417     PBuffer lastSegDescByte = NULL;
00418     uint lastZeroRIDs = 0;
00419 
00420     return getRowCount(lastSegDescByte, lastZeroRIDs);
00421 }
00422 
00423 
00424 uint LbmEntry::getCompressedRowCount(
00425     PBuffer pDescStart,
00426     PBuffer pDescEnd,
00427     PBuffer &lastSegDescByte,
00428     uint &lastZeroRIDs)
00429 {
00430     PBuffer p1 = pDescStart;
00431     uint lastLengthDesc = 0;
00432     uint lastLengthDescBytes, lastZeroBytes;
00433     uint rowCount = 0;
00434 
00435     while (p1 < pDescEnd) {
00436         /*
00437          * Count the RIDs in bitmaps.
00438          */
00439         rowCount += getSegLength(*p1) * LbmOneByteSize;
00440 
00441         /*
00442          * Count the RIDs in Descriptors.
00443          */
00444         lastSegDescByte = p1;
00445         lastLengthDesc = *p1 & LbmZeroLengthMask;
00446         p1 ++;
00447 
00448         if (lastLengthDesc <= LbmZeroLengthCompact) {
00449             lastZeroBytes = lastLengthDesc;
00450         } else {
00451             lastLengthDescBytes = lastLengthDesc - LbmZeroLengthCompact;
00452             /*
00453              * Translate number of bytes into number of RIDs(bits).
00454              */
00455             lastZeroBytes =
00456                 byteArray2Value(p1, lastLengthDescBytes);
00457             p1 += lastLengthDescBytes;
00458         }
00459 
00460         lastZeroRIDs = lastZeroBytes * LbmOneByteSize;
00461         rowCount += lastZeroRIDs;
00462     }
00463     return rowCount;
00464 }
00465 
00466 
00467 uint LbmEntry::getRowCount(
00468     PBuffer &lastSegDescByte,
00469     uint &lastZeroRIDs)
00470 {
00471     assert (!isSingleBitmap());
00472 
00473     uint rowCount = 0;
00474 
00475     if (isSingleton()) {
00476         rowCount = 1;
00477         lastSegDescByte = NULL;
00478         lastZeroRIDs = 0;
00479     } else {
00480         rowCount =
00481             getCompressedRowCount(
00482                 pSegDescStart, pSegDescEnd,
00483                 lastSegDescByte, lastZeroRIDs);
00484     }
00485     return rowCount;
00486 }
00487 
00488 
00489 uint LbmEntry::getRowCount(TupleData const &inputTuple)
00490 {
00491     uint rowCount = 0;
00492 
00493     if (isSingleton(inputTuple)) {
00494         rowCount = 1;
00495     } else if (isSingleBitmap(inputTuple)) {
00496         /*
00497          * A single bitmap
00498          */
00499         rowCount =
00500             inputTuple[inputTuple.size() - 1].cbData * LbmOneByteSize;
00501     } else {
00502         PBuffer lastSegDescByte = NULL;
00503         uint lastZeroRIDs = 0;
00504         PBuffer pDescStart = (PBuffer)inputTuple[inputTuple.size() - 2].pData;
00505         PBuffer pDescEnd   =
00506             (PBuffer)(inputTuple[inputTuple.size() - 2].pData +
00507                       inputTuple[inputTuple.size() - 2].cbData);
00508         rowCount =
00509             getCompressedRowCount(
00510                 pDescStart, pDescEnd,
00511                 lastSegDescByte, lastZeroRIDs);
00512     }
00513     return rowCount;
00514 }
00515 
00516 
00517 bool LbmEntry::singleton2Bitmap()
00518 {
00519     if (setRIDNewSegment(startRID)) {
00520         startRID = roundToByteBoundary(startRID);
00521         entryTuple[entryTuple.size() - 2].pData = pSegDescStart;
00522         return true;
00523     } else {
00524         return false;
00525     }
00526 }
00527 
00528 
00529 bool LbmEntry::setRID(LcsRid rid)
00530 {
00531     assert (!isSingleBitmap());
00532 
00533     /*
00534      * First prepare the current LbmEntry for insert.
00535      */
00536     if (isSingleton()) {
00537         /*
00538          * If adding RID to a singleton LbmEntry, change the singleton to
00539          * bitmap entry
00540          */
00541         if (!singleton2Bitmap()) {
00542             /*
00543              * Current LbmEntry cannot be appended to.
00544              * Current LbmEntry remains a singleton.
00545              */
00546             return false;
00547         }
00548         /*
00549          * Now the current entry is changed to a bitmap. We are ready to
00550          * insert the new rid.
00551          */
00552     } else if (!isSegmentOpen()) {
00553         /*
00554          * It's a bitmap entry, but the currSegDescByte is not set up yet.
00555          * This is the case when Splicer initializes a LbmEntry with an existing
00556          * entryTuple.
00557          * Need to search to the end of the segment descriptors to grow the
00558          * entry, if there're intervening zeros between the last RID in the
00559          * buffer and the rid which is the input.
00560          * Reserve 1 byte to add the RID in the new segment byte.
00561          */
00562         openLastSegment();
00563     }
00564 
00565     assert (isSegmentOpen());
00566 
00567     /*
00568      * Now insert the new RID.
00569      */
00570     if (!isSegmentOpen()) {
00571         /*
00572          * First time appending to this LbmEntry.
00573          */
00574         return setRIDNewSegment(rid);
00575     } else {
00576         /*
00577          * Distance in bits
00578          */
00579         uint distance = opaqueToInt(rid - currSegByteStartRID);
00580 
00581         assert(distance >= 0);
00582 
00583         if (distance < LbmOneByteSize) {
00584             /*
00585              * Easiest one. Same byte.
00586              */
00587             setRIDCurrentSegByte(rid);
00588         } else if (distance < LbmOneByteSize * 2) {
00589             /*
00590              * Adjacent byte.
00591              */
00592             return setRIDAdjacentSegByte(rid);
00593         } else {
00594             /*
00595              * Further away. Need to write the Descriptor.
00596              */
00597             if (closeCurrentSegment(rid)) {
00598                 /*
00599                  * Then figure out if there's space left to begin a new
00600                  * segment and segment descriptor
00601                  */
00602                 if (!setRIDNewSegment(rid)) {
00603                     return false;
00604                 }
00605             } else {
00606                 /*
00607                  * No room for descriptor in the same LbmEntry, tell caller to
00608                  * start a new entry.
00609                  */
00610                 return false;
00611             }
00612         }
00613     }
00614     return true;
00615 }
00616 
00617 
00618 int LbmEntry::compareEntry(
00619     TupleData const &inputTuple,
00620     TupleDescriptor const &tupleDesc) const
00621 {
00622     return tupleDesc.compareTuplesKey(
00623         entryTuple,
00624         inputTuple,
00625         entryTuple.size() - 3);
00626 }
00627 
00628 
00629 bool LbmEntry::addSegDesc(uint reservedSpace, uint bitmapLength)
00630 {
00631     uint leftOverSpace =
00632         scratchBufferUsableSize - currentEntrySize - reservedSpace;
00633 
00634     uint segDescCount = bitmapLength / LbmMaxSegSize;
00635     uint lastSegLength = bitmapLength % LbmMaxSegSize;
00636     uint addedSegDescBytes = segDescCount + (lastSegLength ? 1 : 0);
00637 
00638     if (addedSegDescBytes > leftOverSpace) {
00639         return false;
00640     } else {
00641         int i;
00642 
00643         /*
00644          * Set segment descriptor pointers.
00645          */
00646         for (i = 0; i < segDescCount; i ++) {
00647             setSegLength(*pSegDescEnd, LbmMaxSegSize);
00648             pSegDescEnd ++;
00649         }
00650 
00651         if (lastSegLength) {
00652             setSegLength(*pSegDescEnd, lastSegLength);
00653             pSegDescEnd ++;
00654         }
00655 
00656         resetSegment();
00657 
00658         // number of bytes added to this entry
00659         currentEntrySize += addedSegDescBytes;
00660 
00661         return true;
00662     }
00663 }
00664 
00665 uint LbmEntry::getMergeSpaceRequired(TupleData const &inputTuple)
00666 {
00667     uint mergeSpaceRequired = 0;
00668 
00669     /*
00670      * This seems to be a really strange compiler problem: if
00671      * "inputSegDescLength" is replaced with the following
00672      * expression, the calculation is incorrect.
00673      */
00674 
00675     uint inputSegDescLength =
00676         inputTuple[inputTuple.size() - 2].pData ?
00677         inputTuple[inputTuple.size() - 2].cbData : 0;
00678     uint inputSegLength =
00679         inputTuple[inputTuple.size() - 1].pData ?
00680         inputTuple[inputTuple.size() - 1].cbData : 0;
00681 
00682     if (isSingleton(inputTuple)) {
00683         // singleton
00684         mergeSpaceRequired = 2;
00685     } else if (isSingleBitmap(inputTuple)) {
00686         // single bitmap
00687         uint segDescCount = (inputSegLength / LbmMaxSegSize) + 1;
00688         mergeSpaceRequired = segDescCount + inputSegLength;
00689     } else {
00690         // compressed bitmap
00691         mergeSpaceRequired = inputSegDescLength + inputSegLength;
00692     }
00693 
00694     // If the currentEntry is a singleton, then add a couple of bytes to
00695     // account for the segment byte and descriptor that haven't yet been
00696     // explicitly created
00697     if (isSingleton()) {
00698         mergeSpaceRequired += 2;
00699     }
00700 
00701     return mergeSpaceRequired;
00702 }
00703 
00704 bool LbmEntry::growEntry(LcsRid rid, uint reserveSpace)
00705 {
00706     /*
00707      * See if an existing(rather than an entry which is forming) entry can be
00708      * grown to encode additional zeros until the byte that encodes rid.
00709      */
00710     int leftOverSpace =
00711         scratchBufferUsableSize - currentEntrySize - reserveSpace;
00712 
00713     if (leftOverSpace < 0) {
00714         return false;
00715     }
00716 
00717     if (isSingleton()) {
00718         if (!singleton2Bitmap()) {
00719             return false;
00720         }
00721     }
00722 
00723     if (!isSegmentOpen()) {
00724         openLastSegment();
00725     }
00726 
00727     /*
00728      * Find the last RID.
00729      */
00730     uint rowCount = getRowCount();
00731 
00732     /*
00733      * Bitmap entries encode 8 RIDs at a time. When Splicer calls setRID,
00734      * this new rid must not have been encoded by the current LbmEntry.
00735      */
00736     LcsRid endRID = startRID + rowCount - 1;
00737 
00738     if (rid >= endRID + 1) {
00739         /*
00740          * If this rid is not in the next 8 RIDs. We need to "grow" the entry by
00741          * - remembering the rid gap in the current segment descriptor.
00742          * - open a new segment for appending.
00743          */
00744         if (!closeCurrentSegment(rid)) {
00745             return false;
00746         }
00747     }
00748     /*
00749      * if (rid < endRID + 1)
00750      * is the singleton case. i.e. rid is from a singleton which chould appear
00751      * before the endRID of this entry. Do not have to grow the entry
00752      * (i.e. encode any intervening zero RIDs) in this case.
00753      */
00754 
00755     return true;
00756 }
00757 
00758 bool LbmEntry::adjustEntry(TupleData &inputTuple)
00759 {
00760     LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
00761 
00762     /*
00763      * It is possible for both the input and the current entry to be singletons
00764      * if we're doing random mergeEntry's
00765      */
00766     if (isSingleton() && isSingleton(inputTuple)) {
00767         assert(startRID != inputStartRID);
00768         return false;
00769     }
00770 
00771     if (isSingleton(inputTuple)) {
00772         /*
00773          * The current entry must be either compressed bitmap or single
00774          * bitmap. In either case, just need to set the last byte of the
00775          * current entry.
00776          */
00777         currSegByte = pSegEnd;
00778         setRIDCurrentSegByte(inputStartRID);
00779         return true;
00780     } else if (isSingleton()) {
00781         /*
00782          * This can only happen if the singleton RID in in the same byte as the
00783          * inputStartRID. e,g:
00784          * current entry(singleton): startRID == endRID == 2
00785          * input entry(non-singleton) inputStartRID == 0, and
00786          *                            first byte is 0 0 1 1 1 0 0 0
00787          * See test case LER-422 in lbm.sql.
00788          */
00789         assert ((opaqueToInt(startRID) - opaqueToInt(inputStartRID))
00790             < LbmOneByteSize);
00791 
00792         // use the input as the current and set the bit at inputStartRID.
00793         LcsRid oldStartRid = startRID;
00794         setEntryTuple(inputTuple);
00795 
00796         // remember to set the bit corresponding original startRID
00797         setRIDSegByte(pSegStart - 1, oldStartRid);
00798 
00799         // the two entries are already merged
00800         return true;
00801     } else {
00802         currSegByte = pSegEnd;
00803         uint8_t inputFirstSegByte =
00804             *(uint8_t *)(inputTuple[inputTuple.size() - 1].pData +
00805                 inputTuple[inputTuple.size() - 1].cbData - 1);
00806         *currSegByte |= inputFirstSegByte;
00807 
00808         /*
00809          * Everything in the current entry remains the same, since only the
00810          * last byte is modified. Need to update the fields in inputTuple
00811          * to reflect that the current entry no longer contains the first
00812          * byte; unless the inputTuple only contains a single byte, in which
00813          * case, there's nothing left in the tuple to merge.
00814          */
00815         if (inputTuple[inputTuple.size() -1].cbData == 1) {
00816             assert(inputTuple[inputTuple.size() - 2].pData == NULL);
00817             return true;
00818         }
00819 
00820         /*
00821          * First,
00822          * modify the segment field. It loses the first byte which is
00823          * located at the end of the segment storage portion. The length
00824          * of segment storage is reduced by one, but the pointer to the
00825          * (end of the) segment storage portion does not change.
00826          */
00827         inputTuple[inputTuple.size() -1].cbData --;
00828 
00829         /*
00830          * Then,
00831          * modify the segment descriptor for the first segment.
00832          * The descriptor might not be needed if the first segment only has
00833          * one byte.
00834          * Also, don't need to modify descriptor if there's none(the single
00835          * bitmap case).
00836          */
00837         PBuffer pFirstDesc = (PBuffer)inputTuple[inputTuple.size() - 2].pData;
00838         /*
00839          * Moved a byte worth(=8) of RIDs from the input entry(into
00840          * the current entry).
00841          */
00842         uint numRIDsMoved = LbmOneByteSize;
00843 
00844         if (pFirstDesc) {
00845             uint8_t firstSegLength  = ((*pFirstDesc) >> LbmHalfByteSize) + 1;
00846 
00847             if (firstSegLength >= 2) {
00848                 /*
00849                  * Still need the first segment desscriptor.
00850                  */
00851                 firstSegLength --;
00852                 /*
00853                  * Clear out the segment length in the descriptor.
00854                  */
00855                 *pFirstDesc &= LbmZeroLengthMask;
00856                 /*
00857                  * Set the new segment length in the descriptor.
00858                  */
00859                 *pFirstDesc |= (firstSegLength -1) << LbmHalfByteSize;
00860             } else {
00861                 /*
00862                  * No need for the first descriptor now.
00863                  * Figure out the number of RIDs removed from the input
00864                  * entry. Move the descriptor pointer accordingly.
00865                  */
00866                 uint firstZeroLength = (*pFirstDesc) & LbmZeroLengthMask;
00867                 pFirstDesc ++;
00868 
00869                 inputTuple[inputTuple.size() - 2].cbData --;
00870 
00871                 if (firstZeroLength > LbmZeroLengthCompact) {
00872                     uint firstZeroLengthBytes =
00873                         firstZeroLength - LbmZeroLengthCompact;
00874                     /*
00875                      * Translate number of bytes into number of RIDs(bits).
00876                      */
00877                     firstZeroLength =
00878                         byteArray2Value(pFirstDesc, firstZeroLengthBytes);
00879                     /*
00880                      * Number of bytes used to store length of zeros.
00881                      */
00882                     pFirstDesc += firstZeroLengthBytes;
00883                     inputTuple[inputTuple.size() - 2].cbData -=
00884                         firstZeroLengthBytes;
00885                 }
00886                 inputTuple[inputTuple.size() - 2].pData = pFirstDesc;
00887                 numRIDsMoved += firstZeroLength * LbmOneByteSize;
00888             }
00889         }
00890         /*
00891          * Adjust the startRID in the input tuple.
00892          */
00893         inputStartRID += numRIDsMoved;
00894     }
00895     return false;
00896 }
00897 
00898 bool LbmEntry::mergeEntry(TupleData &inputTuple)
00899 {
00900     assert (!isSingleBitmap());
00901 
00902     /*
00903      * MergeEntry needs to first handle the case where there are overlapping
00904      * rid ranges. Overlap could happen when the last byte encoded by the
00905      * first entry is also encoded by the first byte in the next entry.
00906      * So the first thing mergeEntry does is to adjust the current entry to
00907      * include the first byte of the input entry. After that, if there is still
00908      * overlap, then we are trying to merge a singleton within the current
00909      * rid range, so we need to handle that case.
00910      * Otherwise, the current entry and the input entry do not overlap, and
00911      * the regular merging logic takes place.
00912      */
00913 
00914     /*
00915      * Find the last RID of this entry.
00916      */
00917     uint rowCount = getRowCount();
00918 
00919     /*
00920      * Bitmap entries encode 8 RIDs at a time. When Splicer calls setRID,
00921      * this new rid might have been encoded by the last segment byte of the
00922      * current LbmEntry.
00923      */
00924     LcsRid endRID = startRID + rowCount - 1;
00925     LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
00926 
00927     /*
00928      * First, if there is overlap in the last byte, move the first byte of the
00929      * input entry to the last byte of this entry. Modify inputTuple,
00930      * inputStartRID.
00931      */
00932     if (inputStartRID <= endRID &&
00933         inputStartRID >= roundToByteBoundary(endRID))
00934     {
00935         if (adjustEntry(inputTuple)) {
00936             validateEntrySize();
00937             return true;
00938         }
00939     }
00940 
00941     /*
00942      * After adjustEntry(), if there is still overlap between the current
00943      * entry and the (newly modified) input entry tuple, then we are trying
00944      * to merge in a singleton.
00945      */
00946     if (inputStartRID <= endRID) {
00947         permAssert(isSingleton(inputTuple));
00948         bool rc = spliceSingleton(inputTuple);
00949         validateEntrySize();
00950         return rc;
00951     }
00952 
00953     uint mergeSpaceRequired = getMergeSpaceRequired(inputTuple);
00954 
00955     if (!growEntry(inputStartRID, mergeSpaceRequired)) {
00956         /*
00957          * If either the combined entry is bigger than maximum entry size,
00958          * or if the distance between the last rid of current entry and the
00959          * start rid of the input entry can not be encoded by
00960          * LbmZeroLengthExtended bytes, tell caller to start a new entry.
00961          */
00962         return false;
00963     }
00964 
00965     /*
00966      * If the new inputTuple is a singleton, use the setRID interface.
00967      */
00968     if (isSingleton(inputTuple)) {
00969         bool rc = setRID(inputStartRID);
00970         validateEntrySize();
00971         return rc;
00972     }
00973 
00974     /*
00975      * Now merge the two pieces of bitmaps together, provided the resulting
00976      * bitmap doesn't exceed the segment field size.
00977      */
00978     uint inputSegDescLength =
00979         inputTuple[inputTuple.size() - 2].pData ?
00980         inputTuple[inputTuple.size() - 2].cbData : 0;
00981     uint inputSegLength =
00982         inputTuple[inputTuple.size() - 1].pData ?
00983         inputTuple[inputTuple.size() - 1].cbData : 0;
00984     if (pSegStart - pSegEnd + inputSegLength > bitmapSegSize) {
00985         return false;
00986     }
00987 
00988     if (!isSingleBitmap(inputTuple)) {
00989         /*
00990          * Copy the segment descriptors only if the tuples are not single
00991          * bitmaps. Single bitmaps do not have segment descriptors.
00992          */
00993         memcpy(
00994             pSegDescEnd,
00995             inputTuple[inputTuple.size() - 2].pData,
00996             inputSegDescLength);
00997         pSegDescEnd += inputSegDescLength;
00998         currentEntrySize += inputSegDescLength;
00999     } else {
01000         // Need to add descriptor for inputSegLength and then copy
01001         // inputSegLength worth of  bitmap segment.
01002         uint reservedSpace = inputSegLength;
01003         if (!addSegDesc(reservedSpace, inputSegLength)) {
01004             return false;
01005         }
01006     }
01007 
01008     /*
01009      * segment grows backwards.
01010      */
01011     pSegEnd -= inputSegLength;
01012     memcpy(
01013         pSegEnd,
01014         inputTuple[inputTuple.size() - 1].pData,
01015         inputSegLength);
01016     currentEntrySize += inputSegLength;
01017     validateEntrySize();
01018     return true;
01019 }
01020 
01021 bool LbmEntry::spliceSingleton(TupleData &inputTuple)
01022 {
01023     assert(!isSingleBitmap());
01024 
01025     int ridField = inputTuple.size() - 3;
01026     LcsRid &inputStartRID = *((LcsRid*) inputTuple[ridField].pData);
01027 
01028     // special case where the current entry is the minimum entry and we are
01029     // trying to splice a rid in front of it
01030     if (inputStartRID < startRID) {
01031         // reverse the roles of current and input and merge the original
01032         // current into the original input; first copy the current to the
01033         // temporary merge buffer
01034         if (isSegmentOpen()) {
01035             closeCurrentSegment();
01036         }
01037         TupleData origCurrEntry;
01038         copyToMergeBuffer(origCurrEntry, startRID, pSegStart, pSegDescStart);
01039 
01040         setEntryTuple(inputTuple);
01041         bool rc = mergeEntry(origCurrEntry);
01042         // if we weren't able to merge the input into the current entry,
01043         // the new entry needs to be produced and the current entry becomes
01044         // the input entry
01045         if (rc == false) {
01046             for (int i = 0; i < origCurrEntry.size(); i++) {
01047                 inputTuple[i] = origCurrEntry[i];
01048             }
01049         }
01050         return rc;
01051 
01052     } else {
01053         // loop through each segment and determine if there already is a
01054         // rid range containing the input singleton
01055         PBuffer segDesc = pSegDescStart;
01056         PBuffer seg = pSegStart;
01057         LcsRid srid = startRID;
01058         while (segDesc < pSegDescEnd) {
01059             uint segBytes;
01060             uint zeroBytes;
01061             PBuffer prevSegDesc = segDesc;
01062             readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01063 
01064             // input rid is within the range of an existing set of rids
01065             if (inputStartRID >= srid &&
01066                 inputStartRID < srid + (segBytes * LbmOneByteSize))
01067             {
01068                 setRIDSegByte(
01069                     seg - 1 -
01070                         opaqueToInt(inputStartRID - srid) / LbmOneByteSize,
01071                     inputStartRID);
01072                 return true;
01073             }
01074             LcsRid nextSrid = srid + (zeroBytes + segBytes) * LbmOneByteSize;
01075             // input rid is within the range of trailing zeros
01076             if (inputStartRID < nextSrid) {
01077                 return addNewSegment(
01078                     inputTuple, srid, prevSegDesc, segDesc, seg, segBytes,
01079                     zeroBytes);
01080             }
01081             seg -= segBytes;
01082             srid = nextSrid;
01083         }
01084         // should never go past the end of the entry, as we would not have
01085         // entered this method otherwise
01086         permAssert(false);
01087         return true;
01088     }
01089 }
01090 
01091 void LbmEntry::copyToMergeBuffer(
01092     TupleData &newEntry, LcsRid newStartRID, PBuffer segStart,
01093     PBuffer segDescStart)
01094 {
01095     assert(mergeScratchBuffer != NULL);
01096     PBuffer pTempBuff = mergeScratchBuffer;
01097     memcpy(pTempBuff, scratchBuffer, keySize);
01098 
01099     newEntry.resize(entryTuple.size());
01100     for (int i = 0; i < entryTuple.size() - 3; i++) {
01101         newEntry[i] = entryTuple[i];
01102         newEntry[i].pData = pTempBuff;
01103         newEntry[i].cbData = entryTuple[i].cbData;
01104         pTempBuff += entryTuple[i].cbData;
01105     }
01106     memcpy(pTempBuff, &newStartRID, sizeof(LcsRid));
01107     uint ridField = entryTuple.size() - 3;
01108     newEntry[ridField].pData = pTempBuff;
01109     newEntry[ridField].cbData = sizeof(LcsRid);
01110 
01111     newEntry[ridField + 1].cbData = pSegDescEnd - segDescStart;
01112     if (newEntry[ridField + 1].cbData == 0) {
01113         newEntry[ridField + 1].pData = NULL;
01114     } else {
01115         newEntry[ridField + 1].pData = mergeScratchBuffer + keySize;
01116         memcpy(
01117             (void *) newEntry[ridField + 1].pData, segDescStart,
01118             pSegDescEnd - segDescStart);
01119     }
01120 
01121     uint segLen = segStart - pSegEnd;
01122     newEntry[ridField + 2].cbData = segLen;
01123     if (segLen == 0) {
01124         newEntry[ridField + 2].pData = NULL;
01125     } else {
01126         newEntry[ridField + 2].pData =
01127             mergeScratchBuffer + scratchBufferSize - segLen;
01128         memcpy(
01129             (void *) newEntry[ridField + 2].pData, segStart - segLen, segLen);
01130     }
01131 }
01132 
01133 bool LbmEntry::addNewSegment(
01134     TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01135     PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01136 {
01137     assert(isSingleton(inputTuple));
01138 
01139     // close the current segment before we create the new segment so we start
01140     // off from a clean state
01141     if (isSegmentOpen()) {
01142         closeCurrentSegment();
01143     }
01144 
01145     LcsRid inputStartRID =
01146         *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01147     LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01148     LcsRid nextSrid =
01149         prevSrid + (prevSegBytes + prevZeroBytes) * LbmOneByteSize;
01150 
01151     // new byte is either at the very end of the previous segment or the
01152     // very start of the next segment
01153     if ((inputStartRID >= prevEridPlus1 &&
01154             inputStartRID < prevEridPlus1 + LbmOneByteSize) ||
01155         (inputStartRID >= nextSrid - LbmOneByteSize &&
01156             inputStartRID < nextSrid))
01157     {
01158         return addNewAdjacentSegment(
01159             inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01160             prevSegBytes, prevZeroBytes);
01161     }
01162 
01163     return addNewMiddleSegment(
01164         inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01165         prevSegBytes, prevZeroBytes);
01166 }
01167 
01168 bool LbmEntry::addNewMiddleSegment(
01169     TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01170     PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01171 {
01172     assert(isSingleton(inputTuple));
01173 
01174     // new byte is in the middle of the zero bytes in between two segments;
01175     // compute the space required to add the new segment (both byte and
01176     // descriptor) and to split the zero length bytes into 2 segments
01177     LcsRid inputStartRID =
01178         *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01179     LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01180     uint leftZeroBytes = opaqueToInt(inputStartRID - prevEridPlus1) /
01181         LbmOneByteSize;
01182     uint rightZeroBytes = prevZeroBytes - leftZeroBytes - 1;
01183     uint spaceRequired = 2;
01184     int spaceNeededBefore = computeSpaceForZeroBytes(prevZeroBytes);
01185     int spaceNeededAfter = computeSpaceForZeroBytes(leftZeroBytes) +
01186         computeSpaceForZeroBytes(rightZeroBytes);
01187     spaceRequired += (spaceNeededAfter - spaceNeededBefore);
01188     assert(spaceRequired >= 1);
01189     if (spaceRequired + currentEntrySize > scratchBufferUsableSize) {
01190         splitEntry(inputTuple);
01191         return false;
01192     }
01193 
01194     // compute the length of the remaining segments before we modify
01195     // the descriptors
01196     uint remainingSegLen = computeSegLength(nextSegDesc);
01197 
01198     // set the zero byte length for the previous segment
01199     uint leftZeroLength;
01200     setZeroLength(leftZeroBytes, prevSegDesc, leftZeroLength);
01201 
01202     // compute the length of the remaining segment desciptors
01203     // so we can shift them to the right to make room for the new descriptor
01204     uint shiftAmount = spaceNeededAfter + 1 - spaceNeededBefore;
01205     if (shiftAmount > 0) {
01206         uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01207         PBuffer segDesc = prevSegDesc + spaceNeededBefore + 1;
01208         for (int i = remainingSegDescLen - 1; i >= 0; i--) {
01209             segDesc[i + shiftAmount] = segDesc[i];
01210         }
01211     }
01212 
01213     // add the new segment descriptor
01214     setSegLength(*(prevSegDesc + 1 + leftZeroLength), 1);
01215     uint size;
01216     setZeroLength(rightZeroBytes, prevSegDesc + 1 + leftZeroLength, size);
01217     pSegDescEnd += shiftAmount;
01218 
01219     addNewRid(
01220         nextSegDesc, prevSeg - prevSegBytes - 1, inputStartRID,
01221         remainingSegLen);
01222 
01223     currentEntrySize += spaceRequired;
01224     return true;
01225 }
01226 
01227 bool LbmEntry::addNewAdjacentSegment(
01228     TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01229     PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01230 {
01231     assert(isSingleton(inputTuple));
01232 
01233     LcsRid inputStartRID =
01234         *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01235     LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01236     LcsRid nextSrid =
01237         prevSrid + (prevSegBytes + prevZeroBytes) * LbmOneByteSize;
01238 
01239     // see if we can fit the new byte within the current entry; we'll
01240     // need 1 byte for the new segment, but the zero bytes in the
01241     // previous segment decreases by 1 and we may be able to combine
01242     // segments if only a single zero byte separates the two segments we
01243     // are inserting the new segment in between, and the combined length
01244     // of the two segments doesn't exceed the maximum
01245     uint spaceRequired = 1;
01246     int spaceNeededBefore = computeSpaceForZeroBytes(prevZeroBytes);
01247     int spaceNeededAfter = computeSpaceForZeroBytes(prevZeroBytes - 1);
01248     assert(spaceNeededAfter <= spaceNeededBefore);
01249     spaceRequired += (spaceNeededAfter - spaceNeededBefore);
01250     assert(spaceRequired >= 0);
01251 
01252     bool combine = false;
01253     if (nextSrid == prevEridPlus1 + LbmOneByteSize) {
01254         combine = true;
01255         spaceRequired -= 1;
01256     }
01257 
01258     if (spaceRequired + currentEntrySize > scratchBufferUsableSize) {
01259         splitEntry(inputTuple);
01260         return false;
01261     }
01262 
01263     // compute the length of the remaining segments before we modify
01264     // the descriptors
01265     uint remainingSegLen = computeSegLength(nextSegDesc);
01266 
01267     // adjust the segment length of either the previous segment or the
01268     // next segment, depending on where the new segment is placed
01269     bool rc;
01270     if (inputStartRID >= prevEridPlus1 &&
01271         inputStartRID < prevEridPlus1 + LbmOneByteSize)
01272     {
01273         uint segLen = prevSegBytes + 1;
01274         if (combine) {
01275             segLen += getSegLength(*nextSegDesc);
01276         }
01277         rc = adjustSegLength(*prevSegDesc, segLen);
01278     } else {
01279         rc = adjustSegLength(*nextSegDesc, getSegLength(*nextSegDesc) + 1);
01280     }
01281 
01282     // if adding a new adjacent segment exceeds the max segment size,
01283     // then we have to create a new segment
01284     if (!rc) {
01285         return addNewMiddleSegment(
01286             inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01287             prevSegBytes, prevZeroBytes);
01288     }
01289 
01290     // if the segments are being combined, set the zero length to that of
01291     // the next segment and remove the first byte of the next segment
01292     // descriptor, shifting the rest of the descriptors to the left by 1
01293     if (combine) {
01294         uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01295         uint zeroLen = *nextSegDesc & LbmZeroLengthMask;
01296         *prevSegDesc &= ~LbmZeroLengthMask;
01297         *prevSegDesc |= zeroLen;
01298         PBuffer segDesc = nextSegDesc;
01299         for (int i = 0; i < remainingSegDescLen - 1; i++) {
01300             segDesc[i] = segDesc[i + 1];
01301         }
01302         pSegDescEnd--;
01303 
01304     // otherwise, decrease the zerobyte count by 1
01305     } else {
01306         uint size;
01307         setZeroLength(prevZeroBytes - 1, prevSegDesc, size);
01308 
01309         if (spaceNeededBefore > spaceNeededAfter) {
01310             // compute the length of the remaining segment desciptors
01311             // so we can shift them to the left
01312             uint shiftAmount = spaceNeededBefore - spaceNeededAfter;
01313             uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01314             PBuffer segDesc = prevSegDesc + spaceNeededAfter + 1;
01315             for (int i = 0; i < remainingSegDescLen; i++) {
01316                 segDesc[i] = segDesc[i + shiftAmount];
01317             }
01318             pSegDescEnd -= shiftAmount;
01319         }
01320     }
01321 
01322     // add the new byte segment; do this after fixing up the segment
01323     // descriptor, in the event that we are able to free up some space
01324     // from there
01325     addNewRid(
01326         nextSegDesc, prevSeg - prevSegBytes - 1, inputStartRID,
01327         remainingSegLen);
01328 
01329     currentEntrySize += spaceRequired;
01330     return true;
01331 }
01332 
01333 void LbmEntry::addNewRid(
01334     PBuffer nextSegDesc, PBuffer newSeg, LcsRid newRid, uint remainingSegLen)
01335 {
01336     // This method should only be called when splicing entries that consist
01337     // of only the rid key
01338     assert(keyDesc.size() == 1);
01339     // shift the byte segments to the left by 1
01340     PBuffer seg = pSegEnd;
01341     for (int i = 0; i < remainingSegLen; i++) {
01342         seg[i - 1] = seg[i];
01343     }
01344     *newSeg = (uint8_t) (1 << (opaqueToInt(newRid) % LbmOneByteSize));
01345     pSegEnd--;
01346 }
01347 
01348 void LbmEntry::splitEntry(TupleData &inputTuple)
01349 {
01350     assert(isSingleton(inputTuple));
01351 
01352     // we should never try to split a single segment because the only
01353     // reason we would need to split would be if there are trailing zeros in
01354     // between segments that we are trying to replace; trailing zeros at the
01355     // very end of an entry are handled by the regular mergeEntry()
01356     assert(countSegments() > 1);
01357 
01358     // split the current entry in half based on the current entry size,
01359     // excluding the keysize
01360     uint targetSplitSize = (currentEntrySize - keySize) / 2;
01361 
01362     // divide up the segments until we reach the target splitSize
01363     PBuffer segDesc = pSegDescStart;
01364     PBuffer prevPrevSegDesc = NULL;
01365     PBuffer prevSegDesc = NULL;
01366     PBuffer seg = pSegStart;
01367     LcsRid srid = startRID;
01368     LcsRid prevSrid = srid;
01369     uint segBytes;
01370     uint zeroBytes;
01371     while (segDesc < pSegDescEnd) {
01372         prevPrevSegDesc = prevSegDesc;
01373         prevSegDesc = segDesc;
01374         readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01375         seg -= segBytes;
01376         prevSrid = srid;
01377         srid += (segBytes + zeroBytes) * LbmOneByteSize;
01378         if ((segDesc - pSegDescStart) + (pSegStart - seg) >= targetSplitSize) {
01379             break;
01380         }
01381     }
01382 
01383     // if we effectively haven't split anything, bump the cutoff point back
01384     // by one so the split entry has at least 1 segment; also bump back the
01385     // cutoff point if the new rid will be inserted before the new last
01386     // segment in the current entry; this way, we minimize the size of the
01387     // entry that the new rid will be inserted into
01388     LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
01389     if (segDesc >= pSegDescEnd || inputStartRID < prevSrid) {
01390         segDesc = prevSegDesc;
01391         permAssert(prevPrevSegDesc != NULL);
01392         prevSegDesc = prevPrevSegDesc;
01393         seg += segBytes;
01394         srid = prevSrid;
01395     }
01396 
01397     // copy the segments and descriptors (starting at the one that follows
01398     // the one where the split was made) into the secondary scratch buffer
01399     TupleData newEntry;
01400     copyToMergeBuffer(newEntry, srid, seg, segDesc);
01401 
01402     // clear out the zeroBytes in the last segment descriptor for the current
01403     // entry, since it's no longer needed
01404     segDesc -= getZeroLengthByteCount(*prevSegDesc);
01405     *(prevSegDesc) &= ~LbmZeroLengthMask;
01406 
01407     // adjust the current entry to reflect the segments that have been
01408     // removed
01409     pSegDescEnd = segDesc;
01410     pSegEnd = seg;
01411     currentEntrySize =
01412         keySize + (pSegDescEnd - pSegDescStart) + (pSegStart - pSegEnd);
01413 
01414     // if the input rid is in the new current entry, merge it in and then
01415     // move the newly split off entry into inputTuple
01416     if (inputStartRID < srid) {
01417         bool rc = mergeEntry(inputTuple);
01418         // there has to be enough space to merge in the input singleton since
01419         // we've done a split to free up space
01420         permAssert(rc);
01421         for (int i = 0; i < newEntry.size(); i++) {
01422             inputTuple[i] = newEntry[i];
01423         }
01424         return;
01425     }
01426 
01427     // the input is in the split off entry
01428     mergeIntoSplitEntry(inputTuple, newEntry);
01429 }
01430 
01431 void LbmEntry::mergeIntoSplitEntry(
01432     TupleData &inputTuple, TupleData &splitEntry)
01433 {
01434     assert(isSingleton(inputTuple));
01435 
01436     // temporarily save away the current entry
01437     boost::scoped_array<FixedBuffer> tempBuffer;
01438     tempBuffer.reset(new FixedBuffer[scratchBufferSize]);
01439 #ifdef DEBUG
01440     memset(tempBuffer.get(), 0xFF, scratchBufferSize);
01441 #endif
01442     memcpy(tempBuffer.get(), scratchBuffer, scratchBufferSize);
01443     LcsRid savStartRID = startRID;
01444     PBuffer savSegStart = pSegStart;
01445     PBuffer savSegEnd = pSegEnd;
01446     PBuffer savSegDescStart = pSegDescStart;
01447     PBuffer savSegDescEnd = pSegDescEnd;
01448     uint savCurrentEntrySize = currentEntrySize;
01449 
01450     // move the split entry into the current entry
01451     uint ridField = splitEntry.size() - 3;
01452     memcpy(scratchBuffer, mergeScratchBuffer, scratchBufferSize);
01453     startRID = *((LcsRid*) splitEntry[ridField].pData);
01454     LcsRid splitStartRid = startRID;
01455     pSegDescStart = scratchBuffer + keySize;
01456     pSegDescEnd = pSegDescStart + splitEntry[ridField + 1].cbData;
01457     pSegStart = scratchBuffer + scratchBufferSize;
01458     pSegEnd = pSegStart - splitEntry[ridField + 2].cbData;
01459     currentEntrySize =
01460         keySize + splitEntry[ridField + 1].cbData +
01461             splitEntry[ridField + 2].cbData;
01462 
01463     // splice the input into the split entry that now occupies the current
01464     // entry
01465     bool rc = mergeEntry(inputTuple);
01466     permAssert(rc);
01467 
01468     // copy the split entry into inputTuple
01469     copyToMergeBuffer(inputTuple, splitStartRid, pSegStart, pSegDescStart);
01470 
01471     // copy the saved away current entry back into its place
01472     memcpy(scratchBuffer, tempBuffer.get(), scratchBufferSize);
01473     startRID = savStartRID;
01474     pSegStart = savSegStart;
01475     pSegEnd = savSegEnd;
01476     pSegDescStart = savSegDescStart;
01477     pSegDescEnd = savSegDescEnd;
01478     currentEntrySize = savCurrentEntrySize;
01479 }
01480 
01481 TupleData const &LbmEntry::produceEntryTuple()
01482 {
01483     /*
01484      * If singleton, just return the tuple.
01485      */
01486     if (isSingleton()) {
01487         return entryTuple;
01488     }
01489 
01490     /*
01491      * Set up all the data pointers in entryTuple.
01492      * Make sure that closeCurrentSegment is called for this entry.
01493      */
01494     if (isSegmentOpen()) {
01495         closeCurrentSegment();
01496     }
01497 
01498     /*
01499      * RID field
01500      */
01501     uint RIDField = entryTuple.size() - 3;
01502     *((LcsRid *)entryTuple[RIDField].pData) = startRID;
01503 
01504     /*
01505      * The last field is the segment field.
01506      */
01507     uint segmentField = entryTuple.size() - 1;
01508     entryTuple[segmentField].cbData = pSegStart - pSegEnd;
01509     entryTuple[segmentField].pData = pSegEnd;
01510 
01511     /*
01512      * Next to the last is segment descriptor field.
01513      */
01514     uint segmentDescField = entryTuple.size() - 2;
01515 
01516     /*
01517      * Find out if the segment descriptors are necessary.
01518      * If all segments are contiguous, they can be stored at one big bitmap
01519      * segment without any descriptors.
01520      */
01521 
01522     if ((pSegDescStart == pSegDescEnd) || (pSegDescStart == NULL)) {
01523         /*
01524          * There is no segment descriptor, which means the entry must have a
01525          * single bitmap.
01526          */
01527         entryTuple[segmentDescField].cbData = 0;
01528         entryTuple[segmentDescField].pData  = NULL;
01529     } else {
01530         /*
01531          * There are segment descriptors. Check if these descriptors can be
01532          * removed (when all the segments are contiguous and there's no
01533          * possibility that the max segment size is exceeded).
01534          */
01535         uint rowCount = getRowCount();
01536 
01537         if (rowCount == entryTuple[segmentField].cbData * 8 &&
01538             rowCount <= maxSegSize * 8)
01539         {
01540             /*
01541              * All the RIDs are in the bitmap segments. There is no "gap"
01542              * between segments. Do not need to store the segment descriptors
01543              * since there is only one big bitmap segment inthis entry.
01544              *
01545              * The last entry in the stream must remain compressed, otherwise
01546              * future append might fail. Passing lastRID < 0 bypasses this
01547              * check.
01548              */
01549             entryTuple[segmentDescField].cbData = 0;
01550             entryTuple[segmentDescField].pData  = NULL;
01551         } else {
01552             // zero out the trailing zero length in the last segment descriptor
01553             // since it should be zero
01554             openLastSegment();
01555             closeCurrentSegment();
01556             entryTuple[segmentDescField].cbData = pSegDescEnd - pSegDescStart;
01557             entryTuple[segmentDescField].pData  = pSegDescStart;
01558         }
01559     }
01560 
01561     return entryTuple;
01562 }
01563 
01564 string LbmEntry::printDatum(
01565     TupleDatum const &tupleDatum,
01566     bool reverseByteOrder)
01567 {
01568     ostringstream byteStr;
01569     PConstBuffer ptr = tupleDatum.pData;
01570     uint size = tupleDatum.cbData;
01571 
01572     if (ptr) {
01573         if (reverseByteOrder) {
01574             PConstBuffer tmpPtr = ptr + size;
01575             while (size > 0) {
01576                 tmpPtr --;
01577                 byteStr << hex << setw(2) << setfill('0')  << (uint)(*tmpPtr);
01578                 size --;
01579             }
01580         } else {
01581             while (size > 0) {
01582                 byteStr << hex << setw(2) << setfill('0')  << (uint)(*ptr);
01583                 ptr ++;
01584                 size --;
01585             }
01586         }
01587     }
01588 
01589     return byteStr.str();
01590 }
01591 
01592 string LbmEntry::printByte(uint8_t byte)
01593 {
01594     ostringstream byteStr;
01595     uint byteLength = 8;
01596 
01597     while (byteLength > 0) {
01598         byteStr << byte % 2 << " ";
01599         byte = byte >> 1;
01600         byteLength --;
01601     }
01602     return byteStr.str();
01603 }
01604 
01605 string LbmEntry::dumpSeg(PBuffer segDesc, PBuffer segDescEnd, PBuffer seg)
01606 {
01607     uint segBytes;
01608     ostringstream entryLine;
01609     uint zeroBytes;
01610     uint startPos = 0;
01611     uint i;
01612 
01613     while (segDesc < segDescEnd) {
01614         /*
01615          * Print the bitmaps.
01616          */
01617         readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01618 
01619         /*
01620          * Print the bitmap segment.
01621          */
01622         for (i = 0; i < segBytes + startPos; i++) {
01623             seg --;
01624             entryLine << printByte((uint8_t)(*seg));
01625             if (i % 5 == 4) {
01626                 entryLine << "\n";
01627             } else {
01628                 entryLine << " ";
01629             }
01630         }
01631 
01632         /*
01633          * Print the trailing zeros.
01634          */
01635         for (; i < zeroBytes + segBytes + startPos; i++) {
01636             entryLine << "0 0 0 0 0 0 0 0 ";
01637             if (i % 5 == 4) {
01638                 entryLine << "\n";
01639             } else {
01640                 entryLine << " ";
01641             }
01642         }
01643     }
01644     return entryLine.str();
01645 }
01646 
01647 string LbmEntry::dumpSegRID(
01648     PBuffer segDesc,
01649     PBuffer segDescEnd,
01650     PBuffer seg,
01651     string prefix,
01652     LcsRid srid)
01653 {
01654     uint segBytes;
01655     uint zeroBytes;
01656     ostringstream entryTrace;
01657     uint8_t bitmapByte;
01658     uint byteLength;
01659     uint i;
01660 
01661     while (segDesc < segDescEnd) {
01662         /*
01663          * Print the bitmaps.
01664          */
01665         readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01666 
01667         for (i = 0; i < segBytes; i++) {
01668             seg --;
01669             bitmapByte = *(uint8_t *)seg;
01670             for (byteLength = 8; byteLength > 0; byteLength--) {
01671                 if (bitmapByte % 2) {
01672                     entryTrace << prefix << opaqueToInt(srid) << "]\n";
01673                 }
01674                 bitmapByte = bitmapByte >> 1;
01675                 srid ++;
01676             }
01677         }
01678 
01679         /*
01680          * Skip the zeros, but need to update srid.
01681          */
01682         srid += zeroBytes * LbmOneByteSize;
01683     }
01684     return entryTrace.str();
01685 }
01686 
01687 string LbmEntry::dumpBitmap(PBuffer seg, uint segBytes)
01688 {
01689     ostringstream entryLine;
01690     /*
01691      * Print the bitmap.
01692      */
01693     for (uint i = 0; i < segBytes; i++) {
01694         seg --;
01695         entryLine << printByte((uint8_t)(*seg));
01696         if (i % 5 == 4) {
01697             entryLine << "\n";
01698         } else {
01699             entryLine << " ";
01700         }
01701     }
01702     entryLine << "\n";
01703     return entryLine.str();
01704 }
01705 
01706 string LbmEntry::dumpBitmapRID(
01707     PBuffer seg,
01708     uint segBytes,
01709     string prefix,
01710     LcsRid srid)
01711 {
01712     ostringstream entryTrace;
01713     uint byteLength;
01714     uint8_t bitmapByte;
01715 
01716     /*
01717      * Print all the RIDs in the bitmap, formatted as:
01718      * Key [ .... ] RID [...]
01719      */
01720     for (uint i = 0; i < segBytes; i++) {
01721         seg --;
01722         bitmapByte = *(uint8_t *)seg;
01723         for (byteLength = 8; byteLength > 0; byteLength--) {
01724             if (bitmapByte % 2) {
01725                 entryTrace << prefix << opaqueToInt(srid) << "]\n";
01726             }
01727             bitmapByte = bitmapByte >> 1;
01728             srid ++;
01729         }
01730     }
01731     return entryTrace.str();
01732 }
01733 
01734 string LbmEntry::toString(TupleData const&inputTuple, bool printRID)
01735 {
01736     if (printRID) {
01737         return toRIDString(inputTuple);
01738     } else {
01739         return toBitmapString(inputTuple);
01740     }
01741 }
01742 
01743 string LbmEntry::toBitmapString(TupleData const&inputTuple)
01744 {
01745     ostringstream tupleTrace;
01746     uint tupleSize = inputTuple.size();
01747     LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01748 
01749     tupleTrace << "Key [";
01750 
01751     for (uint i = 0; i < tupleSize - 3 ; i ++) {
01752         tupleTrace << printDatum(inputTuple[i], true);
01753         if (i < tupleSize - 4) {
01754             tupleTrace << "|";
01755         }
01756     }
01757 
01758     tupleTrace << "] startRID ["
01759                << opaqueToInt(inputStartRID)
01760                << "] ";
01761 
01762     if (isSingleton(inputTuple)) {
01763         tupleTrace <<"Singleton";
01764     } else {
01765         PBuffer segDesc = (PBuffer)inputTuple[tupleSize - 2].pData;
01766         /*
01767          * segments are stored backward.
01768          */
01769         PBuffer seg     = (PBuffer)inputTuple[tupleSize - 1].pData
01770                            + inputTuple[tupleSize - 1].cbData;
01771 
01772         if (segDesc) {
01773             tupleTrace << "Compressed Bitmap: SegDesc "
01774                        << inputTuple[tupleSize - 2].cbData
01775                        << " bytes, Seg "
01776                        << inputTuple[tupleSize - 1].cbData
01777                        << " bytes.\n";
01778 
01779             /*
01780              * Compressed bitmap(with both segment descriptors and segments)
01781              */
01782             PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01783 
01784             tupleTrace << dumpSeg(segDesc, segDescEnd, seg);
01785         } else {
01786             tupleTrace << "Single Bitmap: Seg "
01787                        << inputTuple[tupleSize - 1].cbData
01788                        << " bytes\n";
01789             /*
01790              * An entry with a single bitmap segement.
01791              */
01792             tupleTrace << dumpBitmap(seg, inputTuple[tupleSize - 1].cbData);
01793         }
01794     }
01795 
01796     tupleTrace << "\n";
01797 
01798     return tupleTrace.str();
01799 }
01800 
01801 string LbmEntry::toRIDString(TupleData const &inputTuple)
01802 {
01803     ostringstream tupleTrace;
01804     ostringstream keyTrace;
01805     uint tupleSize = inputTuple.size();
01806     LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01807 
01808     keyTrace << "Key [";
01809     for (uint i = 0; i < tupleSize - 3 ; i ++) {
01810         keyTrace << printDatum(inputTuple[i], true);
01811         if (i < tupleSize - 4) {
01812             keyTrace << "|";
01813         }
01814     }
01815 
01816     keyTrace << "] ";
01817 
01818     tupleTrace << keyTrace.str()
01819                << "startRID ["
01820                << opaqueToInt(inputStartRID)
01821                << "] ";
01822 
01823     if (isSingleton(inputTuple)) {
01824         tupleTrace <<"Singleton\n";
01825     } else {
01826         keyTrace << "RID [";
01827         PBuffer segDesc = (PBuffer)inputTuple[tupleSize - 2].pData;
01828         /*
01829          * segments are stored backward.
01830          */
01831         PBuffer seg     = (PBuffer)inputTuple[tupleSize - 1].pData
01832                            + inputTuple[tupleSize - 1].cbData;
01833 
01834         tupleTrace << "\n";
01835 
01836         if (segDesc) {
01837             /*
01838              * Compressed bitmap(with both segment descriptors and segments)
01839              */
01840             PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01841 
01842             tupleTrace <<
01843                 dumpSegRID(
01844                     segDesc,
01845                     segDescEnd,
01846                     seg,
01847                     keyTrace.str(),
01848                     inputStartRID);
01849         } else {
01850             /*
01851              * An entry with a single bitmap segement.
01852              */
01853             tupleTrace <<
01854                 dumpBitmapRID(
01855                     seg,
01856                     inputTuple[tupleSize - 1].cbData,
01857                     keyTrace.str(),
01858                     inputStartRID);
01859         }
01860     }
01861 
01862     return tupleTrace.str();
01863 }
01864 
01865 string LbmEntry::toString()
01866 {
01867     ostringstream tupleTrace;
01868     uint tupleSize = entryTuple.size();
01869 
01870     tupleTrace << "Key [";
01871 
01872     for (uint i = 0; i < tupleSize - 3 ; i ++) {
01873         tupleTrace << printDatum(entryTuple[i], true);
01874         if (i < tupleSize - 4) {
01875             tupleTrace << "|";
01876         }
01877     }
01878 
01879     tupleTrace << "] RID ["
01880                << opaqueToInt(*(LcsRid *)entryTuple[tupleSize - 3].pData)
01881                << "] ";
01882 
01883     if (isSingleton()) {
01884         tupleTrace <<"Singleton";
01885     } else {
01886         PBuffer pSegDesc = pSegDescStart;
01887         /*
01888          * segments are stored backward.
01889          */
01890         PBuffer pSeg     = pSegStart;
01891 
01892         if (pSegDesc) {
01893             tupleTrace << "Compressed Bitmap: SegDesc "
01894                        << pSegDescEnd - pSegDescStart
01895                        << " bytes, Seg "
01896                        << pSegStart - pSegEnd
01897                        << " bytes.\n";
01898 
01899             /*
01900              * Compressed bitmap(with both segment descriptors and segments)
01901              */
01902             tupleTrace << dumpSeg(pSegDesc, pSegDescEnd, pSeg);
01903         } else {
01904             tupleTrace << "Single Bitmap: Seg "
01905                        << pSegStart - pSegEnd
01906                        << " bytes\n";
01907             /*
01908              * An entry with a single bitmap segement.
01909              */
01910             tupleTrace << dumpBitmap(pSeg, pSegStart - pSegEnd);
01911         }
01912     }
01913 
01914     tupleTrace << "\n";
01915 
01916     return tupleTrace.str();
01917 }
01918 
01919 void LbmEntry::getSizeBounds(
01920     TupleDescriptor const &indexTupleDesc, uint pageSize,
01921     uint &minEntrySize, uint &maxEntrySize)
01922 {
01923     uint tupleSize = indexTupleDesc.size();
01924     TupleStorageByteLength segDescFieldMaxLength =
01925         indexTupleDesc[tupleSize - 2].cbStorage;
01926     TupleStorageByteLength segFieldMaxLength =
01927         indexTupleDesc[tupleSize - 1].cbStorage;
01928 
01929     // The length of the last two variable size fields are set to be the
01930     // maximum of the combined size, so that the buffer can fit the most number
01931     // of bytes for both fields. For exmaple, if the combined size should be
01932     // less then 512 bytes, then either field can have a size between 0 and
01933     // 512, however, the LbmEntry manages the buffer so that the combined size
01934     // of the two fields is not more than 512.
01935     assert(segDescFieldMaxLength == segFieldMaxLength);
01936 
01937     // The minimum size taken by the bitmap index tuple(in unmarshal format) is
01938     // then sizeof(keys) + size(RID), plus 2 bytes to encode at least one
01939     // segment(=1) and one segment desc byte(=1) with extended length
01940     // descriptor bytes(=3)
01941     minEntrySize =
01942         indexTupleDesc.getMaxByteCount()
01943         - segDescFieldMaxLength
01944         - segFieldMaxLength
01945         + 5;
01946     // Cap the size of the entry by the pageSize
01947     if (minEntrySize > pageSize) {
01948         minEntrySize = pageSize;
01949     }
01950 
01951     // the sum total of the two bitmap columns needs to be less than the
01952     // max size of a single column. The maximum size taken by the bitmap index
01953     // tuple(in unmarshal format) is then sizeof(keys) + size(RID) + sizeof(one
01954     // variable length column).
01955     maxEntrySize =
01956         indexTupleDesc.getMaxByteCount()
01957         - segDescFieldMaxLength;
01958 
01959     // Adjust max size based on page size. Try to fit a least minEntryPerPage
01960     // in a page.
01961     uint maxEntrySizeForPage = pageSize / LbmMinEntryPerPage;
01962 
01963     if (maxEntrySizeForPage < minEntrySize) {
01964         maxEntrySize = minEntrySize;
01965     } else {
01966         maxEntrySize = min(maxEntrySizeForPage, maxEntrySize);
01967     }
01968     assert(minEntrySize <= maxEntrySize);
01969 }
01970 
01971 void LbmEntry::generateRIDs(
01972     TupleData const &inputTuple, vector<LcsRid> &ridValues)
01973 {
01974     uint tupleSize = inputTuple.size();
01975     LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01976 
01977     if (isSingleton(inputTuple)) {
01978         ridValues.push_back(inputStartRID);
01979     } else {
01980         PBuffer segDesc = (PBuffer) inputTuple[tupleSize - 2].pData;
01981         /*
01982          * segments are stored backward.
01983          */
01984         PBuffer seg =
01985             (PBuffer) inputTuple[tupleSize - 1].pData
01986                 + inputTuple[tupleSize - 1].cbData;
01987         if (segDesc) {
01988             /*
01989              * Compressed bitmap(with both segment descriptors and segments)
01990              */
01991             PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01992 
01993             generateSegRIDs(segDesc, segDescEnd, seg, ridValues, inputStartRID);
01994         } else {
01995             /*
01996              * An entry with a single bitmap segement.
01997              */
01998             generateBitmapRIDs(
01999                 seg, inputTuple[tupleSize - 1].cbData, ridValues,
02000                 inputStartRID);
02001         }
02002     }
02003 }
02004 
02005 void LbmEntry::generateSegRIDs(
02006     PBuffer segDesc, PBuffer segDescEnd, PBuffer seg,
02007     vector<LcsRid> &ridValues, LcsRid srid)
02008 {
02009     uint segBytes;
02010     uint zeroBytes;
02011     uint8_t bitmapByte;
02012     uint byteLength;
02013     uint i;
02014 
02015     while (segDesc < segDescEnd) {
02016         readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
02017 
02018         for (i = 0; i < segBytes; i++) {
02019             seg --;
02020             bitmapByte = *(uint8_t *)seg;
02021             for (byteLength = 8; byteLength > 0; byteLength--) {
02022                 if (bitmapByte % 2) {
02023                     ridValues.push_back(srid);
02024                 }
02025                 bitmapByte = bitmapByte >> 1;
02026                 srid ++;
02027             }
02028         }
02029 
02030         /*
02031          * Skip the zeros, but need to update srid.
02032          */
02033         srid += zeroBytes * LbmOneByteSize;
02034     }
02035 }
02036 
02037 void LbmEntry::generateBitmapRIDs(
02038     PBuffer seg, uint segBytes, vector<LcsRid> &ridValues, LcsRid srid)
02039 {
02040     uint byteLength;
02041     uint8_t bitmapByte;
02042 
02043     for (uint i = 0; i < segBytes; i++) {
02044         seg --;
02045         bitmapByte = *(uint8_t *)seg;
02046         for (byteLength = 8; byteLength > 0; byteLength--) {
02047             if (bitmapByte % 2) {
02048                 ridValues.push_back(srid);
02049             }
02050             bitmapByte = bitmapByte >> 1;
02051             srid ++;
02052         }
02053     }
02054 }
02055 
02056 uint LbmEntry::getScratchBufferSize(uint bitmapColSize)
02057 {
02058     // size of scratch buffer should be the max size of one of
02059     // the bitmap columns plus space for the rid plus space for an
02060     // optional zero extension
02061     return bitmapColSize + sizeof(LcsRid) + LbmZeroLengthExtended;
02062 }
02063 
02064 uint LbmEntry::getMaxBitmapSize(uint bitmapColSize)
02065 {
02066     // The maximum size of the segments combined is when the bitmep entry
02067     // is packed with as many LbmMaxSegSize segments as possible. There could
02068     // be a non-max length segment at the end.
02069     uint descriptorBytesPerSegment = 1;
02070     uint maxNumSegments =
02071         (bitmapColSize / (LbmMaxSegSize + descriptorBytesPerSegment))
02072         + ((bitmapColSize % (LbmMaxSegSize + descriptorBytesPerSegment)) == 0
02073            ? 0
02074            : 1);
02075     return (bitmapColSize - maxNumSegments * descriptorBytesPerSegment);
02076 }
02077 
02078 bool LbmEntry::containsRid(LcsRid rid)
02079 {
02080     LcsRid startRid = startRID;
02081 
02082     if (isSingleton()) {
02083         return (startRid == rid);
02084     } else {
02085         PBuffer pSegDesc = pSegDescStart;
02086         PBuffer pSeg = pSegStart;
02087         if (pSegDesc) {
02088             // loop through each segment
02089             while (pSegDesc < pSegDescEnd) {
02090                 uint nSegBytes;
02091                 uint nZeroBytes;
02092 
02093                 readSegDescAndAdvance(pSegDesc, nSegBytes, nZeroBytes);
02094                 int rc = segmentContainsRid(rid, startRid, pSeg, nSegBytes);
02095                 if (rc == 0) {
02096                     return true;
02097                 } else if (rc < 0) {
02098                     return false;
02099                 } else {
02100                     // rid wasn't within the current segment range so
02101                     // move to the next segment
02102                     startRid += (nSegBytes + nZeroBytes) * LbmOneByteSize;
02103                     pSeg -= nSegBytes;
02104                     // if the rid is in the range of the zeroBytes, then it's
02105                     // not contained in the entry
02106                     if (rid < startRid) {
02107                         return false;
02108                     }
02109                 }
02110             }
02111             return false;
02112         } else {
02113             // single bitmap -- just check the one segment
02114             int rc =
02115                 segmentContainsRid(
02116                     rid,
02117                     startRid,
02118                     pSegStart,
02119                     pSegStart - pSegEnd);
02120             if (rc == 0) {
02121                 return true;
02122             } else {
02123                 return false;
02124             }
02125         }
02126     }
02127 }
02128 
02129 int LbmEntry::segmentContainsRid(
02130     LcsRid rid,
02131     LcsRid startRid,
02132     PBuffer pSeg,
02133     uint nSegBytes)
02134 {
02135     if (rid >= startRid && rid < startRid + (nSegBytes * LbmOneByteSize)) {
02136         // rid is within the current segment; check the
02137         // appropriate byte
02138         int byteNum = (opaqueToInt(rid - startRid) / LbmOneByteSize) + 1;
02139         uint8_t setRid = 1 << (opaqueToInt(rid) % LbmOneByteSize);
02140         if (pSeg[-byteNum] & setRid) {
02141             return 0;
02142         } else {
02143             return -1;
02144         }
02145     } else {
02146         return 1;
02147     }
02148 }
02149 
02150 bool LbmEntry::inRange(LcsRid rid)
02151 {
02152     LcsRid endRID = startRID + getRowCount() - 1;
02153     return (rid >= startRID && rid <= endRID);
02154 }
02155 
02156 void LbmEntry::validateEntrySize()
02157 {
02158     if (currentEntrySize <= scratchBufferSize) {
02159         return;
02160     }
02161     // Extract the portion of the entry that corresponds to the key
02162     TupleData keyData;
02163     keyData.resize(keyDesc.size());
02164     for (uint i = 0; i < keyData.size(); i++) {
02165         keyData[i] = entryTuple[i];
02166     }
02167 
02168     std::ostringstream oss;
02169     TuplePrinter tuplePrinter;
02170     tuplePrinter.print(oss, keyDesc, keyData);
02171     throw FennelResource::instance().bitmapEntryTooLong(
02172         currentEntrySize,
02173         scratchBufferSize,
02174         oss.str());
02175 }
02176 
02177 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmEntry.cpp#26 $");
02178 
02179 // End LbmEntry.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1