LcsClusterNodeWriter.cpp

/*
// $Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $
// Fennel is a library of data storage and processing components.
// Copyright (C) 2005-2009 LucidEra, Inc.
// Copyright (C) 2005-2009 The Eigenbase Project
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version approved by The Eigenbase Project.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "fennel/common/CommonPreamble.h"
#include "fennel/lucidera/colstore/LcsClusterNodeWriter.h"
#include "fennel/tuple/TupleAccessor.h"
#include <boost/scoped_array.hpp>

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $");

LcsClusterNodeWriter::LcsClusterNodeWriter(
    BTreeDescriptor const &treeDescriptorInit,
    SegmentAccessor const &accessorInit,
    TupleDescriptor const &colTupleDescInit,
    SharedTraceTarget pTraceTargetInit,
    std::string nameInit) :
        LcsClusterAccessBase(treeDescriptorInit),
        TraceSource(pTraceTargetInit, nameInit)
{
    scratchAccessor = accessorInit;
    bufferLock.accessSegment(scratchAccessor);
    bTreeWriter = SharedBTreeWriter(
        new BTreeWriter(treeDescriptorInit, scratchAccessor, true));
    colTupleDesc = colTupleDescInit;
    clusterDump =
        SharedLcsClusterDump(
            new LcsClusterDump(
                treeDescriptorInit,
                colTupleDesc,
                TRACE_FINE,
                pTraceTargetInit,
                nameInit));
    nClusterCols = 0;
    pHdr = 0;
    hdrSize = 0;
    pIndexBlock = 0;
    pBlock = 0;
    szBlock = 0;
    minSzLeft = 0;
    batchDirs.reset();
    pValBank.reset();
    oValBank.reset();
    batchOffset.reset();
    batchCount.reset();
    szLeft = 0;
    nBits.reset();
    nextWidthChange.reset();
    arraysAllocated = false;
    valBankStart.reset();
    bForceMode.reset();
    forceModeCount.reset();
    maxValueSize.reset();
}

LcsClusterNodeWriter::~LcsClusterNodeWriter()
{
    close();
}

void LcsClusterNodeWriter::close()
{
    // flush and unlock last page written
    if (clusterLock.isLocked()) {
        clusterLock.flushPage(true);
    }
    clusterLock.unlock();

    bTreeWriter.reset();
    batchDirs.reset();
    pValBank.reset();
    valBankStart.reset();
    forceModeCount.reset();
    bForceMode.reset();
    oValBank.reset();
    batchOffset.reset();
    batchCount.reset();
    nBits.reset();
    nextWidthChange.reset();
    maxValueSize.reset();
    attrAccessors.reset();
}

bool LcsClusterNodeWriter::getLastClusterPageForWrite(
    PLcsClusterNode &pBlock, LcsRid &firstRid)
{
    // get the last key in the btree (if it exists) and read the cluster
    // page based on the pageid stored in that btree record

    if (bTreeWriter->searchLast() == false) {
        bTreeWriter->endSearch();
        return false;
    }

    bTreeWriter->getTupleAccessorForRead().unmarshal(bTreeTupleData);
    clusterPageId = readClusterPageId();
    clusterLock.lockExclusive(clusterPageId);
    pBlock = &(clusterLock.getNodeForWrite());
    firstRid = pBlock->firstRID;

    // End the search so the BTreeWriter doesn't think it's positioned within
    // the btree.  We'll position properly on the first monotonic insert.
    bTreeWriter->endSearch();

    if (isTracingLevel(TRACE_FINE)) {
        FENNEL_TRACE(
            TRACE_FINE,
            "Calling ClusterDump from getLastClusterPageForWrite");
        clusterDump->dump(opaqueToInt(clusterPageId), pBlock, szBlock);
    }

    return true;
}

PLcsClusterNode LcsClusterNodeWriter::allocateClusterPage(LcsRid firstRid)
{
    // allocate a new cluster page and insert the corresponding rid, pageid
    // record into the btree

    PageId prevPageId = NULL_PAGE_ID;

    if (clusterLock.isLocked()) {
        // Remember the predecessor so that we can chain it below.
        prevPageId = clusterLock.getPageId();

        // Kick off an asynchronous write on the page we've just finished
        // so that when it comes time to checkpoint or victimize,
        // maybe it will be on disk already.
        clusterLock.flushPage(true);
    }

    clusterPageId = clusterLock.allocatePage();
    if (prevPageId != NULL_PAGE_ID) {
        segmentAccessor.pSegment->setPageSuccessor(prevPageId, clusterPageId);
    }
    bTreeRid = firstRid;
    bTreeTupleData[0].pData = reinterpret_cast<uint8_t *> (&firstRid);
    bTreeTupleData[1].pData = reinterpret_cast<uint8_t *> (&clusterPageId);
    bTreeWriter->insertTupleData(bTreeTupleData, DUP_FAIL);
    return &(clusterLock.getNodeForWrite());
}

void LcsClusterNodeWriter::init(
    uint nColumn, PBuffer iBlock, PBuffer *pB, uint szB)
{
    nClusterCols = nColumn;
    pIndexBlock = iBlock;
    pBlock = pB;
    szBlock = szB;
    pHdr = (PLcsClusterNode) pIndexBlock;

    hdrSize = getClusterSubHeaderSize(nClusterCols);

    // initialize lastVal, firstVal, and nVal fields in the header
    // to point to the appropriate positions in the indexBlock

    setHdrOffsets(pHdr);

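    // Reserve, per column, enough space for a worst-case "left-over" batch:
    // LcsMaxLeftOver value offsets plus one extra batch directory entry.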
    minSzLeft = nClusterCols * (LcsMaxLeftOver * sizeof(uint16_t) +
                     sizeof(LcsBatchDir));

    allocArrays();
}

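// Note on page layout (as maintained by the code below): within each column's
// block, batches and their directory grow upward from hdrSize via
// batchOffset[column], while values grow downward from the end of the block
// via lastVal[column]; szLeft is an upper bound on the bytes still free
// between the two regions.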
void LcsClusterNodeWriter::openNew(LcsRid startRID)
{
    int i;

    // Initialize block header and batches
    pHdr->firstRID = startRID;
    pHdr->nColumn = nClusterCols;
    pHdr->nBatch = 0;
    pHdr->oBatch = hdrSize;

    for (i = 0; i < nClusterCols; i++) {
        lastVal[i] = szBlock;
        firstVal[i] = (uint16_t) szBlock;
        nVal[i] = 0;
        delta[i] = 0;
        batchDirs[i].mode = LCS_COMPRESSED;
        batchDirs[i].nVal = 0;
        batchDirs[i].nRow = 0;
        batchDirs[i].oVal = 0;
        batchDirs[i].oLastValHighMark = lastVal[i];
        batchDirs[i].nValHighMark = nVal[i];
        batchOffset[i] = hdrSize;
        // # of bits it takes to represent 0 values
        nBits[i] = 0;
        nextWidthChange[i] = 1;
        batchCount[i] = 0;
    }

    // account for the header size, account for at least 1 batch for each
    // column, and leave space for one additional batch for a "left-over" batch

    szLeft = szBlock - hdrSize -
                (2 * sizeof(LcsBatchDir)) * nClusterCols;
    szLeft = std::max(szLeft, 0);
    assert(szLeft >= 0);
}

bool LcsClusterNodeWriter::openAppend(
    uint *nValOffsets, uint16_t *lastValOffsets, RecordNum &nrows)
{
    int i;

    // leave space for one batch for each column entry
    szLeft = lastVal[nClusterCols - 1] - pHdr->oBatch -
                (pHdr->nBatch + 2 * nClusterCols) * sizeof(LcsBatchDir);
    szLeft = std::max(szLeft, 0);
    assert(szLeft >= 0);

    // Move the values, batch directories, and batches from the index block
    // to the temporary blocks
    nrows = moveFromIndexToTemp();

    for (i = 0; i < nClusterCols; i++) {
        nValOffsets[i] = nVal[i];
        lastValOffsets[i] = lastVal[i];
        memset(&batchDirs[i], 0, sizeof(LcsBatchDir));

        batchDirs[i].oLastValHighMark = lastVal[i];
        batchDirs[i].nValHighMark = nVal[i];
        batchDirs[i].mode = LCS_COMPRESSED;

        // # of bits it takes to represent 0 values
        nBits[i] = 0;
        nextWidthChange[i] = 1;

        oValBank[i] = 0;
        batchCount[i] = pHdr->nBatch / nClusterCols;
    }

    return (szLeft == 0);
}

void LcsClusterNodeWriter::describeLastBatch(
    uint column, uint &dRow, uint &recSize)
{
    PLcsBatchDir pBatch;

    pBatch = (PLcsBatchDir) (pBlock[column] + batchOffset[column]);
    dRow = pBatch[batchCount[column] - 1].nRow % 8;
    recSize = pBatch[batchCount[column] - 1].recSize;
}

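// Returns the offset of the value stored immediately after thisVal (i.e. at
// the next higher offset in the column's value area), or 0 if thisVal is 0 or
// the end-of-block sentinel.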
uint16_t LcsClusterNodeWriter::getNextVal(uint column, uint16_t thisVal)
{
    if (thisVal && thisVal != szBlock) {
        return
            (uint16_t) (thisVal +
                attrAccessors[column].getStoredByteCount(
                    pBlock[column] + thisVal));
    } else {
        return 0;
    }
}

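// Rolls back the last batch for a column when that batch holds fewer than 8
// rows (a partial "left-over" batch); the rolled-back values are copied into
// pBuf so the caller can re-add them later.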
void LcsClusterNodeWriter::rollBackLastBatch(uint column, PBuffer pBuf)
{
    uint i;
    PLcsBatchDir pBatch;
    uint16_t *pValOffsets;

    uint8_t *pBit;                      // bitVecs start address
    WidthVec w;                         // bitVec width vector
    PtrVec p;                           // bitVec offsets
    uint iV;                            // # of bit vectors

    uint16_t rows[LcsMaxRollBack];      // row index storage
    int origSzLeft;
    uint len;

    // load last batch, nBatch must be at least 1
    pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
    batchDirs[column] = pBatch[batchCount[column] - 1];

    // compute size left in temporary block before roll back
    origSzLeft = lastVal[column] - batchOffset[column] -
                    (batchCount[column] + 2) * sizeof(LcsBatchDir);

    if ((batchDirs[column].nRow > 8) || (batchDirs[column].nRow % 8) == 0) {
        return;
    }

    if (batchDirs[column].mode == LCS_COMPRESSED) {
        // calculate the bit vector widths
        iV = bitVecWidth(calcWidth(batchDirs[column].nVal), w);

        // this is where the bit vectors start
        pBit = pBlock[column] + batchDirs[column].oVal +
                batchDirs[column].nVal * sizeof(uint16_t);

        // set up pointers to the bit vectors
        bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit);

        // there are at most 8 rows in this batch
        readBitVecs(rows, iV, w, p, 0, batchDirs[column].nRow);

        // get the address of the batch's value offsets
        pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);

        // fill up buffer with the batch's values
        for (i = 0; i < batchDirs[column].nRow;
            i++, pBuf += batchDirs[column].recSize)
        {
            len =
                attrAccessors[column].getStoredByteCount(
                    pBlock[column] + pValOffsets[rows[i]]);
            memcpy(pBuf, pBlock[column] + pValOffsets[rows[i]], len);
        }

    } else if (batchDirs[column].mode == LCS_FIXED) {
        // fixed size record batch
        // copy the values into the given buffer
        memcpy(
            pBuf,
            pBlock[column] + batchDirs[column].oVal,
            batchDirs[column].nRow * batchDirs[column].recSize);
    } else {
        // variable sized records (batch.mode == LCS_VARIABLE)
        // get the address of the batch's value offsets
        pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);

        // fill up buffer with the batch's values
        for (i = 0; i < batchDirs[column].nRow;
            i++, pBuf += batchDirs[column].recSize)
        {
            len =
                attrAccessors[column].getStoredByteCount(
                    pBlock[column] + pValOffsets[i]);
            memcpy(pBuf, pBlock[column] + pValOffsets[i], len);
        }
    }

    // Reset the last batch
    batchCount[column]--;
    // batch dir offset points to the beginning of the rolled back batch
    batchOffset[column] = batchDirs[column].oVal;

    // copy the batch dir back to the end of the prev batch.
    memmove(
        pBlock[column] + batchOffset[column],
        pBatch,
        batchCount[column] * sizeof(LcsBatchDir));

    // recalculate size left:
    // leave room for one new batch (the rolled back one will be rewritten)
    // and possibly one to follow.  Subtract the difference between the new
    // size and the original size and add it to szLeft
    int newSz;
    newSz = lastVal[column] - batchOffset[column] -
            (batchCount[column] + 2) * sizeof(LcsBatchDir);
    szLeft += (newSz - origSzLeft);
    szLeft = std::max(szLeft, 0);
    assert(szLeft >= 0);

    // # of bits it takes to represent 0 values
    nBits[column] = 0;
    nextWidthChange[column] = 1;

    // set batch parameters
    batchDirs[column].mode = LCS_COMPRESSED;
    batchDirs[column].nVal = 0;
    batchDirs[column].nRow = 0;
    batchDirs[column].oVal = 0;
    batchDirs[column].recSize = 0;
}

// addValue() where the current value already exists

bool LcsClusterNodeWriter::addValue(uint column, bool bFirstTimeInBatch)
{
    // Calculate szLeft assuming the value gets added.
    szLeft -= sizeof(uint16_t);

    // if there is not enough space left, reject value
    if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) {
        // set szLeft to its previous value
        szLeft += sizeof(uint16_t);
        assert(szLeft >= 0);
        return false;
    }

    if (bFirstTimeInBatch) {
        // there is enough space to house the value, increment batch
        // value count
        batchDirs[column].nVal++;

        // check if nBits needs to change by comparing the value count
        // to the change point count
        if (batchDirs[column].nVal == nextWidthChange[column]) {
            // calculate the next nBits value, and the count of values
            // for the next change
            nBits[column] = calcWidth(batchDirs[column].nVal);
            nextWidthChange[column] = (1 << nBits[column]) + 1;
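            // e.g., assuming calcWidth(n) returns the number of bits needed
            // to encode n distinct values: after the 3rd value nBits becomes
            // 2, and the width next changes at the 5th value ((1 << 2) + 1)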
        }
    }

    return true;
}

// addValue() where the value must be added to the bottom of the page

bool LcsClusterNodeWriter::addValue(uint column, PBuffer pVal, uint16_t *oVal)
{
    uint16_t lastValOffset;
    int oldSzLeft = szLeft;
    uint szVal = attrAccessors[column].getStoredByteCount(pVal);

    // if we are in forced fixed compression mode,
    // see if the maximum record size in this batch has increased.
    // if so, adjust szLeft based on the idea that each previous element
    // will now be taking more space
    if (bForceMode[column] == fixed) {
        if (szVal > maxValueSize[column]) {
            szLeft -= batchDirs[column].nVal *
                (szVal - maxValueSize[column]);
            maxValueSize[column] = szVal;
        }
    }

    // adjust szLeft (upper bound on amount of space left in the block).
    // If we are in a forced fixed compression mode then we can calculate this
    // exactly.  If we are in "none" mode, then we calculate szLeft according to
    // variable mode compression (which should be an upper bound), and adjust it
    // later in pickCompressionMode
    if (bForceMode[column] == fixed) {
        szLeft -= maxValueSize[column];
    } else {
        // assume value is being added in variable mode
        // (note: this assumes that whenever we convert from
        // variable mode to compressed mode, the compressed mode will
        // take less space, for only in this case is szLeft an upper bound)
        szLeft -= (sizeof(uint16_t) + szVal);
    }

    // if there is not enough space left, reject value
    if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) {
        // set szLeft to its previous value
        szLeft = oldSzLeft;
        assert(szLeft >= 0);
        return false;
    }

    // otherwise, go ahead and add the value...

    lastValOffset = lastVal[column] - szVal;

    // there is enough space to house the value, increment batch value count
    batchDirs[column].nVal++;

    // check if nBits needs to change by comparing the value count
    // to the change point count
    if (batchDirs[column].nVal == nextWidthChange[column]) {
        // calculate the next nBits value, and the count of values
        // for the next change
        nBits[column] = calcWidth(batchDirs[column].nVal);
        nextWidthChange[column] = (1 << nBits[column]) + 1;
    }

    lastVal[column] = lastValOffset;

    // Save the value being inserted.  If we are building the
    // block in fixed mode then save the value into pValBank
    // rather than saving it into the block
    if (fixed == bForceMode[column]) {
        memcpy(pValBank[column] + lastValOffset, pVal, szVal);
    } else {
        memcpy(pBlock[column] + lastValOffset, pVal, szVal);
    }

    // return the block offset of the new value
    *oVal = lastValOffset;

    nVal[column]++;

    return true;
}

void LcsClusterNodeWriter::undoValue(
    uint column, PBuffer pVal, bool bFirstInBatch)
{
    // pVal may be null if the value already exists, in which case, it wasn't
    // added to the value list.  However, if it was the first such value for
    // the batch, addValue was called to bump up the batch value count,
    // so we still need to call undoValue
    uint szVal =
        (pVal) ? attrAccessors[column].getStoredByteCount(pVal) : 0;

    // add back size subtracted for offset
    szLeft += (sizeof(uint16_t) + szVal);
    assert(szLeft >= 0);

    // If value was new to the batch, then adjust counters
    if (bFirstInBatch) {
        // decrement batch count
        batchDirs[column].nVal--;

        // reset nextWidthChange
        if (batchDirs[column].nVal == 0) {
            nextWidthChange[column] = 1;
        } else {
            // calculate the next nBits value, and the count of values
            // for the next change
            nBits[column] = calcWidth(batchDirs[column].nVal);
            nextWidthChange[column] = (1 << nBits[column]) + 1;
        }
    }

    if (pVal) {
        // update header
        lastVal[column] += szVal;
        nVal[column]--;
    }
}

void LcsClusterNodeWriter::putCompressedBatch(
    uint column, PBuffer pRows, PBuffer pBuf)
{
    uint        i, j, b;
    uint        iRow;
    uint        nByte;
    uint8_t     *pBit;
    uint16_t    *pOffs;
    PLcsBatchDir pBatch;

    WidthVec    w;      // bitVec width vector
    PtrVec      p;      // bitVec offsets
    uint        iV;     // number of bit vectors

    // pickCompressionMode() was called prior to putCompressedBatch,
    // and the following has been already done:
    // -- the batch descriptors were moved to the back of the batch
    // -- a batch descriptor for this batch has been placed in the batch
    //    directory
    // -- this->batch contains up to date info
    // -- the caller has copied nVal value offsets to the head of this batch

    // write to buffer values for rows over the 8 boundary if nRow is
    // greater than 8

    if (batchDirs[column].nRow > 8) {
        uint len;
        pOffs = (uint16_t *)(pBlock[column] + batchDirs[column].oVal);
        for (i = round8Boundary((uint32_t) batchDirs[column].nRow);
            i < batchDirs[column].nRow; i++, pBuf += batchDirs[column].recSize)
        {
            iRow = ((uint16_t *) pRows)[i];
            len =
                attrAccessors[column].getStoredByteCount(
                    pBlock[column] + pOffs[iRow]);
            memcpy(pBuf, pBlock[column] + pOffs[iRow], len);
        }
        batchDirs[column].nRow =
            round8Boundary((uint32_t) batchDirs[column].nRow);
    }

    // calculate the bit vector widths, sum(w[i]) is nBits
    iV = bitVecWidth(nBits[column], w);

    // this is where the bit vectors start
    pBit = pBlock[column] + batchDirs[column].oVal +
            batchDirs[column].nVal * sizeof(uint16_t);

    // nByte bytes are taken by the bit vectors, clear them before OR-ing
    nByte = bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit);
    memset(pBit, 0, nByte);

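    // Spread each row's value index across the bit vectors: widths of 16 and
    // 8 are byte-aligned and copied directly, while narrower widths are
    // packed with setBits; b tracks how many bits have already been emitted
    // into the preceding vectors.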
    for (j = 0, b = 0; j < iV; j++) {
        switch (w[j]) {
        case 16:
            memcpy(p[j], pRows, batchDirs[column].nRow * sizeof(uint16_t));
            break;

        case 8:
            for (i = 0; i < batchDirs[column].nRow; i++) {
                (p[j])[i] = (uint8_t)((uint16_t *) pRows)[i];
            }
            break;

        case 4:
            for (i = 0; i < batchDirs[column].nRow; i++) {
                setBits(
                    p[j] + i / 2,
                    4,
                    (i % 2) * 4,
                    (uint16_t)(((uint16_t *) pRows)[i] >> b));
            }
            break;

        case 2:
            for (i = 0; i < batchDirs[column].nRow; i++) {
                setBits(
                    p[j] + i / 4,
                    2,
                    (i % 4) * 2,
                    (uint16_t)(((uint16_t *) pRows)[i] >> b));
            }
            break;

        case 1:
            for (i = 0; i < batchDirs[column].nRow; i++) {
                setBits(
                    p[j] + i / 8,
                    1,
                    (i % 8),
                    (uint16_t)(((uint16_t *) pRows)[i] >> b));
            }
            break;

        default:
            ;
        }
        b += w[j];
    }

    // put the batch in the batch directory
    pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
    pBatch[batchCount[column]] = batchDirs[column];
    batchCount[column]++;

    // reset the batch state
    batchDirs[column].mode = LCS_COMPRESSED;
    batchDirs[column].oLastValHighMark = lastVal[column];
    batchDirs[column].nValHighMark = nVal[column];
    batchDirs[column].nVal = 0;
    batchDirs[column].oVal = batchOffset[column];
    batchDirs[column].nRow = 0;

    // # of bits it takes to represent 0 values
    nBits[column] = 0;
    nextWidthChange[column] = 1;
}

void LcsClusterNodeWriter::putFixedVarBatch(
    uint column, uint16_t *pRows, PBuffer pBuf)
{
    uint        i;
    uint        batchRows;
    PBuffer     pVal;
    PLcsBatchDir pBatch;
    PBuffer     src;
    uint        batchRecSize;
    uint16_t    localLastVal;
    uint16_t    localoValBank;
    PBuffer     localpValBank, localpBlock;


    // The # of rows in a batch will be smaller than 8 or a multiple
    // of 8.  All but the last batch in a load will be greater than 8.
    // Round to the nearest 8 boundary, unless this is the last batch (which
    // we determine by the nRows in the batch being less than 8).
    // If it turns out it isn't the last batch, then our caller (WriteBatch)
    // will roll back the whole batch and add it to a new block instead.
    batchRows = (batchDirs[column].nRow > 8)
        ? batchDirs[column].nRow & 0xfffffff8 : batchDirs[column].nRow;

    // the value destination
    pVal = pBlock[column] + batchDirs[column].oVal;
    if (batchDirs[column].mode == LCS_VARIABLE) {
        // For variable mode, copy the value offsets into the RI block.
        // The left-over rows, i.e., the rows over the highest 8 boundary,
        // will be copied to pBuf
        memcpy(pVal, pRows, batchRows * sizeof(uint16_t));
    } else {
        // it's a fixed record batch
        assert(batchDirs[column].mode == LCS_FIXED);

        batchRecSize  = batchDirs[column].recSize;
        localLastVal  = lastVal[column];
        localpValBank = pValBank[column] + valBankStart[column];
        localoValBank = oValBank[column];
        localpBlock   = pBlock[column];

        // Copy the values themselves into the block.
        // The values are currently stored in pValBank
        for (i = 0; i < batchRows; i++) {
            // valueSource determines by the offset whether the value comes
            // from the bank or from the block
            src = valueSource(
                localLastVal, localpValBank, localoValBank,
                localpBlock, pRows[i]);
            uint len = attrAccessors[column].getStoredByteCount(src);
            memcpy(pVal, src, len);
            pVal += batchRecSize;
        }
    }

    // if forced fixed mode is true we need to periodically set it false, so
    // we can at least check if the data can be compressed
    if (bForceMode[column] != none) {
        if (forceModeCount[column] > 20) {
            bForceMode[column] = none;
            forceModeCount[column] = 0;
        }
    }

    batchRecSize  = batchDirs[column].recSize;
    localLastVal  = lastVal[column];
    localpValBank = pValBank[column] + valBankStart[column];
    localoValBank = oValBank[column];
    localpBlock   = pBlock[column];

    // copy the tail of the batch (the last nRow % 8 values) to pBuf
    pVal = pBuf;
    for (i = batchRows; i < batchDirs[column].nRow; i++) {
        // valueSource determines by the offset whether the value comes from
        // the bank or from the block.  If the value bank is not used,
        // valueSource will get all the values from the block
        src = valueSource(
            localLastVal, localpValBank, localoValBank,
            localpBlock, pRows[i]);
        uint len = attrAccessors[column].getStoredByteCount(src);
        memcpy(pVal, src, len);
        pVal += batchRecSize;
    }

    if (pValBank[column]) {
        oValBank[column] = 0;
    }

    // Put batch descriptor in batch directory
    batchDirs[column].nRow = batchRows;
    pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
    pBatch[batchCount[column]] = batchDirs[column];

    // inc. batch count
    batchCount[column]++;

    // reset the batch state.  set the batch back to compressed mode
    // unless the force mode flag says the next batch should stay fixed
    // or variable
    switch (bForceMode[column]) {
    case none:
        batchDirs[column].mode = LCS_COMPRESSED;
        break;
    case fixed:
        batchDirs[column].mode = LCS_FIXED;
        break;
    case variable:
        batchDirs[column].mode = LCS_VARIABLE;
        break;
    default:
        assert(false);
    }
    batchDirs[column].oLastValHighMark = lastVal[column];
    batchDirs[column].nValHighMark = nVal[column];
    batchDirs[column].nVal = 0;
    batchDirs[column].oVal = batchOffset[column];
    batchDirs[column].nRow = 0;

    // # of bits it takes to represent 0 values
    nBits[column] = 0;
    nextWidthChange[column] = 1;

    maxValueSize[column] = 0;
}

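// Chooses how the batch being closed will be stored: compressed (distinct
// values plus bit-packed row indices), fixed-width, or variable-width,
// whichever is estimated to be smallest.  Once a non-compressed mode is
// picked, bForceMode keeps subsequent batches in that mode until
// putFixedVarBatch resets it (after roughly 20 batches) so that compression
// can be retried.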
void LcsClusterNodeWriter::pickCompressionMode(
    uint column, uint recSize, uint nRow, uint16_t **pValOffset,
    LcsBatchMode &compressionMode)
{
    uint        nByte;
    PLcsBatchDir pBatch;
    WidthVec    w;      // bitVec width vector
    uint        iV;     // number of bit vectors


    uint        szCompressed;   // size of the compressed batch
    uint        szVariable;     // size of the variable sized batch
    uint        szFixed;        // size of the fixed batch
    uint        szNonCompressed;
    uint        deltaVal;
    uint        batchRows;      // # of rows in the batch that is nRows rounded
                                // down to the nearest 8 boundary

    // update batch fields
    batchDirs[column].nRow = nRow;
    batchDirs[column].recSize = recSize;

    // calculate the size required for a compressed and sorted batch
    // by summing the space required for the value offsets, the bit vectors,
    // and the values that were put in since the batch started

    // the # of rows in a batch will be smaller than 8 or a multiple
    // of 8.  All but the last batch in a load will be greater than 8.
    batchRows = (nRow > 8) ? nRow & 0xfffffff8 : nRow;

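    // compressed size = one 16-bit offset per distinct value, plus the
    // bit-packed row indices (nBits bits per row, with LcsMaxSzLeftError
    // bytes of rounding slack), plus the bytes of new values added since
    // this batch started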
    szCompressed = batchDirs[column].nVal * sizeof(uint16_t) +
                        (nBits[column] * nRow + LcsMaxSzLeftError * 8) / 8
                   + (batchDirs[column].oLastValHighMark - lastVal[column]);

    // the variable size batch does not have the bit vectors
    szVariable = batchDirs[column].nRow * sizeof(uint16_t)
                   + (batchDirs[column].oLastValHighMark - lastVal[column]);

    // calculate the size required for the non compressed fixed mode;
    // add max left overs to allow left overs to be added back
    uint    leftOverSize;
    leftOverSize = LcsMaxLeftOver * sizeof(uint16_t) +
                    (3 * LcsMaxLeftOver + LcsMaxSzLeftError * 8) / 8
                       + LcsMaxLeftOver * recSize;
    szFixed = nRow * recSize + leftOverSize;

    szNonCompressed = std::min(szFixed, szVariable);

    // Should we store this block in one of the non-compressed modes (fixed or
    // variable)?  We do this if either
    //    1) the non-compressed size is smaller than the compressed size
    // or 2) we built the block in forced uncompressed mode
    //
    // test if the compressed size is bigger than the non compressed size

    if ((fixed == bForceMode[column] || variable == bForceMode[column])
        || szCompressed > szNonCompressed) {
        // switch to one of the noncompressed modes
        *pValOffset = NULL;
        batchDirs[column].nVal = 0;

        forceModeCount[column]++;

        // If we are storing it in fixed mode...
        if (fixed == bForceMode[column] || szNonCompressed == szFixed) {
            // batch will be stored in fixed mode
            // change mode
            batchDirs[column].mode = LCS_FIXED;

            // Are we switching from variable mode to fixed mode?
            if (bForceMode[column] != fixed) {
                // We are going to store the batch in fixed mode.  But currently
                // it is saved in variable mode.  Save the value pointers into
                // pValBank and we will use them later to help in conversion

                // number of bytes taken by values over the batch high mark point
                deltaVal = batchDirs[column].oLastValHighMark - lastVal[column];

                // save the values obtained while this batch was in
                // variable mode
                if (deltaVal) {
                    memcpy(
                        pValBank[column],
                        pBlock[column] + lastVal[column],
                        deltaVal);
                }

                valBankStart[column] = 0;

                // mark that for the next few times, automatically go to
                // fixed mode without checking if it is the best
                bForceMode[column] = fixed;

                // Adjust szLeft since we have freed up some space in
                // the block
                assert(szVariable >= szFixed);
                szLeft += (szVariable - szFixed);
                assert(szLeft >= 0);
            } else {
                valBankStart[column] = lastVal[column];
            }

            // Reclaim the space at the bottom of the block that
            // used to hold the values

            // first offset included in the bank
            oValBank[column] = lastVal[column];
            lastVal[column] = batchDirs[column].oLastValHighMark;
            nVal[column] = batchDirs[column].nValHighMark;

            // the number of bytes taken by the batch
            nByte = batchRows * batchDirs[column].recSize;

        } else {
            // batch will be stored in variable mode

            batchDirs[column].mode = LCS_VARIABLE;

            // the number of bytes taken by the batch
            nByte = batchRows * sizeof(uint16_t);

            // mark that for the next few times, automatically go to
            // variable mode without checking if it is the best
            bForceMode[column] = variable;
        }
    } else {
        // batch will be stored in compressed mode
        // values will be put at the start of the new batch

        *pValOffset = (uint16_t *)(pBlock[column] + batchOffset[column]);

        // calculate the bit vector widths
        iV = bitVecWidth(nBits[column], w);

        // nByte is the # bytes taken by the batch; it is the sum of
        // the bit vectors size and the value offsets
        nByte = sizeofBitVec(batchRows, iV, w) +
                batchDirs[column].nVal * sizeof(uint16_t);

        // Adjust szLeft since we have freed up some space in the block
        assert(szVariable >= szCompressed);
        szLeft += (szVariable - szCompressed);
        assert(szLeft >= 0);
    }

    compressionMode = batchDirs[column].mode;

    // Slide down the batch directories to make room for new batch data (batch
    // directories occur after the batches).
    pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column] + nByte);
    memmove(
        pBatch,
        pBlock[column] + batchOffset[column],
        batchCount[column] * sizeof(LcsBatchDir));

    // adjust szLeft for the space used by the next batch to reflect only
    // its batch directory
    szLeft -= sizeof(LcsBatchDir);
    szLeft = std::max(szLeft, 0);
    assert(szLeft >= 0);

    // batchDirs[column].oVal points to where the batch dir used to be,
    // and this is where the batch records will start
    batchDirs[column].oVal = batchOffset[column];

    // set batchOffset[column] to point to the start of the batch
    // directories (if we have another batch then this will become the
    // offset of the new batch)
    batchOffset[column] = (batchOffset[column] + nByte);
}

// myCopy: like memcpy(), but optimized for the case where source
// and destination are the same (i.e., when we have a single column
// cluster and we are copying from the index block to the temporary
// blocks, this will do nothing because the temp block just points
// back to the index block)
void myCopy(void* pDest, void* pSrc, uint sz)
{
    if (pDest == pSrc) {
        return;
    } else {
        memcpy(pDest, pSrc, sz);
    }
}

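// When appending to an existing leaf, the page's values and batches are first
// unpacked from the single index block into per-column temporary blocks so
// new batches can be built per column; moveFromTempToIndex later repacks
// everything into the index block.  Returns the number of rows currently on
// the page.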
RecordNum LcsClusterNodeWriter::moveFromIndexToTemp()
{
    PLcsBatchDir pBatch;
    boost::scoped_array<uint16_t> batchDirOffset;
    uint16_t loc;
    uint column;
    uint batchCount = pHdr->nBatch / nClusterCols;
    uint b;

    batchDirOffset.reset(new uint16_t[pHdr->nBatch]);
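    // batchDirOffset[b] remembers where batch b's data lands in its temporary
    // block so the copied batch directory entries can be re-pointed below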

    // First move the values
    //
    // copy values from index for all columns starting with the
    // 1st column in cluster.
    for (column = 0; column < nClusterCols; column++) {
        uint sz = firstVal[column] - lastVal[column];
        loc = (uint16_t) (szBlock - sz);
        myCopy(pBlock[column] + loc, pIndexBlock + lastVal[column], sz);

        // adjust lastVal and firstVal to offsets in temporary block
        lastVal[column]  = loc;
        firstVal[column] = (uint16_t) szBlock;
    }

    // Next move the batches

    pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch);
    for (column = 0; column < nClusterCols; column++) {
        uint i;
        loc = hdrSize;

        // move every batch for this column
        for (b = column, i = 0; i < batchCount; i++, b = b + nClusterCols) {
            uint16_t    batchStart = loc;

            if (pBatch[b].mode == LCS_COMPRESSED) {
                uint8_t     *pBit;
                WidthVec    w;      // bitVec width vector
                PtrVec      p;      // bitVec offsets
                uint        iV;     // # of bit vectors
                uint        sizeOffsets, nBytes;

                // copy offsets
                sizeOffsets = pBatch[b].nVal * sizeof(uint16_t);
                myCopy(
                    pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
                    sizeOffsets);

                // step past offsets
                loc = (uint16_t) (loc + sizeOffsets);

                // calculate the bit vector widths
                iV = bitVecWidth(calcWidth(pBatch[b].nVal), w);

                // this is where the bit vectors start
                pBit = pIndexBlock + pBatch[b].oVal + sizeOffsets;

                // nBytes bytes are taken by the bit vectors
                nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit);

                myCopy(pBlock[column] + loc, pBit, nBytes);

                // step past bit vectors
                loc = (uint16_t) (loc + nBytes);
            } else if (pBatch[b].mode == LCS_VARIABLE) {
                uint        sizeOffsets;

                sizeOffsets = pBatch[b].nRow * sizeof(uint16_t);

                // variable size record batch
                myCopy(
                    pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
                    sizeOffsets);

                // step past offsets
                loc = (uint16_t) (loc + sizeOffsets);
            } else {
                // fixed mode batch
                uint sizeFixed;

                sizeFixed = pBatch[b].nRow * pBatch[b].recSize;
                // fixed size record batch
                myCopy(
                    pBlock[column] + loc, pIndexBlock + pBatch[b].oVal,
                    sizeFixed);

                // step past fixed records
                loc = (uint16_t) (loc + sizeFixed);
            }

            // set offset where values start in temp block
            batchDirOffset[b] = batchStart;
        }

        // move batch directories for this column

        uint16_t  dirLoc;
        b = column;
        dirLoc = loc;
        batchOffset[column] = dirLoc;

        // move each batch directory entry for this column
        for (i = 0; i < batchCount; i++) {
            PLcsBatchDir pTempBatch = (PLcsBatchDir)(pBlock[column] + dirLoc);
            myCopy(pTempBatch, &pBatch[b], sizeof(LcsBatchDir));

            pTempBatch->oVal = batchDirOffset[b];
            // increment to next batch and next location in temp block
            b = b + nClusterCols;
            dirLoc += sizeof(LcsBatchDir);
        }
    }

    // compute the number of rows on the page
    pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch);
    RecordNum nrows = 0;
    for (b = 0; b < pHdr->nBatch; b = b + nClusterCols) {
        nrows += pBatch[b].nRow;
    }

    batchDirOffset.reset();
    return nrows;
}

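// Repacks the per-column temporary blocks back into the single index (leaf)
// block: values are copied to the bottom of the page, batches are copied in
// batch order interleaving the columns, and the batch directory is written
// after the last batch.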
void LcsClusterNodeWriter::moveFromTempToIndex()
{
    PLcsBatchDir pBatch;
    uint        sz, numBatches = batchCount[0];
    uint16_t    offset, loc;
    uint        column, b;

    // Copy values from temporary blocks for all columns starting with the
    // 1st column in cluster.

    for (offset = (uint16_t) szBlock, column = 0; column < nClusterCols;
        column++)
    {
        sz = szBlock - lastVal[column];
        myCopy(
            pIndexBlock + (offset - sz), pBlock[column] + lastVal[column], sz);

        // set delta value to subtract from offsets to get relative offset
        delta[column] = (uint16_t)(szBlock - offset);

        // adjust firstVal and lastVal in the leaf block header to appropriate
        // offsets in the index block (currently based on offsets in the
        // temporary block)
        firstVal[column] = offset;
        offset = (uint16_t) (offset - sz);
        lastVal[column] = offset;
    }

    // copy batch descriptors (which point to the batches)

    for (loc = hdrSize, b = 0; b < numBatches; b++) {
        for (column = 0; column < nClusterCols; column++) {
            uint16_t    batchStart = loc;

            pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);

            if (pBatch[b].mode == LCS_COMPRESSED) {
                uint8_t     *pBit;
                WidthVec    w;      // bitVec width vector
                PtrVec      p;      // bitVec offsets
                uint        iV;     // # of bit vectors
                uint        sizeOffsets, nBytes;

                sizeOffsets = pBatch[b].nVal * sizeof(uint16_t);

                // first copy offsets, then bit vectors
                myCopy(
                    pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
                    sizeOffsets);

                // step past offsets
                loc = (uint16_t) (loc + sizeOffsets);

                // calculate the bit vector widths
                iV = bitVecWidth(calcWidth(pBatch[b].nVal), w);

                // this is where the bit vectors start in temporary block
                pBit = pBlock[column] + pBatch[b].oVal + sizeOffsets;

                // nBytes bytes are taken by the bit vectors
                nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit);

                myCopy(pIndexBlock + loc, pBit, nBytes);

                // step past bit vectors
                loc = (uint16_t)(loc + nBytes);

            } else if (pBatch[b].mode == LCS_VARIABLE) {
                uint        sizeOffsets;

                sizeOffsets = pBatch[b].nRow * sizeof(uint16_t);

                // variable size record batch
                myCopy(
                    pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
                    sizeOffsets);

                // step past offsets
                loc = (uint16_t) (loc + sizeOffsets);
            } else {
                // Fixed mode
                uint sizeFixed;

                sizeFixed = pBatch[b].nRow * pBatch[b].recSize;
                // fixed size record batch
                myCopy(
                    pIndexBlock + loc, pBlock[column] + pBatch[b].oVal,
                    sizeFixed);

                // step past fixed records
                loc = (uint16_t) (loc + sizeFixed);
            }

            // set offset where values start in indexBlock
            pBatch[b].oVal = batchStart;
        }
    }

    // adjust batch count in leaf block header
    pHdr->nBatch = nClusterCols * numBatches;

    // start batch directory at end of last batch
    pHdr->oBatch = loc;

    // copy batch directories
    for (b = 0; b < numBatches; b++) {
        for (column = 0; column < nClusterCols; column++) {
            pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]);
            myCopy(pIndexBlock + loc, &pBatch[b], sizeof(LcsBatchDir));
            loc += sizeof(LcsBatchDir);
        }
    }

    if (isTracingLevel(TRACE_FINE)) {
        FENNEL_TRACE(
            TRACE_FINE, "Calling ClusterDump from moveFromTempToIndex");
        clusterDump->dump(opaqueToInt(clusterPageId), pHdr, szBlock);
    }
}

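// Lazily allocates the per-column working arrays on the first init() call and
// zeroes them on every call; the value bank pages come from the scratch
// segment and remain pinned until the segment releases them.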
void LcsClusterNodeWriter::allocArrays()
{
    // allocate arrays only if they have not been allocated already
    if (!arraysAllocated) {
        arraysAllocated = true;

        batchDirs.reset(new LcsBatchDir[nClusterCols]);

        pValBank.reset(new PBuffer[nClusterCols]);

        // allocate larger buffers for the individual pages in the value bank

        attrAccessors.reset(new UnalignedAttributeAccessor[nClusterCols]);

        for (uint col = 0; col < nClusterCols; col++) {
            bufferLock.allocatePage();
            pValBank[col] = bufferLock.getPage().getWritableData();
            // Similar to what's done in external sorter, we rely on the fact
            // that the underlying ScratchSegment keeps the page pinned for us.
            // The pages will be released when all other pages associated with
            // the ScratchSegment are released.
            bufferLock.unlock();

            attrAccessors[col].compute(colTupleDesc[col]);
        }

        valBankStart.reset(new uint16_t[nClusterCols]);

        forceModeCount.reset(new uint[nClusterCols]);

        bForceMode.reset(new ForceMode[nClusterCols]);

        oValBank.reset(new uint16_t[nClusterCols]);

        batchOffset.reset(new uint16_t[nClusterCols]);

        batchCount.reset(new uint[nClusterCols]);

        nBits.reset(new uint[nClusterCols]);

        nextWidthChange.reset(new uint[nClusterCols]);

        maxValueSize.reset(new uint[nClusterCols]);
    }

    memset(valBankStart.get(), 0, nClusterCols * sizeof(uint16_t));
    memset(forceModeCount.get(), 0, nClusterCols * sizeof(uint));
    memset(bForceMode.get(), 0, nClusterCols * sizeof(ForceMode));
    memset(oValBank.get(), 0, nClusterCols * sizeof(uint16_t));
    memset(batchOffset.get(), 0, nClusterCols * sizeof(uint16_t));
    memset(batchCount.get(), 0, nClusterCols * sizeof(uint));
    memset(nBits.get(), 0, nClusterCols * sizeof(uint));
    memset(nextWidthChange.get(), 0, nClusterCols * sizeof(uint));
    memset(maxValueSize.get(), 0, nClusterCols * sizeof(uint));
}


FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $");

// End LcsClusterNodeWriter.cpp
