00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/bitmap/LbmEntry.h"
00024 #include "fennel/tuple/TuplePrinter.h"
00025 #include "fennel/common/FennelResource.h"
00026 #include <iomanip>
00027 #include <sstream>
00028 #include <boost/scoped_array.hpp>
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmEntry.cpp#26 $");
00031
00032
00033 LbmEntry::LbmEntry()
00034 {
00035 scratchBuffer = NULL;
00036
00037
00038
00039
00040 pSegDescStart = NULL;
00041 pSegStart = NULL;
00042
00043 resetSegment();
00044 }
00045
00046
00047 void LbmEntry::init(
00048 PBuffer scratchBufferInit,
00049 PBuffer mergeScratchBufferInit,
00050 uint scratchBufferSizeInit,
00051 TupleDescriptor const &tupleDesc)
00052 {
00053 scratchBuffer = scratchBufferInit;
00054 mergeScratchBuffer = mergeScratchBufferInit;
00055
00056
00057
00058
00059
00060 scratchBufferSize = scratchBufferSizeInit;
00061 scratchBufferUsableSize = scratchBufferSizeInit - LbmZeroLengthExtended;
00062
00063 #ifdef DEBUG
00064
00065
00066
00067 memset(scratchBuffer, 0xFF, scratchBufferSizeInit);
00068 if (mergeScratchBuffer != NULL) {
00069 memset(mergeScratchBuffer, 0xFF, scratchBufferSizeInit);
00070 }
00071 #endif
00072
00073
00074
00075
00076 entryTuple.compute(tupleDesc);
00077 currentEntrySize = 0;
00078
00079 keyDesc.resize(tupleDesc.size() - 2);
00080 for (uint i = 0; i < keyDesc.size(); i++) {
00081 keyDesc[i] = tupleDesc[i];
00082 }
00083
00084 bitmapSegSize = tupleDesc[tupleDesc.size() - 1].cbStorage;
00085 maxSegSize = getMaxBitmapSize(bitmapSegSize);
00086 }
00087
00088
00089 void LbmEntry::setEntryTuple(TupleData const &indexTuple)
00090 {
00091
00092
00093
00094 uint RIDField = entryTuple.size() - 3;
00095
00096
00097
00098
00099 uint segmentDescField = entryTuple.size() - 2;
00100
00101
00102
00103
00104 uint segmentField = entryTuple.size() - 1;
00105
00106
00107
00108
00109
00110 currentEntrySize = 0;
00111
00112 for (int i = 0; i < entryTuple.size(); i ++) {
00113 if (i != segmentField) {
00114 entryTuple[i].pData = scratchBuffer + currentEntrySize;
00115 } else {
00116
00117
00118
00119
00120 entryTuple[i].pData = scratchBuffer + scratchBufferSize
00121 - indexTuple[i].cbData;
00122 }
00123
00124 entryTuple[i].memCopyFrom(indexTuple[i]);
00125
00126 if (entryTuple[i].isNull()) {
00127 entryTuple[i].cbData = 0;
00128 }
00129 currentEntrySize += entryTuple[i].cbData;
00130 if (i == RIDField) {
00131 keySize = currentEntrySize;
00132 }
00133 }
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144 validateEntrySize();
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159 startRID = *((LcsRid *)entryTuple[RIDField].pData);
00160
00161 if (isSingleton(indexTuple)) {
00162
00163
00164
00165 pSegDescStart = pSegDescEnd = scratchBuffer + keySize;
00166 pSegStart = pSegEnd = scratchBuffer + scratchBufferSize;
00167
00168 resetSegment();
00169 } else {
00170
00171
00172
00173
00174 assert((startRID) % LbmOneByteSize == (LcsRid)0);
00175
00176 uint segLength = entryTuple[segmentField].cbData;
00177
00178
00179
00180 if (isSingleBitmap(entryTuple)) {
00181
00182
00183
00184
00185
00186
00187 pSegDescStart = pSegDescEnd = scratchBuffer + keySize;
00188 uint reservedSpace = 0;
00189 bool ret = addSegDesc(reservedSpace, segLength);
00190 assert (ret);
00191 } else {
00192 pSegDescStart = (PBuffer)entryTuple[segmentDescField].pData;
00193 pSegDescEnd = pSegDescStart + entryTuple[segmentDescField].cbData;
00194 }
00195
00196 pSegStart = scratchBuffer + scratchBufferSize;
00197 pSegEnd = pSegStart - segLength;
00198
00199 resetSegment();
00200 }
00201 }
00202
00203
00204 bool LbmEntry::setRIDNewSegment(LcsRid rid)
00205 {
00206 if (!openNewSegment(rid)) {
00207 return false;
00208 }
00209
00210 setRIDCurrentSegByte(rid);
00211 return true;
00212 }
00213
00214
00215 bool LbmEntry::setRIDAdjacentSegByte(LcsRid rid)
00216 {
00217 assert (!isSingleBitmap());
00218
00219 if (currSegLength == LbmMaxSegSize) {
00220
00221
00222
00223
00224
00225
00226
00227 closeCurrentSegment();
00228 return setRIDNewSegment(rid);
00229 }
00230
00231 if (currentEntrySize + 1 > scratchBufferUsableSize) {
00232 closeCurrentSegment();
00233 return false;
00234 }
00235
00236 if (pSegStart - pSegEnd == bitmapSegSize) {
00237 closeCurrentSegment();
00238 return false;
00239 }
00240 pSegEnd--;
00241 currSegByte = pSegEnd;
00242 currSegByteStartRID = roundToByteBoundary(rid);
00243 currSegLength ++;
00244
00245 *currSegByte = (uint8_t)(1 << (opaqueToInt(rid) % LbmOneByteSize));
00246
00247 currentEntrySize += 1;
00248
00249
00250
00251
00252 if (isSegmentOpen()) {
00253 setSegLength(*currSegDescByte, currSegLength);
00254 }
00255
00256 return true;
00257 }
00258
00259
00260 bool LbmEntry::openNewSegment(LcsRid rid)
00261 {
00262 assert (!isSegmentOpen());
00263 assert (pSegDescEnd);
00264
00265 if (currentEntrySize + 2 > scratchBufferUsableSize) {
00266 return false;
00267 }
00268
00269 if (pSegStart - pSegEnd == bitmapSegSize) {
00270 return false;
00271 }
00272 pSegEnd--;
00273 currSegByte = pSegEnd;
00274 *currSegByte = 0;
00275
00276 currSegDescByte = pSegDescEnd;
00277 pSegDescEnd ++;
00278
00279 currSegByteStartRID = roundToByteBoundary(rid);
00280 currSegLength = 1;
00281
00282
00283
00284
00285
00286
00287 setSegLength(*currSegDescByte, currSegLength);
00288 currentEntrySize += 2;
00289
00290 return true;
00291 }
00292
00293
00294 void LbmEntry::openLastSegment()
00295 {
00296 assert (!isSegmentOpen());
00297 assert (pSegEnd);
00298
00299 uint lastZeroRIDs = 0;
00300 uint rowCount = getRowCount(currSegDescByte, lastZeroRIDs);
00301
00302
00303 currSegLength = getSegLength(*currSegDescByte);
00304
00305
00306 currentEntrySize -= getZeroLengthByteCount(*currSegDescByte);
00307
00308 pSegDescEnd = currSegDescByte + 1;
00309
00310
00311
00312 setSegLength(*currSegDescByte, currSegLength);
00313
00314
00315 currSegByte = pSegEnd;
00316
00317 currSegByteStartRID =
00318 startRID + rowCount - lastZeroRIDs - LbmOneByteSize;
00319 }
00320
00321
00322 void LbmEntry::closeCurrentSegment()
00323 {
00324 assert (isSegmentOpen());
00325 assert (currSegLength >= 1 && currSegLength <= LbmMaxSegSize);
00326
00327 setSegLength(*currSegDescByte, currSegLength);
00328 resetSegment();
00329 }
00330
00331
00332 bool LbmEntry::closeCurrentSegment(LcsRid rid)
00333 {
00334 if (!isSegmentOpen()) {
00335
00336 return true;
00337 }
00338
00339 int ridDistance = 0;
00340
00341 assert(currSegLength >= 1 && currSegLength <= LbmMaxSegSize);
00342
00343 setSegLength(*currSegDescByte, currSegLength);
00344
00345
00346
00347
00348 ridDistance =
00349 opaqueToInt(roundToByteBoundary(rid) - currSegByteStartRID)
00350 / LbmOneByteSize - 1;
00351
00352 if (ridDistance <= 0) {
00353
00354
00355
00356
00357
00358 } else {
00359 uint lengthBytes;
00360 assert(currSegDescByte + 1 == pSegDescEnd);
00361 if (!setZeroLength(ridDistance, currSegDescByte, lengthBytes)) {
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375 resetSegment();
00376 return false;
00377 }
00378 currentEntrySize += lengthBytes;
00379 pSegDescEnd += lengthBytes;
00380 }
00381 resetSegment();
00382 return true;
00383 }
00384
00385 bool LbmEntry::setZeroLength(
00386 uint nZeroBytes, PBuffer pLenDesc, uint &lengthBytes)
00387 {
00388 *(pLenDesc) &= ~LbmZeroLengthMask;
00389 if (nZeroBytes <= LbmZeroLengthCompact) {
00390
00391
00392
00393 *(pLenDesc) |= (uint8_t) (nZeroBytes & LbmZeroLengthMask);
00394 lengthBytes = 0;
00395 } else {
00396 lengthBytes = value2ByteArray(
00397 nZeroBytes, pLenDesc + 1, LbmZeroLengthExtended);
00398
00399 if (lengthBytes) {
00400
00401
00402
00403
00404 *(pLenDesc) |=
00405 (uint8_t) ((lengthBytes + LbmZeroLengthCompact) &
00406 LbmZeroLengthMask);
00407 } else {
00408 return false;
00409 }
00410 }
00411
00412 return true;
00413 }
00414
00415 uint LbmEntry::getRowCount()
00416 {
00417 PBuffer lastSegDescByte = NULL;
00418 uint lastZeroRIDs = 0;
00419
00420 return getRowCount(lastSegDescByte, lastZeroRIDs);
00421 }
00422
00423
00424 uint LbmEntry::getCompressedRowCount(
00425 PBuffer pDescStart,
00426 PBuffer pDescEnd,
00427 PBuffer &lastSegDescByte,
00428 uint &lastZeroRIDs)
00429 {
00430 PBuffer p1 = pDescStart;
00431 uint lastLengthDesc = 0;
00432 uint lastLengthDescBytes, lastZeroBytes;
00433 uint rowCount = 0;
00434
00435 while (p1 < pDescEnd) {
00436
00437
00438
00439 rowCount += getSegLength(*p1) * LbmOneByteSize;
00440
00441
00442
00443
00444 lastSegDescByte = p1;
00445 lastLengthDesc = *p1 & LbmZeroLengthMask;
00446 p1 ++;
00447
00448 if (lastLengthDesc <= LbmZeroLengthCompact) {
00449 lastZeroBytes = lastLengthDesc;
00450 } else {
00451 lastLengthDescBytes = lastLengthDesc - LbmZeroLengthCompact;
00452
00453
00454
00455 lastZeroBytes =
00456 byteArray2Value(p1, lastLengthDescBytes);
00457 p1 += lastLengthDescBytes;
00458 }
00459
00460 lastZeroRIDs = lastZeroBytes * LbmOneByteSize;
00461 rowCount += lastZeroRIDs;
00462 }
00463 return rowCount;
00464 }
00465
00466
00467 uint LbmEntry::getRowCount(
00468 PBuffer &lastSegDescByte,
00469 uint &lastZeroRIDs)
00470 {
00471 assert (!isSingleBitmap());
00472
00473 uint rowCount = 0;
00474
00475 if (isSingleton()) {
00476 rowCount = 1;
00477 lastSegDescByte = NULL;
00478 lastZeroRIDs = 0;
00479 } else {
00480 rowCount =
00481 getCompressedRowCount(
00482 pSegDescStart, pSegDescEnd,
00483 lastSegDescByte, lastZeroRIDs);
00484 }
00485 return rowCount;
00486 }
00487
00488
00489 uint LbmEntry::getRowCount(TupleData const &inputTuple)
00490 {
00491 uint rowCount = 0;
00492
00493 if (isSingleton(inputTuple)) {
00494 rowCount = 1;
00495 } else if (isSingleBitmap(inputTuple)) {
00496
00497
00498
00499 rowCount =
00500 inputTuple[inputTuple.size() - 1].cbData * LbmOneByteSize;
00501 } else {
00502 PBuffer lastSegDescByte = NULL;
00503 uint lastZeroRIDs = 0;
00504 PBuffer pDescStart = (PBuffer)inputTuple[inputTuple.size() - 2].pData;
00505 PBuffer pDescEnd =
00506 (PBuffer)(inputTuple[inputTuple.size() - 2].pData +
00507 inputTuple[inputTuple.size() - 2].cbData);
00508 rowCount =
00509 getCompressedRowCount(
00510 pDescStart, pDescEnd,
00511 lastSegDescByte, lastZeroRIDs);
00512 }
00513 return rowCount;
00514 }
00515
00516
00517 bool LbmEntry::singleton2Bitmap()
00518 {
00519 if (setRIDNewSegment(startRID)) {
00520 startRID = roundToByteBoundary(startRID);
00521 entryTuple[entryTuple.size() - 2].pData = pSegDescStart;
00522 return true;
00523 } else {
00524 return false;
00525 }
00526 }
00527
00528
00529 bool LbmEntry::setRID(LcsRid rid)
00530 {
00531 assert (!isSingleBitmap());
00532
00533
00534
00535
00536 if (isSingleton()) {
00537
00538
00539
00540
00541 if (!singleton2Bitmap()) {
00542
00543
00544
00545
00546 return false;
00547 }
00548
00549
00550
00551
00552 } else if (!isSegmentOpen()) {
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562 openLastSegment();
00563 }
00564
00565 assert (isSegmentOpen());
00566
00567
00568
00569
00570 if (!isSegmentOpen()) {
00571
00572
00573
00574 return setRIDNewSegment(rid);
00575 } else {
00576
00577
00578
00579 uint distance = opaqueToInt(rid - currSegByteStartRID);
00580
00581 assert(distance >= 0);
00582
00583 if (distance < LbmOneByteSize) {
00584
00585
00586
00587 setRIDCurrentSegByte(rid);
00588 } else if (distance < LbmOneByteSize * 2) {
00589
00590
00591
00592 return setRIDAdjacentSegByte(rid);
00593 } else {
00594
00595
00596
00597 if (closeCurrentSegment(rid)) {
00598
00599
00600
00601
00602 if (!setRIDNewSegment(rid)) {
00603 return false;
00604 }
00605 } else {
00606
00607
00608
00609
00610 return false;
00611 }
00612 }
00613 }
00614 return true;
00615 }
00616
00617
00618 int LbmEntry::compareEntry(
00619 TupleData const &inputTuple,
00620 TupleDescriptor const &tupleDesc) const
00621 {
00622 return tupleDesc.compareTuplesKey(
00623 entryTuple,
00624 inputTuple,
00625 entryTuple.size() - 3);
00626 }
00627
00628
00629 bool LbmEntry::addSegDesc(uint reservedSpace, uint bitmapLength)
00630 {
00631 uint leftOverSpace =
00632 scratchBufferUsableSize - currentEntrySize - reservedSpace;
00633
00634 uint segDescCount = bitmapLength / LbmMaxSegSize;
00635 uint lastSegLength = bitmapLength % LbmMaxSegSize;
00636 uint addedSegDescBytes = segDescCount + (lastSegLength ? 1 : 0);
00637
00638 if (addedSegDescBytes > leftOverSpace) {
00639 return false;
00640 } else {
00641 int i;
00642
00643
00644
00645
00646 for (i = 0; i < segDescCount; i ++) {
00647 setSegLength(*pSegDescEnd, LbmMaxSegSize);
00648 pSegDescEnd ++;
00649 }
00650
00651 if (lastSegLength) {
00652 setSegLength(*pSegDescEnd, lastSegLength);
00653 pSegDescEnd ++;
00654 }
00655
00656 resetSegment();
00657
00658
00659 currentEntrySize += addedSegDescBytes;
00660
00661 return true;
00662 }
00663 }
00664
00665 uint LbmEntry::getMergeSpaceRequired(TupleData const &inputTuple)
00666 {
00667 uint mergeSpaceRequired = 0;
00668
00669
00670
00671
00672
00673
00674
00675 uint inputSegDescLength =
00676 inputTuple[inputTuple.size() - 2].pData ?
00677 inputTuple[inputTuple.size() - 2].cbData : 0;
00678 uint inputSegLength =
00679 inputTuple[inputTuple.size() - 1].pData ?
00680 inputTuple[inputTuple.size() - 1].cbData : 0;
00681
00682 if (isSingleton(inputTuple)) {
00683
00684 mergeSpaceRequired = 2;
00685 } else if (isSingleBitmap(inputTuple)) {
00686
00687 uint segDescCount = (inputSegLength / LbmMaxSegSize) + 1;
00688 mergeSpaceRequired = segDescCount + inputSegLength;
00689 } else {
00690
00691 mergeSpaceRequired = inputSegDescLength + inputSegLength;
00692 }
00693
00694
00695
00696
00697 if (isSingleton()) {
00698 mergeSpaceRequired += 2;
00699 }
00700
00701 return mergeSpaceRequired;
00702 }
00703
00704 bool LbmEntry::growEntry(LcsRid rid, uint reserveSpace)
00705 {
00706
00707
00708
00709
00710 int leftOverSpace =
00711 scratchBufferUsableSize - currentEntrySize - reserveSpace;
00712
00713 if (leftOverSpace < 0) {
00714 return false;
00715 }
00716
00717 if (isSingleton()) {
00718 if (!singleton2Bitmap()) {
00719 return false;
00720 }
00721 }
00722
00723 if (!isSegmentOpen()) {
00724 openLastSegment();
00725 }
00726
00727
00728
00729
00730 uint rowCount = getRowCount();
00731
00732
00733
00734
00735
00736 LcsRid endRID = startRID + rowCount - 1;
00737
00738 if (rid >= endRID + 1) {
00739
00740
00741
00742
00743
00744 if (!closeCurrentSegment(rid)) {
00745 return false;
00746 }
00747 }
00748
00749
00750
00751
00752
00753
00754
00755 return true;
00756 }
00757
00758 bool LbmEntry::adjustEntry(TupleData &inputTuple)
00759 {
00760 LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
00761
00762
00763
00764
00765
00766 if (isSingleton() && isSingleton(inputTuple)) {
00767 assert(startRID != inputStartRID);
00768 return false;
00769 }
00770
00771 if (isSingleton(inputTuple)) {
00772
00773
00774
00775
00776
00777 currSegByte = pSegEnd;
00778 setRIDCurrentSegByte(inputStartRID);
00779 return true;
00780 } else if (isSingleton()) {
00781
00782
00783
00784
00785
00786
00787
00788
00789 assert ((opaqueToInt(startRID) - opaqueToInt(inputStartRID))
00790 < LbmOneByteSize);
00791
00792
00793 LcsRid oldStartRid = startRID;
00794 setEntryTuple(inputTuple);
00795
00796
00797 setRIDSegByte(pSegStart - 1, oldStartRid);
00798
00799
00800 return true;
00801 } else {
00802 currSegByte = pSegEnd;
00803 uint8_t inputFirstSegByte =
00804 *(uint8_t *)(inputTuple[inputTuple.size() - 1].pData +
00805 inputTuple[inputTuple.size() - 1].cbData - 1);
00806 *currSegByte |= inputFirstSegByte;
00807
00808
00809
00810
00811
00812
00813
00814
00815 if (inputTuple[inputTuple.size() -1].cbData == 1) {
00816 assert(inputTuple[inputTuple.size() - 2].pData == NULL);
00817 return true;
00818 }
00819
00820
00821
00822
00823
00824
00825
00826
00827 inputTuple[inputTuple.size() -1].cbData --;
00828
00829
00830
00831
00832
00833
00834
00835
00836
00837 PBuffer pFirstDesc = (PBuffer)inputTuple[inputTuple.size() - 2].pData;
00838
00839
00840
00841
00842 uint numRIDsMoved = LbmOneByteSize;
00843
00844 if (pFirstDesc) {
00845 uint8_t firstSegLength = ((*pFirstDesc) >> LbmHalfByteSize) + 1;
00846
00847 if (firstSegLength >= 2) {
00848
00849
00850
00851 firstSegLength --;
00852
00853
00854
00855 *pFirstDesc &= LbmZeroLengthMask;
00856
00857
00858
00859 *pFirstDesc |= (firstSegLength -1) << LbmHalfByteSize;
00860 } else {
00861
00862
00863
00864
00865
00866 uint firstZeroLength = (*pFirstDesc) & LbmZeroLengthMask;
00867 pFirstDesc ++;
00868
00869 inputTuple[inputTuple.size() - 2].cbData --;
00870
00871 if (firstZeroLength > LbmZeroLengthCompact) {
00872 uint firstZeroLengthBytes =
00873 firstZeroLength - LbmZeroLengthCompact;
00874
00875
00876
00877 firstZeroLength =
00878 byteArray2Value(pFirstDesc, firstZeroLengthBytes);
00879
00880
00881
00882 pFirstDesc += firstZeroLengthBytes;
00883 inputTuple[inputTuple.size() - 2].cbData -=
00884 firstZeroLengthBytes;
00885 }
00886 inputTuple[inputTuple.size() - 2].pData = pFirstDesc;
00887 numRIDsMoved += firstZeroLength * LbmOneByteSize;
00888 }
00889 }
00890
00891
00892
00893 inputStartRID += numRIDsMoved;
00894 }
00895 return false;
00896 }
00897
00898 bool LbmEntry::mergeEntry(TupleData &inputTuple)
00899 {
00900 assert (!isSingleBitmap());
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915
00916
00917 uint rowCount = getRowCount();
00918
00919
00920
00921
00922
00923
00924 LcsRid endRID = startRID + rowCount - 1;
00925 LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
00926
00927
00928
00929
00930
00931
00932 if (inputStartRID <= endRID &&
00933 inputStartRID >= roundToByteBoundary(endRID))
00934 {
00935 if (adjustEntry(inputTuple)) {
00936 validateEntrySize();
00937 return true;
00938 }
00939 }
00940
00941
00942
00943
00944
00945
00946 if (inputStartRID <= endRID) {
00947 permAssert(isSingleton(inputTuple));
00948 bool rc = spliceSingleton(inputTuple);
00949 validateEntrySize();
00950 return rc;
00951 }
00952
00953 uint mergeSpaceRequired = getMergeSpaceRequired(inputTuple);
00954
00955 if (!growEntry(inputStartRID, mergeSpaceRequired)) {
00956
00957
00958
00959
00960
00961
00962 return false;
00963 }
00964
00965
00966
00967
00968 if (isSingleton(inputTuple)) {
00969 bool rc = setRID(inputStartRID);
00970 validateEntrySize();
00971 return rc;
00972 }
00973
00974
00975
00976
00977
00978 uint inputSegDescLength =
00979 inputTuple[inputTuple.size() - 2].pData ?
00980 inputTuple[inputTuple.size() - 2].cbData : 0;
00981 uint inputSegLength =
00982 inputTuple[inputTuple.size() - 1].pData ?
00983 inputTuple[inputTuple.size() - 1].cbData : 0;
00984 if (pSegStart - pSegEnd + inputSegLength > bitmapSegSize) {
00985 return false;
00986 }
00987
00988 if (!isSingleBitmap(inputTuple)) {
00989
00990
00991
00992
00993 memcpy(
00994 pSegDescEnd,
00995 inputTuple[inputTuple.size() - 2].pData,
00996 inputSegDescLength);
00997 pSegDescEnd += inputSegDescLength;
00998 currentEntrySize += inputSegDescLength;
00999 } else {
01000
01001
01002 uint reservedSpace = inputSegLength;
01003 if (!addSegDesc(reservedSpace, inputSegLength)) {
01004 return false;
01005 }
01006 }
01007
01008
01009
01010
01011 pSegEnd -= inputSegLength;
01012 memcpy(
01013 pSegEnd,
01014 inputTuple[inputTuple.size() - 1].pData,
01015 inputSegLength);
01016 currentEntrySize += inputSegLength;
01017 validateEntrySize();
01018 return true;
01019 }
01020
01021 bool LbmEntry::spliceSingleton(TupleData &inputTuple)
01022 {
01023 assert(!isSingleBitmap());
01024
01025 int ridField = inputTuple.size() - 3;
01026 LcsRid &inputStartRID = *((LcsRid*) inputTuple[ridField].pData);
01027
01028
01029
01030 if (inputStartRID < startRID) {
01031
01032
01033
01034 if (isSegmentOpen()) {
01035 closeCurrentSegment();
01036 }
01037 TupleData origCurrEntry;
01038 copyToMergeBuffer(origCurrEntry, startRID, pSegStart, pSegDescStart);
01039
01040 setEntryTuple(inputTuple);
01041 bool rc = mergeEntry(origCurrEntry);
01042
01043
01044
01045 if (rc == false) {
01046 for (int i = 0; i < origCurrEntry.size(); i++) {
01047 inputTuple[i] = origCurrEntry[i];
01048 }
01049 }
01050 return rc;
01051
01052 } else {
01053
01054
01055 PBuffer segDesc = pSegDescStart;
01056 PBuffer seg = pSegStart;
01057 LcsRid srid = startRID;
01058 while (segDesc < pSegDescEnd) {
01059 uint segBytes;
01060 uint zeroBytes;
01061 PBuffer prevSegDesc = segDesc;
01062 readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01063
01064
01065 if (inputStartRID >= srid &&
01066 inputStartRID < srid + (segBytes * LbmOneByteSize))
01067 {
01068 setRIDSegByte(
01069 seg - 1 -
01070 opaqueToInt(inputStartRID - srid) / LbmOneByteSize,
01071 inputStartRID);
01072 return true;
01073 }
01074 LcsRid nextSrid = srid + (zeroBytes + segBytes) * LbmOneByteSize;
01075
01076 if (inputStartRID < nextSrid) {
01077 return addNewSegment(
01078 inputTuple, srid, prevSegDesc, segDesc, seg, segBytes,
01079 zeroBytes);
01080 }
01081 seg -= segBytes;
01082 srid = nextSrid;
01083 }
01084
01085
01086 permAssert(false);
01087 return true;
01088 }
01089 }
01090
01091 void LbmEntry::copyToMergeBuffer(
01092 TupleData &newEntry, LcsRid newStartRID, PBuffer segStart,
01093 PBuffer segDescStart)
01094 {
01095 assert(mergeScratchBuffer != NULL);
01096 PBuffer pTempBuff = mergeScratchBuffer;
01097 memcpy(pTempBuff, scratchBuffer, keySize);
01098
01099 newEntry.resize(entryTuple.size());
01100 for (int i = 0; i < entryTuple.size() - 3; i++) {
01101 newEntry[i] = entryTuple[i];
01102 newEntry[i].pData = pTempBuff;
01103 newEntry[i].cbData = entryTuple[i].cbData;
01104 pTempBuff += entryTuple[i].cbData;
01105 }
01106 memcpy(pTempBuff, &newStartRID, sizeof(LcsRid));
01107 uint ridField = entryTuple.size() - 3;
01108 newEntry[ridField].pData = pTempBuff;
01109 newEntry[ridField].cbData = sizeof(LcsRid);
01110
01111 newEntry[ridField + 1].cbData = pSegDescEnd - segDescStart;
01112 if (newEntry[ridField + 1].cbData == 0) {
01113 newEntry[ridField + 1].pData = NULL;
01114 } else {
01115 newEntry[ridField + 1].pData = mergeScratchBuffer + keySize;
01116 memcpy(
01117 (void *) newEntry[ridField + 1].pData, segDescStart,
01118 pSegDescEnd - segDescStart);
01119 }
01120
01121 uint segLen = segStart - pSegEnd;
01122 newEntry[ridField + 2].cbData = segLen;
01123 if (segLen == 0) {
01124 newEntry[ridField + 2].pData = NULL;
01125 } else {
01126 newEntry[ridField + 2].pData =
01127 mergeScratchBuffer + scratchBufferSize - segLen;
01128 memcpy(
01129 (void *) newEntry[ridField + 2].pData, segStart - segLen, segLen);
01130 }
01131 }
01132
01133 bool LbmEntry::addNewSegment(
01134 TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01135 PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01136 {
01137 assert(isSingleton(inputTuple));
01138
01139
01140
01141 if (isSegmentOpen()) {
01142 closeCurrentSegment();
01143 }
01144
01145 LcsRid inputStartRID =
01146 *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01147 LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01148 LcsRid nextSrid =
01149 prevSrid + (prevSegBytes + prevZeroBytes) * LbmOneByteSize;
01150
01151
01152
01153 if ((inputStartRID >= prevEridPlus1 &&
01154 inputStartRID < prevEridPlus1 + LbmOneByteSize) ||
01155 (inputStartRID >= nextSrid - LbmOneByteSize &&
01156 inputStartRID < nextSrid))
01157 {
01158 return addNewAdjacentSegment(
01159 inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01160 prevSegBytes, prevZeroBytes);
01161 }
01162
01163 return addNewMiddleSegment(
01164 inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01165 prevSegBytes, prevZeroBytes);
01166 }
01167
01168 bool LbmEntry::addNewMiddleSegment(
01169 TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01170 PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01171 {
01172 assert(isSingleton(inputTuple));
01173
01174
01175
01176
01177 LcsRid inputStartRID =
01178 *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01179 LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01180 uint leftZeroBytes = opaqueToInt(inputStartRID - prevEridPlus1) /
01181 LbmOneByteSize;
01182 uint rightZeroBytes = prevZeroBytes - leftZeroBytes - 1;
01183 uint spaceRequired = 2;
01184 int spaceNeededBefore = computeSpaceForZeroBytes(prevZeroBytes);
01185 int spaceNeededAfter = computeSpaceForZeroBytes(leftZeroBytes) +
01186 computeSpaceForZeroBytes(rightZeroBytes);
01187 spaceRequired += (spaceNeededAfter - spaceNeededBefore);
01188 assert(spaceRequired >= 1);
01189 if (spaceRequired + currentEntrySize > scratchBufferUsableSize) {
01190 splitEntry(inputTuple);
01191 return false;
01192 }
01193
01194
01195
01196 uint remainingSegLen = computeSegLength(nextSegDesc);
01197
01198
01199 uint leftZeroLength;
01200 setZeroLength(leftZeroBytes, prevSegDesc, leftZeroLength);
01201
01202
01203
01204 uint shiftAmount = spaceNeededAfter + 1 - spaceNeededBefore;
01205 if (shiftAmount > 0) {
01206 uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01207 PBuffer segDesc = prevSegDesc + spaceNeededBefore + 1;
01208 for (int i = remainingSegDescLen - 1; i >= 0; i--) {
01209 segDesc[i + shiftAmount] = segDesc[i];
01210 }
01211 }
01212
01213
01214 setSegLength(*(prevSegDesc + 1 + leftZeroLength), 1);
01215 uint size;
01216 setZeroLength(rightZeroBytes, prevSegDesc + 1 + leftZeroLength, size);
01217 pSegDescEnd += shiftAmount;
01218
01219 addNewRid(
01220 nextSegDesc, prevSeg - prevSegBytes - 1, inputStartRID,
01221 remainingSegLen);
01222
01223 currentEntrySize += spaceRequired;
01224 return true;
01225 }
01226
01227 bool LbmEntry::addNewAdjacentSegment(
01228 TupleData &inputTuple, LcsRid prevSrid, PBuffer prevSegDesc,
01229 PBuffer nextSegDesc, PBuffer prevSeg, uint prevSegBytes, uint prevZeroBytes)
01230 {
01231 assert(isSingleton(inputTuple));
01232
01233 LcsRid inputStartRID =
01234 *((LcsRid*) inputTuple[inputTuple.size() - 3].pData);
01235 LcsRid prevEridPlus1 = prevSrid + (prevSegBytes * LbmOneByteSize);
01236 LcsRid nextSrid =
01237 prevSrid + (prevSegBytes + prevZeroBytes) * LbmOneByteSize;
01238
01239
01240
01241
01242
01243
01244
01245 uint spaceRequired = 1;
01246 int spaceNeededBefore = computeSpaceForZeroBytes(prevZeroBytes);
01247 int spaceNeededAfter = computeSpaceForZeroBytes(prevZeroBytes - 1);
01248 assert(spaceNeededAfter <= spaceNeededBefore);
01249 spaceRequired += (spaceNeededAfter - spaceNeededBefore);
01250 assert(spaceRequired >= 0);
01251
01252 bool combine = false;
01253 if (nextSrid == prevEridPlus1 + LbmOneByteSize) {
01254 combine = true;
01255 spaceRequired -= 1;
01256 }
01257
01258 if (spaceRequired + currentEntrySize > scratchBufferUsableSize) {
01259 splitEntry(inputTuple);
01260 return false;
01261 }
01262
01263
01264
01265 uint remainingSegLen = computeSegLength(nextSegDesc);
01266
01267
01268
01269 bool rc;
01270 if (inputStartRID >= prevEridPlus1 &&
01271 inputStartRID < prevEridPlus1 + LbmOneByteSize)
01272 {
01273 uint segLen = prevSegBytes + 1;
01274 if (combine) {
01275 segLen += getSegLength(*nextSegDesc);
01276 }
01277 rc = adjustSegLength(*prevSegDesc, segLen);
01278 } else {
01279 rc = adjustSegLength(*nextSegDesc, getSegLength(*nextSegDesc) + 1);
01280 }
01281
01282
01283
01284 if (!rc) {
01285 return addNewMiddleSegment(
01286 inputTuple, prevSrid, prevSegDesc, nextSegDesc, prevSeg,
01287 prevSegBytes, prevZeroBytes);
01288 }
01289
01290
01291
01292
01293 if (combine) {
01294 uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01295 uint zeroLen = *nextSegDesc & LbmZeroLengthMask;
01296 *prevSegDesc &= ~LbmZeroLengthMask;
01297 *prevSegDesc |= zeroLen;
01298 PBuffer segDesc = nextSegDesc;
01299 for (int i = 0; i < remainingSegDescLen - 1; i++) {
01300 segDesc[i] = segDesc[i + 1];
01301 }
01302 pSegDescEnd--;
01303
01304
01305 } else {
01306 uint size;
01307 setZeroLength(prevZeroBytes - 1, prevSegDesc, size);
01308
01309 if (spaceNeededBefore > spaceNeededAfter) {
01310
01311
01312 uint shiftAmount = spaceNeededBefore - spaceNeededAfter;
01313 uint remainingSegDescLen = computeSegDescLength(nextSegDesc);
01314 PBuffer segDesc = prevSegDesc + spaceNeededAfter + 1;
01315 for (int i = 0; i < remainingSegDescLen; i++) {
01316 segDesc[i] = segDesc[i + shiftAmount];
01317 }
01318 pSegDescEnd -= shiftAmount;
01319 }
01320 }
01321
01322
01323
01324
01325 addNewRid(
01326 nextSegDesc, prevSeg - prevSegBytes - 1, inputStartRID,
01327 remainingSegLen);
01328
01329 currentEntrySize += spaceRequired;
01330 return true;
01331 }
01332
01333 void LbmEntry::addNewRid(
01334 PBuffer nextSegDesc, PBuffer newSeg, LcsRid newRid, uint remainingSegLen)
01335 {
01336
01337
01338 assert(keyDesc.size() == 1);
01339
01340 PBuffer seg = pSegEnd;
01341 for (int i = 0; i < remainingSegLen; i++) {
01342 seg[i - 1] = seg[i];
01343 }
01344 *newSeg = (uint8_t) (1 << (opaqueToInt(newRid) % LbmOneByteSize));
01345 pSegEnd--;
01346 }
01347
01348 void LbmEntry::splitEntry(TupleData &inputTuple)
01349 {
01350 assert(isSingleton(inputTuple));
01351
01352
01353
01354
01355
01356 assert(countSegments() > 1);
01357
01358
01359
01360 uint targetSplitSize = (currentEntrySize - keySize) / 2;
01361
01362
01363 PBuffer segDesc = pSegDescStart;
01364 PBuffer prevPrevSegDesc = NULL;
01365 PBuffer prevSegDesc = NULL;
01366 PBuffer seg = pSegStart;
01367 LcsRid srid = startRID;
01368 LcsRid prevSrid = srid;
01369 uint segBytes;
01370 uint zeroBytes;
01371 while (segDesc < pSegDescEnd) {
01372 prevPrevSegDesc = prevSegDesc;
01373 prevSegDesc = segDesc;
01374 readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01375 seg -= segBytes;
01376 prevSrid = srid;
01377 srid += (segBytes + zeroBytes) * LbmOneByteSize;
01378 if ((segDesc - pSegDescStart) + (pSegStart - seg) >= targetSplitSize) {
01379 break;
01380 }
01381 }
01382
01383
01384
01385
01386
01387
01388 LcsRid &inputStartRID = *((LcsRid*)inputTuple[inputTuple.size() - 3].pData);
01389 if (segDesc >= pSegDescEnd || inputStartRID < prevSrid) {
01390 segDesc = prevSegDesc;
01391 permAssert(prevPrevSegDesc != NULL);
01392 prevSegDesc = prevPrevSegDesc;
01393 seg += segBytes;
01394 srid = prevSrid;
01395 }
01396
01397
01398
01399 TupleData newEntry;
01400 copyToMergeBuffer(newEntry, srid, seg, segDesc);
01401
01402
01403
01404 segDesc -= getZeroLengthByteCount(*prevSegDesc);
01405 *(prevSegDesc) &= ~LbmZeroLengthMask;
01406
01407
01408
01409 pSegDescEnd = segDesc;
01410 pSegEnd = seg;
01411 currentEntrySize =
01412 keySize + (pSegDescEnd - pSegDescStart) + (pSegStart - pSegEnd);
01413
01414
01415
01416 if (inputStartRID < srid) {
01417 bool rc = mergeEntry(inputTuple);
01418
01419
01420 permAssert(rc);
01421 for (int i = 0; i < newEntry.size(); i++) {
01422 inputTuple[i] = newEntry[i];
01423 }
01424 return;
01425 }
01426
01427
01428 mergeIntoSplitEntry(inputTuple, newEntry);
01429 }
01430
01431 void LbmEntry::mergeIntoSplitEntry(
01432 TupleData &inputTuple, TupleData &splitEntry)
01433 {
01434 assert(isSingleton(inputTuple));
01435
01436
01437 boost::scoped_array<FixedBuffer> tempBuffer;
01438 tempBuffer.reset(new FixedBuffer[scratchBufferSize]);
01439 #ifdef DEBUG
01440 memset(tempBuffer.get(), 0xFF, scratchBufferSize);
01441 #endif
01442 memcpy(tempBuffer.get(), scratchBuffer, scratchBufferSize);
01443 LcsRid savStartRID = startRID;
01444 PBuffer savSegStart = pSegStart;
01445 PBuffer savSegEnd = pSegEnd;
01446 PBuffer savSegDescStart = pSegDescStart;
01447 PBuffer savSegDescEnd = pSegDescEnd;
01448 uint savCurrentEntrySize = currentEntrySize;
01449
01450
01451 uint ridField = splitEntry.size() - 3;
01452 memcpy(scratchBuffer, mergeScratchBuffer, scratchBufferSize);
01453 startRID = *((LcsRid*) splitEntry[ridField].pData);
01454 LcsRid splitStartRid = startRID;
01455 pSegDescStart = scratchBuffer + keySize;
01456 pSegDescEnd = pSegDescStart + splitEntry[ridField + 1].cbData;
01457 pSegStart = scratchBuffer + scratchBufferSize;
01458 pSegEnd = pSegStart - splitEntry[ridField + 2].cbData;
01459 currentEntrySize =
01460 keySize + splitEntry[ridField + 1].cbData +
01461 splitEntry[ridField + 2].cbData;
01462
01463
01464
01465 bool rc = mergeEntry(inputTuple);
01466 permAssert(rc);
01467
01468
01469 copyToMergeBuffer(inputTuple, splitStartRid, pSegStart, pSegDescStart);
01470
01471
01472 memcpy(scratchBuffer, tempBuffer.get(), scratchBufferSize);
01473 startRID = savStartRID;
01474 pSegStart = savSegStart;
01475 pSegEnd = savSegEnd;
01476 pSegDescStart = savSegDescStart;
01477 pSegDescEnd = savSegDescEnd;
01478 currentEntrySize = savCurrentEntrySize;
01479 }
01480
01481 TupleData const &LbmEntry::produceEntryTuple()
01482 {
01483
01484
01485
01486 if (isSingleton()) {
01487 return entryTuple;
01488 }
01489
01490
01491
01492
01493
01494 if (isSegmentOpen()) {
01495 closeCurrentSegment();
01496 }
01497
01498
01499
01500
01501 uint RIDField = entryTuple.size() - 3;
01502 *((LcsRid *)entryTuple[RIDField].pData) = startRID;
01503
01504
01505
01506
01507 uint segmentField = entryTuple.size() - 1;
01508 entryTuple[segmentField].cbData = pSegStart - pSegEnd;
01509 entryTuple[segmentField].pData = pSegEnd;
01510
01511
01512
01513
01514 uint segmentDescField = entryTuple.size() - 2;
01515
01516
01517
01518
01519
01520
01521
01522 if ((pSegDescStart == pSegDescEnd) || (pSegDescStart == NULL)) {
01523
01524
01525
01526
01527 entryTuple[segmentDescField].cbData = 0;
01528 entryTuple[segmentDescField].pData = NULL;
01529 } else {
01530
01531
01532
01533
01534
01535 uint rowCount = getRowCount();
01536
01537 if (rowCount == entryTuple[segmentField].cbData * 8 &&
01538 rowCount <= maxSegSize * 8)
01539 {
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549 entryTuple[segmentDescField].cbData = 0;
01550 entryTuple[segmentDescField].pData = NULL;
01551 } else {
01552
01553
01554 openLastSegment();
01555 closeCurrentSegment();
01556 entryTuple[segmentDescField].cbData = pSegDescEnd - pSegDescStart;
01557 entryTuple[segmentDescField].pData = pSegDescStart;
01558 }
01559 }
01560
01561 return entryTuple;
01562 }
01563
01564 string LbmEntry::printDatum(
01565 TupleDatum const &tupleDatum,
01566 bool reverseByteOrder)
01567 {
01568 ostringstream byteStr;
01569 PConstBuffer ptr = tupleDatum.pData;
01570 uint size = tupleDatum.cbData;
01571
01572 if (ptr) {
01573 if (reverseByteOrder) {
01574 PConstBuffer tmpPtr = ptr + size;
01575 while (size > 0) {
01576 tmpPtr --;
01577 byteStr << hex << setw(2) << setfill('0') << (uint)(*tmpPtr);
01578 size --;
01579 }
01580 } else {
01581 while (size > 0) {
01582 byteStr << hex << setw(2) << setfill('0') << (uint)(*ptr);
01583 ptr ++;
01584 size --;
01585 }
01586 }
01587 }
01588
01589 return byteStr.str();
01590 }
01591
01592 string LbmEntry::printByte(uint8_t byte)
01593 {
01594 ostringstream byteStr;
01595 uint byteLength = 8;
01596
01597 while (byteLength > 0) {
01598 byteStr << byte % 2 << " ";
01599 byte = byte >> 1;
01600 byteLength --;
01601 }
01602 return byteStr.str();
01603 }
01604
01605 string LbmEntry::dumpSeg(PBuffer segDesc, PBuffer segDescEnd, PBuffer seg)
01606 {
01607 uint segBytes;
01608 ostringstream entryLine;
01609 uint zeroBytes;
01610 uint startPos = 0;
01611 uint i;
01612
01613 while (segDesc < segDescEnd) {
01614
01615
01616
01617 readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01618
01619
01620
01621
01622 for (i = 0; i < segBytes + startPos; i++) {
01623 seg --;
01624 entryLine << printByte((uint8_t)(*seg));
01625 if (i % 5 == 4) {
01626 entryLine << "\n";
01627 } else {
01628 entryLine << " ";
01629 }
01630 }
01631
01632
01633
01634
01635 for (; i < zeroBytes + segBytes + startPos; i++) {
01636 entryLine << "0 0 0 0 0 0 0 0 ";
01637 if (i % 5 == 4) {
01638 entryLine << "\n";
01639 } else {
01640 entryLine << " ";
01641 }
01642 }
01643 }
01644 return entryLine.str();
01645 }
01646
01647 string LbmEntry::dumpSegRID(
01648 PBuffer segDesc,
01649 PBuffer segDescEnd,
01650 PBuffer seg,
01651 string prefix,
01652 LcsRid srid)
01653 {
01654 uint segBytes;
01655 uint zeroBytes;
01656 ostringstream entryTrace;
01657 uint8_t bitmapByte;
01658 uint byteLength;
01659 uint i;
01660
01661 while (segDesc < segDescEnd) {
01662
01663
01664
01665 readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
01666
01667 for (i = 0; i < segBytes; i++) {
01668 seg --;
01669 bitmapByte = *(uint8_t *)seg;
01670 for (byteLength = 8; byteLength > 0; byteLength--) {
01671 if (bitmapByte % 2) {
01672 entryTrace << prefix << opaqueToInt(srid) << "]\n";
01673 }
01674 bitmapByte = bitmapByte >> 1;
01675 srid ++;
01676 }
01677 }
01678
01679
01680
01681
01682 srid += zeroBytes * LbmOneByteSize;
01683 }
01684 return entryTrace.str();
01685 }
01686
01687 string LbmEntry::dumpBitmap(PBuffer seg, uint segBytes)
01688 {
01689 ostringstream entryLine;
01690
01691
01692
01693 for (uint i = 0; i < segBytes; i++) {
01694 seg --;
01695 entryLine << printByte((uint8_t)(*seg));
01696 if (i % 5 == 4) {
01697 entryLine << "\n";
01698 } else {
01699 entryLine << " ";
01700 }
01701 }
01702 entryLine << "\n";
01703 return entryLine.str();
01704 }
01705
01706 string LbmEntry::dumpBitmapRID(
01707 PBuffer seg,
01708 uint segBytes,
01709 string prefix,
01710 LcsRid srid)
01711 {
01712 ostringstream entryTrace;
01713 uint byteLength;
01714 uint8_t bitmapByte;
01715
01716
01717
01718
01719
01720 for (uint i = 0; i < segBytes; i++) {
01721 seg --;
01722 bitmapByte = *(uint8_t *)seg;
01723 for (byteLength = 8; byteLength > 0; byteLength--) {
01724 if (bitmapByte % 2) {
01725 entryTrace << prefix << opaqueToInt(srid) << "]\n";
01726 }
01727 bitmapByte = bitmapByte >> 1;
01728 srid ++;
01729 }
01730 }
01731 return entryTrace.str();
01732 }
01733
01734 string LbmEntry::toString(TupleData const&inputTuple, bool printRID)
01735 {
01736 if (printRID) {
01737 return toRIDString(inputTuple);
01738 } else {
01739 return toBitmapString(inputTuple);
01740 }
01741 }
01742
01743 string LbmEntry::toBitmapString(TupleData const&inputTuple)
01744 {
01745 ostringstream tupleTrace;
01746 uint tupleSize = inputTuple.size();
01747 LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01748
01749 tupleTrace << "Key [";
01750
01751 for (uint i = 0; i < tupleSize - 3 ; i ++) {
01752 tupleTrace << printDatum(inputTuple[i], true);
01753 if (i < tupleSize - 4) {
01754 tupleTrace << "|";
01755 }
01756 }
01757
01758 tupleTrace << "] startRID ["
01759 << opaqueToInt(inputStartRID)
01760 << "] ";
01761
01762 if (isSingleton(inputTuple)) {
01763 tupleTrace <<"Singleton";
01764 } else {
01765 PBuffer segDesc = (PBuffer)inputTuple[tupleSize - 2].pData;
01766
01767
01768
01769 PBuffer seg = (PBuffer)inputTuple[tupleSize - 1].pData
01770 + inputTuple[tupleSize - 1].cbData;
01771
01772 if (segDesc) {
01773 tupleTrace << "Compressed Bitmap: SegDesc "
01774 << inputTuple[tupleSize - 2].cbData
01775 << " bytes, Seg "
01776 << inputTuple[tupleSize - 1].cbData
01777 << " bytes.\n";
01778
01779
01780
01781
01782 PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01783
01784 tupleTrace << dumpSeg(segDesc, segDescEnd, seg);
01785 } else {
01786 tupleTrace << "Single Bitmap: Seg "
01787 << inputTuple[tupleSize - 1].cbData
01788 << " bytes\n";
01789
01790
01791
01792 tupleTrace << dumpBitmap(seg, inputTuple[tupleSize - 1].cbData);
01793 }
01794 }
01795
01796 tupleTrace << "\n";
01797
01798 return tupleTrace.str();
01799 }
01800
01801 string LbmEntry::toRIDString(TupleData const &inputTuple)
01802 {
01803 ostringstream tupleTrace;
01804 ostringstream keyTrace;
01805 uint tupleSize = inputTuple.size();
01806 LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01807
01808 keyTrace << "Key [";
01809 for (uint i = 0; i < tupleSize - 3 ; i ++) {
01810 keyTrace << printDatum(inputTuple[i], true);
01811 if (i < tupleSize - 4) {
01812 keyTrace << "|";
01813 }
01814 }
01815
01816 keyTrace << "] ";
01817
01818 tupleTrace << keyTrace.str()
01819 << "startRID ["
01820 << opaqueToInt(inputStartRID)
01821 << "] ";
01822
01823 if (isSingleton(inputTuple)) {
01824 tupleTrace <<"Singleton\n";
01825 } else {
01826 keyTrace << "RID [";
01827 PBuffer segDesc = (PBuffer)inputTuple[tupleSize - 2].pData;
01828
01829
01830
01831 PBuffer seg = (PBuffer)inputTuple[tupleSize - 1].pData
01832 + inputTuple[tupleSize - 1].cbData;
01833
01834 tupleTrace << "\n";
01835
01836 if (segDesc) {
01837
01838
01839
01840 PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01841
01842 tupleTrace <<
01843 dumpSegRID(
01844 segDesc,
01845 segDescEnd,
01846 seg,
01847 keyTrace.str(),
01848 inputStartRID);
01849 } else {
01850
01851
01852
01853 tupleTrace <<
01854 dumpBitmapRID(
01855 seg,
01856 inputTuple[tupleSize - 1].cbData,
01857 keyTrace.str(),
01858 inputStartRID);
01859 }
01860 }
01861
01862 return tupleTrace.str();
01863 }
01864
01865 string LbmEntry::toString()
01866 {
01867 ostringstream tupleTrace;
01868 uint tupleSize = entryTuple.size();
01869
01870 tupleTrace << "Key [";
01871
01872 for (uint i = 0; i < tupleSize - 3 ; i ++) {
01873 tupleTrace << printDatum(entryTuple[i], true);
01874 if (i < tupleSize - 4) {
01875 tupleTrace << "|";
01876 }
01877 }
01878
01879 tupleTrace << "] RID ["
01880 << opaqueToInt(*(LcsRid *)entryTuple[tupleSize - 3].pData)
01881 << "] ";
01882
01883 if (isSingleton()) {
01884 tupleTrace <<"Singleton";
01885 } else {
01886 PBuffer pSegDesc = pSegDescStart;
01887
01888
01889
01890 PBuffer pSeg = pSegStart;
01891
01892 if (pSegDesc) {
01893 tupleTrace << "Compressed Bitmap: SegDesc "
01894 << pSegDescEnd - pSegDescStart
01895 << " bytes, Seg "
01896 << pSegStart - pSegEnd
01897 << " bytes.\n";
01898
01899
01900
01901
01902 tupleTrace << dumpSeg(pSegDesc, pSegDescEnd, pSeg);
01903 } else {
01904 tupleTrace << "Single Bitmap: Seg "
01905 << pSegStart - pSegEnd
01906 << " bytes\n";
01907
01908
01909
01910 tupleTrace << dumpBitmap(pSeg, pSegStart - pSegEnd);
01911 }
01912 }
01913
01914 tupleTrace << "\n";
01915
01916 return tupleTrace.str();
01917 }
01918
01919 void LbmEntry::getSizeBounds(
01920 TupleDescriptor const &indexTupleDesc, uint pageSize,
01921 uint &minEntrySize, uint &maxEntrySize)
01922 {
01923 uint tupleSize = indexTupleDesc.size();
01924 TupleStorageByteLength segDescFieldMaxLength =
01925 indexTupleDesc[tupleSize - 2].cbStorage;
01926 TupleStorageByteLength segFieldMaxLength =
01927 indexTupleDesc[tupleSize - 1].cbStorage;
01928
01929
01930
01931
01932
01933
01934
01935 assert(segDescFieldMaxLength == segFieldMaxLength);
01936
01937
01938
01939
01940
01941 minEntrySize =
01942 indexTupleDesc.getMaxByteCount()
01943 - segDescFieldMaxLength
01944 - segFieldMaxLength
01945 + 5;
01946
01947 if (minEntrySize > pageSize) {
01948 minEntrySize = pageSize;
01949 }
01950
01951
01952
01953
01954
01955 maxEntrySize =
01956 indexTupleDesc.getMaxByteCount()
01957 - segDescFieldMaxLength;
01958
01959
01960
01961 uint maxEntrySizeForPage = pageSize / LbmMinEntryPerPage;
01962
01963 if (maxEntrySizeForPage < minEntrySize) {
01964 maxEntrySize = minEntrySize;
01965 } else {
01966 maxEntrySize = min(maxEntrySizeForPage, maxEntrySize);
01967 }
01968 assert(minEntrySize <= maxEntrySize);
01969 }
01970
01971 void LbmEntry::generateRIDs(
01972 TupleData const &inputTuple, vector<LcsRid> &ridValues)
01973 {
01974 uint tupleSize = inputTuple.size();
01975 LcsRid inputStartRID = *((LcsRid *)inputTuple[tupleSize - 3].pData);
01976
01977 if (isSingleton(inputTuple)) {
01978 ridValues.push_back(inputStartRID);
01979 } else {
01980 PBuffer segDesc = (PBuffer) inputTuple[tupleSize - 2].pData;
01981
01982
01983
01984 PBuffer seg =
01985 (PBuffer) inputTuple[tupleSize - 1].pData
01986 + inputTuple[tupleSize - 1].cbData;
01987 if (segDesc) {
01988
01989
01990
01991 PBuffer segDescEnd = segDesc + inputTuple[tupleSize - 2].cbData;
01992
01993 generateSegRIDs(segDesc, segDescEnd, seg, ridValues, inputStartRID);
01994 } else {
01995
01996
01997
01998 generateBitmapRIDs(
01999 seg, inputTuple[tupleSize - 1].cbData, ridValues,
02000 inputStartRID);
02001 }
02002 }
02003 }
02004
02005 void LbmEntry::generateSegRIDs(
02006 PBuffer segDesc, PBuffer segDescEnd, PBuffer seg,
02007 vector<LcsRid> &ridValues, LcsRid srid)
02008 {
02009 uint segBytes;
02010 uint zeroBytes;
02011 uint8_t bitmapByte;
02012 uint byteLength;
02013 uint i;
02014
02015 while (segDesc < segDescEnd) {
02016 readSegDescAndAdvance(segDesc, segBytes, zeroBytes);
02017
02018 for (i = 0; i < segBytes; i++) {
02019 seg --;
02020 bitmapByte = *(uint8_t *)seg;
02021 for (byteLength = 8; byteLength > 0; byteLength--) {
02022 if (bitmapByte % 2) {
02023 ridValues.push_back(srid);
02024 }
02025 bitmapByte = bitmapByte >> 1;
02026 srid ++;
02027 }
02028 }
02029
02030
02031
02032
02033 srid += zeroBytes * LbmOneByteSize;
02034 }
02035 }
02036
02037 void LbmEntry::generateBitmapRIDs(
02038 PBuffer seg, uint segBytes, vector<LcsRid> &ridValues, LcsRid srid)
02039 {
02040 uint byteLength;
02041 uint8_t bitmapByte;
02042
02043 for (uint i = 0; i < segBytes; i++) {
02044 seg --;
02045 bitmapByte = *(uint8_t *)seg;
02046 for (byteLength = 8; byteLength > 0; byteLength--) {
02047 if (bitmapByte % 2) {
02048 ridValues.push_back(srid);
02049 }
02050 bitmapByte = bitmapByte >> 1;
02051 srid ++;
02052 }
02053 }
02054 }
02055
02056 uint LbmEntry::getScratchBufferSize(uint bitmapColSize)
02057 {
02058
02059
02060
02061 return bitmapColSize + sizeof(LcsRid) + LbmZeroLengthExtended;
02062 }
02063
02064 uint LbmEntry::getMaxBitmapSize(uint bitmapColSize)
02065 {
02066
02067
02068
02069 uint descriptorBytesPerSegment = 1;
02070 uint maxNumSegments =
02071 (bitmapColSize / (LbmMaxSegSize + descriptorBytesPerSegment))
02072 + ((bitmapColSize % (LbmMaxSegSize + descriptorBytesPerSegment)) == 0
02073 ? 0
02074 : 1);
02075 return (bitmapColSize - maxNumSegments * descriptorBytesPerSegment);
02076 }
02077
02078 bool LbmEntry::containsRid(LcsRid rid)
02079 {
02080 LcsRid startRid = startRID;
02081
02082 if (isSingleton()) {
02083 return (startRid == rid);
02084 } else {
02085 PBuffer pSegDesc = pSegDescStart;
02086 PBuffer pSeg = pSegStart;
02087 if (pSegDesc) {
02088
02089 while (pSegDesc < pSegDescEnd) {
02090 uint nSegBytes;
02091 uint nZeroBytes;
02092
02093 readSegDescAndAdvance(pSegDesc, nSegBytes, nZeroBytes);
02094 int rc = segmentContainsRid(rid, startRid, pSeg, nSegBytes);
02095 if (rc == 0) {
02096 return true;
02097 } else if (rc < 0) {
02098 return false;
02099 } else {
02100
02101
02102 startRid += (nSegBytes + nZeroBytes) * LbmOneByteSize;
02103 pSeg -= nSegBytes;
02104
02105
02106 if (rid < startRid) {
02107 return false;
02108 }
02109 }
02110 }
02111 return false;
02112 } else {
02113
02114 int rc =
02115 segmentContainsRid(
02116 rid,
02117 startRid,
02118 pSegStart,
02119 pSegStart - pSegEnd);
02120 if (rc == 0) {
02121 return true;
02122 } else {
02123 return false;
02124 }
02125 }
02126 }
02127 }
02128
02129 int LbmEntry::segmentContainsRid(
02130 LcsRid rid,
02131 LcsRid startRid,
02132 PBuffer pSeg,
02133 uint nSegBytes)
02134 {
02135 if (rid >= startRid && rid < startRid + (nSegBytes * LbmOneByteSize)) {
02136
02137
02138 int byteNum = (opaqueToInt(rid - startRid) / LbmOneByteSize) + 1;
02139 uint8_t setRid = 1 << (opaqueToInt(rid) % LbmOneByteSize);
02140 if (pSeg[-byteNum] & setRid) {
02141 return 0;
02142 } else {
02143 return -1;
02144 }
02145 } else {
02146 return 1;
02147 }
02148 }
02149
02150 bool LbmEntry::inRange(LcsRid rid)
02151 {
02152 LcsRid endRID = startRID + getRowCount() - 1;
02153 return (rid >= startRID && rid <= endRID);
02154 }
02155
02156 void LbmEntry::validateEntrySize()
02157 {
02158 if (currentEntrySize <= scratchBufferSize) {
02159 return;
02160 }
02161
02162 TupleData keyData;
02163 keyData.resize(keyDesc.size());
02164 for (uint i = 0; i < keyData.size(); i++) {
02165 keyData[i] = entryTuple[i];
02166 }
02167
02168 std::ostringstream oss;
02169 TuplePrinter tuplePrinter;
02170 tuplePrinter.print(oss, keyDesc, keyData);
02171 throw FennelResource::instance().bitmapEntryTooLong(
02172 currentEntrySize,
02173 scratchBufferSize,
02174 oss.str());
02175 }
02176
02177 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmEntry.cpp#26 $");
02178
02179