00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterNodeWriter.h"
00024 #include "fennel/tuple/TupleAccessor.h"
00025 #include <boost/scoped_array.hpp>
00026
00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $");
00028
00029 LcsClusterNodeWriter::LcsClusterNodeWriter(
00030 BTreeDescriptor const &treeDescriptorInit,
00031 SegmentAccessor const &accessorInit,
00032 TupleDescriptor const &colTupleDescInit,
00033 SharedTraceTarget pTraceTargetInit,
00034 std::string nameInit) :
00035 LcsClusterAccessBase(treeDescriptorInit),
00036 TraceSource(pTraceTargetInit, nameInit)
00037 {
00038 scratchAccessor = accessorInit;
00039 bufferLock.accessSegment(scratchAccessor);
00040 bTreeWriter = SharedBTreeWriter(
00041 new BTreeWriter(treeDescriptorInit, scratchAccessor, true));
00042 colTupleDesc = colTupleDescInit;
00043 clusterDump =
00044 SharedLcsClusterDump(
00045 new LcsClusterDump(
00046 treeDescriptorInit,
00047 colTupleDesc,
00048 TRACE_FINE,
00049 pTraceTargetInit,
00050 nameInit));
00051 nClusterCols = 0;
00052 pHdr = 0;
00053 hdrSize = 0;
00054 pIndexBlock = 0;
00055 pBlock = 0;
00056 szBlock = 0;
00057 minSzLeft = 0;
00058 batchDirs.reset();
00059 pValBank.reset();
00060 oValBank.reset();
00061 batchOffset.reset();
00062 batchCount.reset();
00063 szLeft = 0;
00064 nBits.reset();
00065 nextWidthChange.reset();
00066 arraysAllocated = false;
00067 valBankStart.reset();
00068 bForceMode.reset();
00069 forceModeCount.reset();
00070 maxValueSize.reset();
00071 }
00072
00073 LcsClusterNodeWriter::~LcsClusterNodeWriter()
00074 {
00075 close();
00076 }
00077
00078 void LcsClusterNodeWriter::close()
00079 {
00080
00081 if (clusterLock.isLocked()) {
00082 clusterLock.flushPage(true);
00083 }
00084 clusterLock.unlock();
00085
00086 bTreeWriter.reset();
00087 batchDirs.reset();
00088 pValBank.reset();
00089 valBankStart.reset();
00090 forceModeCount.reset();
00091 bForceMode.reset();
00092 oValBank.reset();
00093 batchOffset.reset();
00094 batchCount.reset();
00095 nBits.reset();
00096 nextWidthChange.reset();
00097 maxValueSize.reset();
00098 attrAccessors.reset();
00099 }
00100
00101 bool LcsClusterNodeWriter::getLastClusterPageForWrite(
00102 PLcsClusterNode &pBlock, LcsRid &firstRid)
00103 {
00104
00105
00106
00107 if (bTreeWriter->searchLast() == false) {
00108 bTreeWriter->endSearch();
00109 return false;
00110 }
00111
00112 bTreeWriter->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00113 clusterPageId = readClusterPageId();
00114 clusterLock.lockExclusive(clusterPageId);
00115 pBlock = &(clusterLock.getNodeForWrite());
00116 firstRid = pBlock->firstRID;
00117
00118
00119
00120 bTreeWriter->endSearch();
00121
00122 if (isTracingLevel(TRACE_FINE)) {
00123 FENNEL_TRACE(
00124 TRACE_FINE,
00125 "Calling ClusterDump from getLastClusterPageForWrite");
00126 clusterDump->dump(opaqueToInt(clusterPageId), pBlock, szBlock);
00127 }
00128
00129 return true;
00130 }
00131
00132 PLcsClusterNode LcsClusterNodeWriter::allocateClusterPage(LcsRid firstRid)
00133 {
00134
00135
00136
00137 PageId prevPageId = NULL_PAGE_ID;
00138
00139 if (clusterLock.isLocked()) {
00140
00141 prevPageId = clusterLock.getPageId();
00142
00143
00144
00145
00146 clusterLock.flushPage(true);
00147 }
00148
00149 clusterPageId = clusterLock.allocatePage();
00150 if (prevPageId != NULL_PAGE_ID) {
00151 segmentAccessor.pSegment->setPageSuccessor(prevPageId, clusterPageId);
00152 }
00153 bTreeRid = firstRid;
00154 bTreeTupleData[0].pData = reinterpret_cast<uint8_t *> (&firstRid);
00155 bTreeTupleData[1].pData = reinterpret_cast<uint8_t *> (&clusterPageId);
00156 bTreeWriter->insertTupleData(bTreeTupleData, DUP_FAIL);
00157 return &(clusterLock.getNodeForWrite());
00158 }
00159
00160 void LcsClusterNodeWriter::init(
00161 uint nColumn, PBuffer iBlock, PBuffer *pB, uint szB)
00162 {
00163 nClusterCols = nColumn;
00164 pIndexBlock = iBlock;
00165 pBlock = pB;
00166 szBlock = szB;
00167 pHdr = (PLcsClusterNode) pIndexBlock;
00168
00169 hdrSize = getClusterSubHeaderSize(nClusterCols);
00170
00171
00172
00173
00174 setHdrOffsets(pHdr);
00175
00176 minSzLeft = nClusterCols * (LcsMaxLeftOver * sizeof(uint16_t) +
00177 sizeof(LcsBatchDir));
00178
00179 allocArrays();
00180 }
00181
00182 void LcsClusterNodeWriter::openNew(LcsRid startRID)
00183 {
00184 int i;
00185
00186
00187 pHdr->firstRID = startRID;
00188 pHdr->nColumn = nClusterCols;
00189 pHdr->nBatch = 0;
00190 pHdr->oBatch = hdrSize;
00191
00192 for (i = 0; i < nClusterCols; i++) {
00193 lastVal[i] = szBlock;
00194 firstVal[i] = (uint16_t) szBlock;
00195 nVal[i] = 0;
00196 delta[i] = 0;
00197 batchDirs[i].mode = LCS_COMPRESSED;
00198 batchDirs[i].nVal = 0;
00199 batchDirs[i].nRow = 0;
00200 batchDirs[i].oVal = 0;
00201 batchDirs[i].oLastValHighMark = lastVal[i];
00202 batchDirs[i].nValHighMark = nVal[i];
00203 batchOffset[i] = hdrSize;
00204
00205 nBits[i] = 0;
00206 nextWidthChange[i] = 1;
00207 batchCount[i] = 0;
00208 }
00209
00210
00211
00212
00213 szLeft = szBlock - hdrSize -
00214 (2 * sizeof(LcsBatchDir)) * nClusterCols;
00215 szLeft = std::max(szLeft, 0);
00216 assert(szLeft >= 0);
00217 }
00218
00219 bool LcsClusterNodeWriter::openAppend(
00220 uint *nValOffsets, uint16_t *lastValOffsets, RecordNum &nrows)
00221 {
00222 int i;
00223
00224
00225 szLeft = lastVal[nClusterCols - 1] - pHdr->oBatch -
00226 (pHdr->nBatch + 2* nClusterCols) * sizeof(LcsBatchDir);
00227 szLeft = std::max(szLeft, 0);
00228 assert(szLeft >= 0);
00229
00230
00231
00232 nrows = moveFromIndexToTemp();
00233
00234 for (i = 0; i < nClusterCols; i++) {
00235 nValOffsets[i] = nVal[i];
00236 lastValOffsets[i] = lastVal[i];
00237 memset(&batchDirs[i], 0, sizeof(LcsBatchDir));
00238
00239 batchDirs[i].oLastValHighMark = lastVal[i];
00240 batchDirs[i].nValHighMark = nVal[i];
00241 batchDirs[i].mode = LCS_COMPRESSED;
00242
00243
00244 nBits[i] = 0;
00245 nextWidthChange[i] = 1;
00246
00247 oValBank[i] = 0;
00248 batchCount[i] = pHdr->nBatch / nClusterCols;
00249 }
00250
00251 return (szLeft == 0);
00252 }
00253
00254 void LcsClusterNodeWriter::describeLastBatch(
00255 uint column, uint &dRow, uint &recSize)
00256 {
00257 PLcsBatchDir pBatch;
00258
00259 pBatch = (PLcsBatchDir) (pBlock[column] + batchOffset[column]);
00260 dRow = pBatch[batchCount[column] -1].nRow % 8;
00261 recSize = pBatch[batchCount[column] -1].recSize;
00262 }
00263
00264 uint16_t LcsClusterNodeWriter::getNextVal(uint column, uint16_t thisVal)
00265 {
00266 if (thisVal && thisVal != szBlock) {
00267 return
00268 (uint16_t) (thisVal +
00269 attrAccessors[column].getStoredByteCount(
00270 pBlock[column] + thisVal));
00271 } else {
00272 return 0;
00273 }
00274 }
00275
00276 void LcsClusterNodeWriter::rollBackLastBatch(uint column, PBuffer pBuf)
00277 {
00278 uint i;
00279 PLcsBatchDir pBatch;
00280 uint16_t *pValOffsets;
00281
00282 uint8_t *pBit;
00283 WidthVec w;
00284 PtrVec p;
00285 uint iV;
00286
00287 uint16_t rows[LcsMaxRollBack];
00288 int origSzLeft;
00289 uint len;
00290
00291
00292 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
00293 batchDirs[column] = pBatch[batchCount[column] -1];
00294
00295
00296 origSzLeft = lastVal[column] - batchOffset[column] -
00297 (batchCount[column]+2)*sizeof(LcsBatchDir);
00298
00299 if ((batchDirs[column].nRow > 8) || (batchDirs[column].nRow % 8) == 0) {
00300 return;
00301 }
00302
00303 if (batchDirs[column].mode == LCS_COMPRESSED) {
00304
00305 iV = bitVecWidth(calcWidth(batchDirs[column].nVal), w);
00306
00307
00308 pBit = pBlock[column] + batchDirs[column].oVal +
00309 batchDirs[column].nVal * sizeof(uint16_t);
00310
00311
00312 bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit);
00313
00314
00315 readBitVecs(rows, iV, w, p, 0, batchDirs[column].nRow);
00316
00317
00318 pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);
00319
00320
00321 for (i = 0; i < batchDirs[column].nRow;
00322 i++, pBuf += batchDirs[column].recSize)
00323 {
00324 len =
00325 attrAccessors[column].getStoredByteCount(
00326 pBlock[column] + pValOffsets[rows[i]]);
00327 memcpy(pBuf, pBlock[column] + pValOffsets[rows[i]], len);
00328 }
00329
00330 } else if (batchDirs[column].mode == LCS_FIXED) {
00331
00332
00333 memcpy(
00334 pBuf,
00335 pBlock[column] + batchDirs[column].oVal,
00336 batchDirs[column].nRow * batchDirs[column].recSize);
00337 } else {
00338
00339
00340 pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);
00341
00342
00343 for (i = 0; i < batchDirs[column].nRow;
00344 i++, pBuf += batchDirs[column].recSize)
00345 {
00346 len =
00347 attrAccessors[column].getStoredByteCount(
00348 pBlock[column] + pValOffsets[i]);
00349 memcpy(pBuf, pBlock[column] + pValOffsets[i], len);
00350 }
00351 }
00352
00353
00354 batchCount[column]--;
00355
00356 batchOffset[column] = batchDirs[column].oVal;
00357
00358
00359 memmove(
00360 pBlock[column] + batchOffset[column],
00361 pBatch,
00362 batchCount[column] * sizeof(LcsBatchDir));
00363
00364
00365
00366
00367
00368 int newSz;
00369 newSz = lastVal[column] - batchOffset[column] -
00370 (batchCount[column] + 2) * sizeof(LcsBatchDir);
00371 szLeft += (newSz - origSzLeft);
00372 szLeft = std::max(szLeft, 0);
00373 assert(szLeft >= 0);
00374
00375
00376 nBits[column] = 0;
00377 nextWidthChange[column] = 1;
00378
00379
00380 batchDirs[column].mode = LCS_COMPRESSED;
00381 batchDirs[column].nVal = 0;
00382 batchDirs[column].nRow = 0;
00383 batchDirs[column].oVal = 0;
00384 batchDirs[column].recSize = 0;
00385 }
00386
00387
00388
00389 bool LcsClusterNodeWriter::addValue(uint column, bool bFirstTimeInBatch)
00390 {
00391
00392 szLeft -= sizeof(uint16_t);
00393
00394
00395 if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) {
00396
00397 szLeft += sizeof(uint16_t);
00398 assert(szLeft >= 0);
00399 return false;
00400 }
00401
00402 if (bFirstTimeInBatch) {
00403
00404
00405 batchDirs[column].nVal++;
00406
00407
00408
00409 if (batchDirs[column].nVal == nextWidthChange[column]) {
00410
00411
00412 nBits[column] = calcWidth(batchDirs[column].nVal);
00413 nextWidthChange[column] = (1 << nBits[column]) + 1;
00414 }
00415 }
00416
00417 return true;
00418 }
00419
00420
00421
00422 bool LcsClusterNodeWriter::addValue(uint column, PBuffer pVal, uint16_t *oVal)
00423 {
00424 uint16_t lastValOffset;
00425 int oldSzLeft = szLeft;
00426 uint szVal = attrAccessors[column].getStoredByteCount(pVal);
00427
00428
00429
00430
00431
00432 if (bForceMode[column] == fixed) {
00433 if (szVal > maxValueSize[column]) {
00434 szLeft -= batchDirs[column].nVal *
00435 (szVal - maxValueSize[column]);
00436 maxValueSize[column] = szVal;
00437 }
00438 }
00439
00440
00441
00442
00443
00444
00445 if (bForceMode[column] == fixed) {
00446 szLeft -= maxValueSize[column];
00447 } else {
00448
00449
00450
00451
00452 szLeft -= (sizeof(uint16_t) + szVal) ;
00453 }
00454
00455
00456 if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) {
00457
00458 szLeft = oldSzLeft;
00459 assert(szLeft >= 0);
00460 return false;
00461 }
00462
00463
00464
00465 lastValOffset = lastVal[column] - szVal;
00466
00467
00468 batchDirs[column].nVal++;
00469
00470
00471
00472 if (batchDirs[column].nVal == nextWidthChange[column]) {
00473
00474
00475 nBits[column] = calcWidth(batchDirs[column].nVal);
00476 nextWidthChange[column] = (1 << nBits[column]) + 1;
00477 }
00478
00479 lastVal[column] = lastValOffset;
00480
00481
00482
00483
00484 if (fixed == bForceMode[column]) {
00485 memcpy(pValBank[column] + lastValOffset, pVal, szVal);
00486 } else {
00487 memcpy(pBlock[column] + lastValOffset, pVal, szVal);
00488 }
00489
00490
00491 *oVal = lastValOffset;
00492
00493 nVal[column]++;
00494
00495 return true;
00496 }
00497
00498 void LcsClusterNodeWriter::undoValue(
00499 uint column, PBuffer pVal, bool bFirstInBatch)
00500 {
00501
00502
00503
00504
00505 uint szVal =
00506 (pVal) ? attrAccessors[column].getStoredByteCount(pVal) : 0;
00507
00508
00509 szLeft += (sizeof(uint16_t) + szVal) ;
00510 assert(szLeft >= 0);
00511
00512
00513 if (bFirstInBatch) {
00514
00515 batchDirs[column].nVal--;
00516
00517
00518 if (batchDirs[column].nVal == 0) {
00519 nextWidthChange[column] = 1;
00520 } else {
00521
00522
00523 nBits[column] = calcWidth(batchDirs[column].nVal);
00524 nextWidthChange[column] = (1 << nBits[column]) + 1;
00525 }
00526 }
00527
00528 if (pVal) {
00529
00530 lastVal[column] += szVal;
00531 nVal[column]--;
00532 }
00533 }
00534
00535 void LcsClusterNodeWriter::putCompressedBatch(
00536 uint column, PBuffer pRows, PBuffer pBuf)
00537 {
00538 uint i, j, b;
00539 uint iRow;
00540 uint nByte;
00541 uint8_t *pBit;
00542 uint16_t *pOffs;
00543 PLcsBatchDir pBatch;
00544
00545 WidthVec w;
00546 PtrVec p;
00547 uint iV;
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560 if (batchDirs[column].nRow > 8) {
00561 uint len;
00562 pOffs = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);
00563 for (i = round8Boundary((uint32_t) batchDirs[column].nRow);
00564 i < batchDirs[column].nRow; i++, pBuf += batchDirs[column].recSize)
00565 {
00566 iRow = ((uint16_t *) pRows)[i];
00567 len =
00568 attrAccessors[column].getStoredByteCount(
00569 pBlock[column] + pOffs[iRow]);
00570 memcpy(pBuf, pBlock[column] + pOffs[iRow], len);
00571 }
00572 batchDirs[column].nRow =
00573 round8Boundary((uint32_t) batchDirs[column].nRow);
00574 }
00575
00576
00577 iV = bitVecWidth(nBits[column], w);
00578
00579
00580 pBit = pBlock[column] + batchDirs[column].oVal +
00581 batchDirs[column].nVal*sizeof(uint16_t);
00582
00583
00584 nByte = bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit);
00585 memset(pBit, 0, nByte);
00586
00587 for (j = 0, b = 0; j < iV ; j++) {
00588 switch (w[j]) {
00589 case 16:
00590 memcpy(p[j], pRows, batchDirs[column].nRow * sizeof(uint16_t));
00591 break;
00592
00593 case 8:
00594 for (i = 0; i < batchDirs[column].nRow ; i++) {
00595 (p[j])[i] = (uint8_t)((uint16_t *) pRows)[i];
00596 }
00597 break;
00598
00599 case 4:
00600 for (i = 0; i < batchDirs[column].nRow ; i++) {
00601 setBits(
00602 p[j] + i / 2 ,
00603 4,
00604 (i % 2) * 4,
00605 (uint16_t)(((uint16_t *) pRows)[i] >> b));
00606 }
00607 break;
00608
00609 case 2:
00610 for (i = 0; i < batchDirs[column].nRow ; i++) {
00611 setBits(
00612 p[j] + i / 4 ,
00613 2,
00614 (i % 4) * 2,
00615 (uint16_t)(((uint16_t *) pRows)[i] >> b));
00616 }
00617 break;
00618
00619 case 1:
00620 for (i = 0; i < batchDirs[column].nRow ; i++) {
00621 setBits(
00622 p[j] + i / 8 ,
00623 1,
00624 (i % 8),
00625 (uint16_t)(((uint16_t *)pRows)[i] >> b));
00626 }
00627 break;
00628
00629 default:
00630 ;
00631 }
00632 b += w[j];
00633 }
00634
00635
00636 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
00637 pBatch[batchCount[column]] = batchDirs[column];
00638 batchCount[column]++;
00639
00640
00641 batchDirs[column].mode = LCS_COMPRESSED;
00642 batchDirs[column].oLastValHighMark = lastVal[column];
00643 batchDirs[column].nValHighMark = nVal[column];
00644 batchDirs[column].nVal = 0;
00645 batchDirs[column].oVal = batchOffset[column];
00646 batchDirs[column].nRow = 0;
00647
00648
00649 nBits[column] = 0;
00650 nextWidthChange[column] = 1 ;
00651 }
00652
00653 void LcsClusterNodeWriter::putFixedVarBatch(
00654 uint column, uint16_t *pRows, PBuffer pBuf)
00655 {
00656 uint i;
00657 uint batchRows;
00658 PBuffer pVal;
00659 PLcsBatchDir pBatch;
00660 PBuffer src;
00661 uint batchRecSize;
00662 uint16_t localLastVal;
00663 uint16_t localoValBank;
00664 PBuffer localpValBank, localpBlock;
00665
00666
00667
00668
00669
00670
00671
00672
00673 batchRows = (batchDirs[column].nRow > 8)
00674 ? batchDirs[column].nRow & 0xfffffff8 : batchDirs[column].nRow;
00675
00676
00677 pVal = pBlock[column] + batchDirs[column].oVal;
00678 if (batchDirs[column].mode == LCS_VARIABLE) {
00679
00680
00681
00682 memcpy(pVal, pRows, batchRows * sizeof(uint16_t));
00683 } else {
00684
00685 assert(batchDirs[column].mode == LCS_FIXED);
00686
00687 batchRecSize = batchDirs[column].recSize;
00688 localLastVal = lastVal[column];
00689 localpValBank = pValBank[column] + valBankStart[column];
00690 localoValBank = oValBank[column];
00691 localpBlock = pBlock[column];
00692
00693
00694
00695 for (i = 0; i < batchRows; i++) {
00696
00697
00698 src = valueSource(
00699 localLastVal, localpValBank, localoValBank,
00700 localpBlock, pRows[i]);
00701 uint len = attrAccessors[column].getStoredByteCount(src);
00702 memcpy(pVal, src, len);
00703 pVal += batchRecSize;
00704 }
00705 }
00706
00707
00708
00709 if (bForceMode[column] != none) {
00710 if (forceModeCount[column] > 20) {
00711 bForceMode[column] = none;
00712 forceModeCount[column] = 0;
00713 }
00714 }
00715
00716 batchRecSize = batchDirs[column].recSize;
00717 localLastVal = lastVal[column];
00718 localpValBank = pValBank[column] + valBankStart[column];
00719 localoValBank = oValBank[column];
00720 localpBlock = pBlock[column];
00721
00722
00723 pVal = pBuf;
00724 for (i = batchRows; i < batchDirs[column].nRow; i++) {
00725
00726
00727
00728 src = valueSource(
00729 localLastVal, localpValBank, localoValBank,
00730 localpBlock, pRows[i]);
00731 uint len = attrAccessors[column].getStoredByteCount(src);
00732 memcpy(pVal, src, len);
00733 pVal += batchRecSize;
00734 }
00735
00736 if (pValBank[column]) {
00737 oValBank[column] = 0;
00738 }
00739
00740
00741 batchDirs[column].nRow = batchRows;
00742 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
00743 pBatch[batchCount[column]] = batchDirs[column];
00744
00745
00746 batchCount[column]++;
00747
00748
00749
00750 switch (bForceMode[column]) {
00751 case none:
00752 batchDirs[column].mode = LCS_COMPRESSED;
00753 break;
00754 case fixed:
00755 batchDirs[column].mode = LCS_FIXED;
00756 break;
00757 case variable:
00758 batchDirs[column].mode = LCS_VARIABLE;
00759 break;
00760 default:
00761 assert(false);
00762 }
00763 batchDirs[column].oLastValHighMark = lastVal[column];
00764 batchDirs[column].nValHighMark = nVal[column];
00765 batchDirs[column].nVal = 0;
00766 batchDirs[column].oVal = batchOffset[column];
00767 batchDirs[column].nRow = 0;
00768
00769
00770 nBits[column] = 0;
00771 nextWidthChange[column] = 1 ;
00772
00773 maxValueSize[column] = 0;
00774 }
00775
00776 void LcsClusterNodeWriter::pickCompressionMode(
00777 uint column, uint recSize, uint nRow, uint16_t **pValOffset,
00778 LcsBatchMode &compressionMode)
00779 {
00780 uint nByte;
00781 PLcsBatchDir pBatch;
00782 WidthVec w;
00783 uint iV;
00784
00785
00786 uint szCompressed;
00787 uint szVariable;
00788 uint szFixed;
00789 uint szNonCompressed;
00790 uint deltaVal;
00791 uint batchRows;
00792
00793
00794
00795 batchDirs[column].nRow = nRow;
00796 batchDirs[column].recSize = recSize;
00797
00798
00799
00800
00801
00802
00803
00804 batchRows = (nRow > 8) ? nRow & 0xfffffff8 : nRow;
00805
00806 szCompressed = batchDirs[column].nVal*sizeof(uint16_t) +
00807 (nBits[column]*nRow + LcsMaxSzLeftError * 8) / 8
00808 + (batchDirs[column].oLastValHighMark - lastVal[column]);
00809
00810
00811 szVariable = batchDirs[column].nRow * sizeof(uint16_t)
00812 + (batchDirs[column].oLastValHighMark - lastVal[column]);
00813
00814
00815
00816 uint leftOverSize;
00817 leftOverSize = LcsMaxLeftOver * sizeof(uint16_t) +
00818 (3 * LcsMaxLeftOver + LcsMaxSzLeftError * 8) / 8
00819 + LcsMaxLeftOver * recSize;
00820 szFixed = nRow * recSize + leftOverSize;
00821
00822 szNonCompressed = std::min(szFixed, szVariable);
00823
00824
00825
00826
00827
00828
00829
00830
00831 if ((fixed == bForceMode[column] || variable == bForceMode[column])
00832 || szCompressed > szNonCompressed) {
00833
00834 *pValOffset = NULL;
00835 batchDirs[column].nVal = 0;
00836
00837 forceModeCount[column]++;
00838
00839
00840 if (fixed == bForceMode[column] || szNonCompressed == szFixed) {
00841
00842
00843 batchDirs[column].mode = LCS_FIXED;
00844
00845
00846 if (bForceMode[column] != fixed) {
00847
00848
00849
00850
00851
00852 deltaVal = batchDirs[column].oLastValHighMark - lastVal[column];
00853
00854
00855
00856 if (deltaVal) {
00857 memcpy(
00858 pValBank[column],
00859 pBlock[column] + lastVal[column],
00860 deltaVal);
00861 }
00862
00863 valBankStart[column] = 0;
00864
00865
00866
00867 bForceMode[column] = fixed;
00868
00869
00870
00871 assert(szVariable >= szFixed);
00872 szLeft += (szVariable - szFixed);
00873 assert(szLeft >= 0);
00874 } else {
00875 valBankStart[column] = lastVal[column];
00876 }
00877
00878
00879
00880
00881
00882 oValBank[column] = lastVal[column];
00883 lastVal[column] = batchDirs[column].oLastValHighMark;
00884 nVal[column] = batchDirs[column].nValHighMark;
00885
00886
00887 nByte = batchRows * batchDirs[column].recSize;
00888
00889 } else {
00890
00891
00892 batchDirs[column].mode = LCS_VARIABLE;
00893
00894
00895 nByte = batchRows*sizeof(uint16_t);
00896
00897
00898
00899 bForceMode[column] = variable;
00900 }
00901 } else {
00902
00903
00904
00905 *pValOffset = (uint16_t *)(pBlock[column] + batchOffset[column]);
00906
00907
00908 iV = bitVecWidth(nBits[column], w);
00909
00910
00911
00912 nByte = sizeofBitVec(batchRows, iV, w) +
00913 batchDirs[column].nVal * sizeof(uint16_t);
00914
00915
00916 assert(szVariable >= szCompressed);
00917 szLeft += (szVariable - szCompressed);
00918 assert(szLeft >= 0);
00919 }
00920
00921 compressionMode = batchDirs[column].mode;
00922
00923
00924
00925 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column] + nByte);
00926 memmove(
00927 pBatch,
00928 pBlock[column] + batchOffset[column],
00929 batchCount[column] * sizeof(LcsBatchDir));
00930
00931
00932
00933 szLeft -= sizeof(LcsBatchDir);
00934 szLeft = std::max(szLeft, 0);
00935 assert(szLeft >= 0);
00936
00937
00938
00939 batchDirs[column].oVal = batchOffset[column];
00940
00941
00942
00943
00944 batchOffset[column] = (batchOffset[column] + nByte);
00945 }
00946
00947
00948
00949
00950
00951
00952 void myCopy(void* pDest, void* pSrc, uint sz)
00953 {
00954 if (pDest == pSrc) {
00955 return;
00956 } else {
00957 memcpy(pDest, pSrc, sz);
00958 }
00959 }
00960
00961 RecordNum LcsClusterNodeWriter::moveFromIndexToTemp()
00962 {
00963 PLcsBatchDir pBatch;
00964 boost::scoped_array<uint16_t> batchDirOffset;
00965 uint16_t loc;
00966 uint column;
00967 uint batchCount = pHdr->nBatch / nClusterCols;
00968 uint b;
00969
00970 batchDirOffset.reset(new uint16_t[pHdr->nBatch]);
00971
00972
00973
00974
00975
00976 for (column = 0; column < nClusterCols; column++) {
00977 uint sz = firstVal[column] - lastVal[column];
00978 loc = (uint16_t) (szBlock - sz);
00979 myCopy(pBlock[column] + loc, pIndexBlock + lastVal[column], sz);
00980
00981
00982 lastVal[column] = loc;
00983 firstVal[column] = (uint16_t) szBlock;
00984 }
00985
00986
00987
00988 pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch);
00989 for (column = 0; column < nClusterCols; column++) {
00990 uint i;
00991 loc = hdrSize;
00992
00993
00994 for (b = column, i = 0; i < batchCount; i++, b = b + nClusterCols) {
00995 uint16_t batchStart = loc;
00996
00997 if (pBatch[b].mode == LCS_COMPRESSED) {
00998 uint8_t *pBit;
00999 WidthVec w;
01000 PtrVec p;
01001 uint iV;
01002 uint sizeOffsets, nBytes;
01003
01004
01005 sizeOffsets = pBatch[b].nVal * sizeof(uint16_t);
01006 myCopy(
01007 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
01008 sizeOffsets);
01009
01010
01011 loc = (uint16_t) (loc + sizeOffsets);
01012
01013
01014 iV = bitVecWidth(calcWidth(pBatch[b].nVal), w);
01015
01016
01017 pBit = pIndexBlock + pBatch[b].oVal + sizeOffsets;
01018
01019
01020 nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit);
01021
01022 myCopy(pBlock[column] + loc, pBit, nBytes);
01023
01024
01025 loc = (uint16_t) (loc + nBytes);
01026 } else if (pBatch[b].mode == LCS_VARIABLE) {
01027 uint sizeOffsets;
01028
01029 sizeOffsets = pBatch[b].nRow * sizeof(uint16_t);
01030
01031
01032 myCopy(
01033 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
01034 sizeOffsets);
01035
01036
01037 loc = (uint16_t) (loc + sizeOffsets);
01038 } else {
01039
01040 uint sizeFixed;
01041
01042 sizeFixed = pBatch[b].nRow * pBatch[b].recSize;
01043
01044 myCopy(
01045 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
01046 sizeFixed);
01047
01048
01049 loc = (uint16_t) (loc + sizeFixed);
01050 }
01051
01052
01053 batchDirOffset[b] = batchStart;
01054 }
01055
01056
01057
01058 uint16_t dirLoc;
01059 b = column;
01060 dirLoc = loc;
01061 batchOffset[column] = dirLoc;
01062
01063
01064 for (i = 0; i < batchCount; i++) {
01065 PLcsBatchDir pTempBatch = (PLcsBatchDir)(pBlock[column] + dirLoc);
01066 myCopy(pTempBatch, &pBatch[b], sizeof(LcsBatchDir));
01067
01068 pTempBatch->oVal = batchDirOffset[b];
01069
01070 b = b + nClusterCols;
01071 dirLoc += sizeof(LcsBatchDir);
01072 }
01073 }
01074
01075
01076 pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch);
01077 RecordNum nrows = 0;
01078 for (b = 0; b < pHdr->nBatch; b = b + nClusterCols) {
01079 nrows += pBatch[b].nRow;
01080 }
01081
01082 batchDirOffset.reset();
01083 return nrows;
01084 }
01085
01086 void LcsClusterNodeWriter::moveFromTempToIndex()
01087 {
01088 PLcsBatchDir pBatch;
01089 uint sz, numBatches = batchCount[0];
01090 uint16_t offset, loc;
01091 uint column, b;
01092
01093
01094
01095
01096 for (offset = (uint16_t) szBlock, column = 0; column < nClusterCols;
01097 column++)
01098 {
01099 sz = szBlock - lastVal[column];
01100 myCopy(
01101 pIndexBlock + (offset - sz), pBlock[column] + lastVal[column], sz);
01102
01103
01104 delta[column] = (uint16_t)(szBlock - offset);
01105
01106
01107
01108 firstVal[column] = offset;
01109 offset = (uint16_t) (offset - sz);
01110 lastVal[column] = offset;
01111 }
01112
01113
01114
01115 for (loc = hdrSize, b = 0; b < numBatches; b++) {
01116 for (column = 0; column < nClusterCols; column++) {
01117 uint16_t batchStart = loc;
01118
01119 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
01120
01121 if (pBatch[b].mode == LCS_COMPRESSED) {
01122 uint8_t *pBit;
01123 WidthVec w;
01124 PtrVec p;
01125 uint iV;
01126 uint sizeOffsets, nBytes;
01127
01128 sizeOffsets = pBatch[b].nVal * sizeof(uint16_t);
01129
01130
01131 myCopy(
01132 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
01133 sizeOffsets);
01134
01135
01136 loc = (uint16_t) (loc + sizeOffsets);
01137
01138
01139 iV = bitVecWidth(calcWidth(pBatch[b].nVal), w);
01140
01141
01142 pBit = pBlock[column] + pBatch[b].oVal + sizeOffsets;
01143
01144
01145 nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit);
01146
01147 myCopy(pIndexBlock + loc, pBit, nBytes);
01148
01149
01150 loc = (uint16_t)(loc + nBytes);
01151
01152 } else if (pBatch[b].mode == LCS_VARIABLE) {
01153 uint sizeOffsets;
01154
01155 sizeOffsets = pBatch[b].nRow * sizeof(uint16_t);
01156
01157
01158 myCopy(
01159 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
01160 sizeOffsets);
01161
01162
01163 loc = (uint16_t) (loc + sizeOffsets);
01164 } else {
01165
01166 uint sizeFixed;
01167
01168 sizeFixed = pBatch[b].nRow * pBatch[b].recSize;
01169
01170 myCopy(
01171 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
01172 sizeFixed);
01173
01174
01175 loc = (uint16_t) (loc + sizeFixed);
01176 }
01177
01178
01179 pBatch[b].oVal = batchStart;
01180 }
01181 }
01182
01183
01184 pHdr->nBatch = nClusterCols * numBatches;
01185
01186
01187 pHdr->oBatch = loc;
01188
01189
01190 for (b = 0; b < numBatches; b++) {
01191 for (column = 0; column < nClusterCols; column++) {
01192 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
01193 myCopy(pIndexBlock + loc, &pBatch[b], sizeof(LcsBatchDir));
01194 loc += sizeof(LcsBatchDir);
01195 }
01196 }
01197
01198 if (isTracingLevel(TRACE_FINE)) {
01199 FENNEL_TRACE(
01200 TRACE_FINE, "Calling ClusterDump from moveFromTempToIndex");
01201 clusterDump->dump(opaqueToInt(clusterPageId), pHdr, szBlock);
01202 }
01203 }
01204
01205 void LcsClusterNodeWriter::allocArrays()
01206 {
01207
01208 if (!arraysAllocated) {
01209 arraysAllocated = true;
01210
01211 batchDirs.reset(new LcsBatchDir[nClusterCols]);
01212
01213 pValBank.reset(new PBuffer[nClusterCols]);
01214
01215
01216
01217 attrAccessors.reset(new UnalignedAttributeAccessor[nClusterCols]);
01218
01219 for (uint col = 0; col < nClusterCols; col++) {
01220 bufferLock.allocatePage();
01221 pValBank[col] = bufferLock.getPage().getWritableData();
01222
01223
01224
01225
01226 bufferLock.unlock();
01227
01228 attrAccessors[col].compute(colTupleDesc[col]);
01229 }
01230
01231 valBankStart.reset(new uint16_t[nClusterCols]);
01232
01233 forceModeCount.reset(new uint[nClusterCols]);
01234
01235 bForceMode.reset(new ForceMode[nClusterCols]);
01236
01237 oValBank.reset(new uint16_t[nClusterCols]);
01238
01239 batchOffset.reset(new uint16_t[nClusterCols]);
01240
01241 batchCount.reset(new uint[nClusterCols]);
01242
01243 nBits.reset(new uint[nClusterCols]);
01244
01245 nextWidthChange.reset(new uint[nClusterCols]);
01246
01247 maxValueSize.reset(new uint[nClusterCols]);
01248 }
01249
01250 memset(valBankStart.get(), 0, nClusterCols * sizeof(uint16_t));
01251 memset(forceModeCount.get(), 0, nClusterCols * sizeof(uint));
01252 memset(bForceMode.get(), 0, nClusterCols * sizeof(ForceMode));
01253 memset(oValBank.get(), 0, nClusterCols * sizeof(uint16_t));
01254 memset(batchOffset.get(), 0, nClusterCols * sizeof(uint16_t));
01255 memset(batchCount.get(), 0, nClusterCols * sizeof(uint));
01256 memset(nBits.get(), 0, nClusterCols * sizeof(uint));
01257 memset(nextWidthChange.get(), 0, nClusterCols * sizeof(uint));
01258 memset(maxValueSize.get(), 0, nClusterCols * sizeof(uint));
01259 }
01260
01261
01262 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $");
01263
01264