LcsClusterAppendExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00024 #include "fennel/lucidera/colstore/LcsClusterNode.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/btree/BTreeWriter.h"
00027 #include <boost/scoped_array.hpp>
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $");
00030 
00031 void LcsClusterAppendExecStream::prepare(
00032     LcsClusterAppendExecStreamParams const &params)
00033 {
00034     BTreeExecStream::prepare(params);
00035     ConduitExecStream::prepare(params);
00036 
00037     tableColsTupleDesc = pInAccessor->getTupleDesc();
00038     initTupleLoadParams(params.inputProj);
00039 
00040     // setup descriptors, accessors and data to access only the columns
00041     // for this cluster, based on the input projection
00042 
00043     pInAccessor->bindProjection(params.inputProj);
00044 
00045     // setup bufferLock to access temporary large page blocks
00046 
00047     scratchAccessor = params.scratchAccessor;
00048     bufferLock.accessSegment(scratchAccessor);
00049 
00050     // The output stream from the loader is either a single column representing
00051     // the number of rows loaded or two columns -- number of rows loaded and
00052     // starting rid value.  The latter applies when there are
00053     // downstream create indexes
00054 
00055     TupleDescriptor outputTupleDesc;
00056 
00057     outputTupleDesc = pOutAccessor->getTupleDesc();
00058     outputTuple.compute(outputTupleDesc);
00059     outputTuple[0].pData = (PConstBuffer) &numRowCompressed;
00060     if (outputTupleDesc.size() > 1) {
00061         outputTuple[1].pData = (PConstBuffer) &startRow;
00062     }
00063 
00064     outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor();
00065 
00066     blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize();
00067 
00068 }
00069 
00070 void LcsClusterAppendExecStream::initTupleLoadParams(
00071     const TupleProjection &inputProj)
00072 {
00073     numColumns = inputProj.size();
00074     clusterColsTupleDesc.projectFrom(tableColsTupleDesc, inputProj);
00075     clusterColsTupleData.compute(clusterColsTupleDesc);
00076 
00077     // setup one tuple descriptor per cluster column
00078     colTupleDesc.reset(new TupleDescriptor[numColumns]);
00079     for (int i = 0; i < numColumns; i++) {
00080         colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i]]);
00081     }
00082 }
00083 
00084 void LcsClusterAppendExecStream::getResourceRequirements(
00085     ExecStreamResourceQuantity &minQuantity,
00086     ExecStreamResourceQuantity &optQuantity)
00087 {
00088     ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00089 
00090     // REVIEW --
00091     // 4 pages per cluster column
00092     // - 1 for indexBlock
00093     // - 1 for rowBlock
00094     // - 1 for hash,
00095     // - 1 for value bank
00096     // 5 pages for btree and then 1 for cluster page
00097     minQuantity.nCachePages += (numColumns * 4) + 6;
00098 
00099     // TODO
00100     optQuantity = minQuantity;
00101 }
00102 
00103 void LcsClusterAppendExecStream::open(bool restart)
00104 {
00105     BTreeExecStream::open(restart);
00106     ConduitExecStream::open(restart);
00107 
00108     if (!restart) {
00109         outputTupleBuffer.reset(
00110             new FixedBuffer[outputTupleAccessor->getMaxByteCount()]);
00111     }
00112 
00113     init();
00114     isDone = false;
00115 }
00116 
00117 ExecStreamResult LcsClusterAppendExecStream::execute(
00118         ExecStreamQuantum const &quantum)
00119 {
00120     return compress(quantum);
00121 }
00122 
00123 void LcsClusterAppendExecStream::closeImpl()
00124 {
00125     BTreeExecStream::closeImpl();
00126     ConduitExecStream::closeImpl();
00127     outputTupleBuffer.reset();
00128     close();
00129 }
00130 
00131 void LcsClusterAppendExecStream::init()
00132 {
00133     pIndexBlock = 0;
00134     firstRow = LcsRid(0);
00135     lastRow = LcsRid(0);
00136     startRow = LcsRid(0);
00137     rowCnt = 0;
00138     indexBlockDirty = false;
00139     arraysAlloced = false;
00140     compressCalled = false;
00141     numRowCompressed = 0;
00142 }
00143 
00144 ExecStreamResult LcsClusterAppendExecStream::compress(
00145     ExecStreamQuantum const &quantum)
00146 {
00147     uint i, j, k;
00148     bool canFit = false;
00149     bool undoInsert = false;
00150 
00151     if (isDone) {
00152         // already returned final result
00153         pOutAccessor->markEOS();
00154         return EXECRC_EOS;
00155     }
00156 
00157     for (i = 0; i < quantum.nTuplesMax; i++) {
00158         // if we have finished processing the previous row, retrieve
00159         // the next cluster tuple and then convert the columns in the
00160         // cluster into individual tuples, one per cluster column
00161         ExecStreamResult rc = getTupleForLoad();
00162 
00163         // no more input; produce final row count
00164         if (rc == EXECRC_EOS) {
00165             // since we're done adding rows to the index, write the last batch
00166             // and block
00167             if (rowCnt) {
00168                 // if rowCnt < 8 or a multiple of 8, force writeBatch to
00169                 // treat this as the last batch
00170                 if (rowCnt < 8 || (rowCnt % 8) == 0) {
00171                     writeBatch(true);
00172                 } else {
00173                     writeBatch(false);
00174                 }
00175             }
00176 
00177             // Write out the last block and then free up resources
00178             // rather than waiting until stream close. This will keep
00179             // resource usage window smaller and avoid interference with
00180             // downstream processing such as writing to unclustered indexes.
00181             writeBlock();
00182             if (lcsBlockBuilder) {
00183                 lcsBlockBuilder->close();
00184             }
00185             close();
00186 
00187             // outputTuple was already initialized to point to numRowCompressed/
00188             // startRow in prepare()
00189             // Write a single outputTuple(numRowCompressed, [startRow])
00190             // and indicate OVERFLOW.
00191 
00192             outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get());
00193             pOutAccessor->provideBufferForConsumption(
00194                 outputTupleBuffer.get(),
00195                 outputTupleBuffer.get() +
00196                     outputTupleAccessor->getCurrentByteCount());
00197 
00198             isDone = true;
00199             return EXECRC_BUF_OVERFLOW;
00200         } else if (rc != EXECRC_YIELD) {
00201             return rc;
00202         }
00203 
00204         // Go through each column value for current row and insert it.
00205         // If we run out of space then rollback all the columns that
00206         // I already inserted.
00207         undoInsert = false;
00208 
00209         for (j = 0; j < numColumns; j++) {
00210             hash[j].insert(
00211                 clusterColsTupleData[j], &hashValOrd[j], &undoInsert);
00212 
00213             if (undoInsert) {
00214                 // rollback cluster columns already inserted
00215                 // j has not been incremented yet, so the condition should be
00216                 //     k <= j
00217                 for (k = 0; k <= j; k++) {
00218                     hash[k].undoInsert(clusterColsTupleData[k]);
00219                 }
00220                 break;
00221             }
00222         }
00223 
00224         // Was there enough space to add this row?  Note that the Insert()
00225         // calls above accounted for the space needed by addValueOrdinal()
00226         // below, so we don't have to worry about addValueOrdinal() running
00227         // out of space
00228         if (!undoInsert) {
00229             canFit = true;
00230         } else {
00231             canFit = false;
00232         }
00233 
00234         if (canFit) {
00235             // Add the pointers from the batch to the data values
00236             for (j = 0; j < numColumns; j++) {
00237                 addValueOrdinal(j, hashValOrd[j].getValOrd());
00238             }
00239 
00240             rowCnt++;
00241 
00242             // if reach max rows that can fit in row array then write batch
00243             if (isRowArrayFull()) {
00244                 writeBatch(false);
00245             }
00246         } else {
00247             // since we can't fit anymore values write out current batch
00248             writeBatch(false);
00249 
00250             // restart using last value retrieved from stream because it
00251             // could not fit in the batch; by continuing we can avoid
00252             // a goto to jump back to the top of this for loop at the
00253             // expense of a harmless increment of the quantum
00254             continue;
00255         }
00256 
00257         // only consume the tuple after we know the row can fit
00258         // on the current page
00259         postProcessTuple();
00260     }
00261 
00262     return EXECRC_QUANTUM_EXPIRED;
00263 }
00264 
00265 ExecStreamResult LcsClusterAppendExecStream::getTupleForLoad()
00266 {
00267     if (pInAccessor->getState() == EXECBUF_EOS) {
00268         return EXECRC_EOS;
00269     }
00270 
00271     if (!pInAccessor->demandData()) {
00272         return EXECRC_BUF_UNDERFLOW;
00273     }
00274 
00275     // Initialize the load, only after we know we have input available
00276     initLoad();
00277 
00278     if (!pInAccessor->isTupleConsumptionPending()) {
00279         pInAccessor->unmarshalProjectedTuple(clusterColsTupleData);
00280     }
00281 
00282     return EXECRC_YIELD;
00283 }
00284 
00285 void LcsClusterAppendExecStream::initLoad()
00286 {
00287     // If this is the first time this method is called, then
00288     // start a new block (for the new table), or load the last block
00289     // (of the existing table).  We do this here rather than in
00290     // init() because for INSERT into T as SELECT * from T,
00291     // we need to make sure that we extract all the data from
00292     // T before modifying the blocks there; hence that's why this
00293     // method should not be called until there is input available.
00294     // Use the boolean to ensure that initialization of cluster page
00295     // is only done once.
00296 
00297     if (!compressCalled) {
00298         compressCalled = true;
00299 
00300         // The dynamic allocated memory in lcsBlockBuilder is allocated for
00301         // every LcsClusterAppendExecStream.open() and deallocated for every
00302         // LcsClusterAppendExecStream.closeImpl(). The dynamic memory is not
00303         // reused across calls(e.g. when issueing the same statement twice).
00304         lcsBlockBuilder = SharedLcsClusterNodeWriter(
00305             new LcsClusterNodeWriter(
00306                 treeDescriptor,
00307                 scratchAccessor,
00308                 clusterColsTupleDesc,
00309                 getSharedTraceTarget(),
00310                 getTraceSourceName()));
00311 
00312         allocArrays();
00313 
00314         // get blocks from cache to use as temporary space and initialize arrays
00315         for (uint i = 0; i < numColumns; i++) {
00316             bufferLock.allocatePage();
00317             rowBlock[i] = bufferLock.getPage().getWritableData();
00318             bufferLock.unlock();
00319 
00320             bufferLock.allocatePage();
00321             hashBlock[i] = bufferLock.getPage().getWritableData();
00322             bufferLock.unlock();
00323 
00324             bufferLock.allocatePage();
00325             builderBlock[i] = bufferLock.getPage().getWritableData();
00326             bufferLock.unlock();
00327 
00328             hash[i].init(
00329                 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
00330         }
00331 
00332         nRowsMax = blockSize / sizeof(uint16_t);
00333 
00334         // if the index exists, get last block written
00335 
00336         PLcsClusterNode pExistingIndexBlock;
00337 
00338         bool found = getLastBlock(pExistingIndexBlock);
00339         if (found) {
00340             // indicate we are updating a leaf
00341             pIndexBlock = pExistingIndexBlock;
00342 
00343             // extract rows and values from last batch so we can
00344             // add to it.
00345             loadExistingBlock();
00346         } else {
00347             // Start writing a new block
00348             startNewBlock();
00349             startRow = LcsRid(0);
00350         }
00351     }
00352 }
00353 
00354 void LcsClusterAppendExecStream::postProcessTuple()
00355 {
00356     pInAccessor->consumeTuple();
00357     numRowCompressed++;
00358 }
00359 
00360 void LcsClusterAppendExecStream::close()
00361 {
00362     if (scratchAccessor.pSegment) {
00363         scratchAccessor.pSegment->deallocatePageRange(
00364             NULL_PAGE_ID, NULL_PAGE_ID);
00365     }
00366     rowBlock.reset();
00367     hashBlock.reset();
00368     builderBlock.reset();
00369 
00370     hash.reset();
00371     hashValOrd.reset();
00372     tempBuf.reset();
00373     maxValueSize.reset();
00374 
00375     lcsBlockBuilder.reset();
00376 }
00377 
00378 void LcsClusterAppendExecStream::startNewBlock()
00379 {
00380     firstRow = lastRow;
00381 
00382     // Get a new cluster page from the btree segment
00383     pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow);
00384 
00385     // Reset index block and block builder.
00386     lcsBlockBuilder->init(
00387         numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
00388         builderBlock.get(), blockSize);
00389 
00390     // reset Hashes
00391     for (uint i = 0; i < numColumns; i++) {
00392         hash[i].init(
00393             hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
00394     }
00395 
00396     // reset row count
00397     // NOTE:  if the rowCnt is less than eight then we know we are carrying
00398     //        over rows from previous block because the count did not end
00399     //        on a boundary of 8
00400     if (rowCnt >= 8) {
00401         rowCnt = 0;
00402     }
00403     indexBlockDirty = false;
00404 
00405     // Start writing a new block.
00406     lcsBlockBuilder->openNew(firstRow);
00407 }
00408 
00409 bool LcsClusterAppendExecStream::getLastBlock(PLcsClusterNode &pBlock)
00410 {
00411     if (!lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) {
00412         return false;
00413     } else {
00414         return true;
00415     }
00416 }
00417 
00418 void LcsClusterAppendExecStream::loadExistingBlock()
00419 {
00420     boost::scoped_array<uint> numVals;      // number of values in block
00421     boost::scoped_array<uint16_t> lastValOff;
00422     boost::scoped_array<boost::scoped_array<FixedBuffer> > aLeftOverBufs;
00423                                             // array of buffers to hold
00424                                             // rolled back data for each
00425                                             // column
00426     uint anLeftOvers;                       // number of leftover rows for
00427                                             // each col; since the value is
00428                                             // same for every column, no need
00429                                             // for this to be an array
00430     boost::scoped_array<uint> aiFixedSize;  // how much space was used for
00431                                             // each column; should be
00432                                             // equal for each value in a column
00433     LcsHashValOrd vOrd;
00434 
00435     uint i, j;
00436     RecordNum startRowCnt;
00437     RecordNum nrows;
00438 
00439     lcsBlockBuilder->init(
00440         numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
00441         builderBlock.get(), blockSize);
00442 
00443     lastValOff.reset(new uint16_t[numColumns]);
00444     numVals.reset(new uint[numColumns]);
00445 
00446     // REVIEW jvs 28-Nov-2005:  A simpler approach to this whole problem
00447     // might be to pretend we were starting an entirely new block,
00448     // use an LcsClusterReader to read the old one logically and append
00449     // the old rows into the new block, and then carry on from there with
00450     // the new rows.
00451 
00452     // Append to an existing cluster page.  Set the last rowid based on
00453     // the first rowid and the number of rows currently on the page.
00454     // As rows are "rolled back", lastRow is decremented accordingly
00455 
00456     bool bStartNewBlock =
00457         lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows);
00458     lastRow = firstRow + nrows;
00459     startRow = lastRow;
00460 
00461     // Setup structures to hold rolled back information
00462     aiFixedSize.reset(new uint[numColumns]);
00463     aLeftOverBufs.reset(new boost::scoped_array<FixedBuffer>[numColumns]);
00464 
00465     startRowCnt = rowCnt;
00466 
00467     // Rollback the final batch for each column
00468     // We need to rollback all
00469     // the batches before we can start the new batches because
00470     //  1) in openAppend() we adjust m_szLeft to not include space
00471     //     for numColumns * sizeof(RIBatch).  So if the
00472     //     block was full, then m_szLeft would be negative,
00473     //     since we decreased it by numColumns * sizeof(LcsBatch)
00474     //  2) the rollback code will add sizeof(LcsBatch) to szLeft
00475     //                      for each batch it rolls back
00476     //  3) the code to add values to a batch gets upset if
00477     //                      szLeft < 0
00478     for (i = 0; i < numColumns; i++) {
00479         //reset everytime through loop
00480         rowCnt = startRowCnt;
00481         lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]);
00482 
00483         // if we have left overs from the last batch (ie. batch did not end on
00484         // an 8 boundary), rollback and store in temporary mem
00485         // aLeftOverBufs[i]
00486         if (anLeftOvers > 0) {
00487             aLeftOverBufs[i].reset(
00488                 new FixedBuffer[anLeftOvers * aiFixedSize[i]]);
00489             lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get());
00490             indexBlockDirty = true;
00491         }
00492     }
00493 
00494     // Decrement lastRow if there was a rollback of the last batch
00495     lastRow -= anLeftOvers;
00496 
00497     // If the last page is already full, then write it out and start a new one
00498     if (bStartNewBlock) {
00499         writeBlock();
00500         startNewBlock();
00501     }
00502 
00503     // Start a new batch for each column.
00504     for (i = 0; i < numColumns; i++) {
00505         //reset everytime through loop
00506         rowCnt = startRowCnt;
00507 
00508         if (!bStartNewBlock) {
00509             // Repopulate the hash table with the values already in the
00510             // data segment at the bottom of the block (because we didn't
00511             // roll back these values, we only roll back the pointers to
00512             // these values).  But only do this if we haven't started a new
00513             // block.
00514             hash[i].restore(numVals[i], lastValOff[i]);
00515         }
00516 
00517         // if we had left overs from the last batch, start a new batch
00518         // NOTE: we are guaranteed to be able to add these values back
00519         // to the current block
00520         if (anLeftOvers > 0) {
00521             uint8_t *val;
00522             bool undoInsert = false;
00523 
00524             // There is a very small probability that when the hash is
00525             // restored, using the existing values in the block, that the
00526             // hash will be full and some of the left over values
00527             // can not be stored in the hash.  If this is true then clear
00528             // the hash.
00529             if (hash[i].isHashFull(anLeftOvers)) {
00530                 hash[i].startNewBatch(anLeftOvers);
00531             }
00532 
00533             for (j = 0, val = aLeftOverBufs[i].get();
00534                 j < anLeftOvers;
00535                 j++, val += aiFixedSize[i])
00536             {
00537                 hash[i].insert(val, &vOrd, &undoInsert);
00538 
00539                 //If we have left overs they should fit in the block
00540                 assert(!undoInsert);
00541                 addValueOrdinal(i, vOrd.getValOrd());
00542                 rowCnt++;
00543             }
00544         }
00545     }
00546 }
00547 
00548 void LcsClusterAppendExecStream::addValueOrdinal(uint column, uint16_t vOrd)
00549 {
00550     uint16_t *rowWordArray = (uint16_t *) rowBlock[column];
00551     rowWordArray[rowCnt] = vOrd;
00552 
00553     // since we added a row mark block as dirty
00554     indexBlockDirty = true;
00555 }
00556 
00557 bool LcsClusterAppendExecStream::isRowArrayFull()
00558 {
00559     if (rowCnt >= nRowsMax) {
00560         return true;
00561     } else {
00562         return false;
00563     }
00564 }
00565 
00566 void LcsClusterAppendExecStream::writeBatch(bool lastBatch)
00567 {
00568     uint16_t *oVals;
00569     uint leftOvers;
00570     PBuffer val;
00571     LcsBatchMode mode;
00572     uint i, j;
00573     uint origRowCnt, count = 0;
00574 
00575     lastRow += rowCnt;
00576 
00577     for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) {
00578         rowCnt = origRowCnt;
00579 
00580         // save max value size so we can read leftovers
00581         maxValueSize[i] = hash[i].getMaxValueSize();
00582 
00583         // Pick which compression mode to use (fixed, variable, or compressed)
00584         lcsBlockBuilder->pickCompressionMode(
00585             i, maxValueSize[i], rowCnt, &oVals, mode);
00586         leftOvers = rowCnt > 8 ? rowCnt % 8 : 0;
00587 
00588         // all batches must end on an eight boundary so we move
00589         // values over eight boundary to the next batch.
00590         // if there are leftOvers or if the there are less than
00591         // eight values in this batch allocate buffer to store
00592         // values to be written to next batch
00593         if (leftOvers) {
00594             tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]);
00595             count = leftOvers;
00596 
00597         } else if (origRowCnt < 8) {
00598             tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]);
00599             count = origRowCnt;
00600         } else {
00601             // no values to write to next batch (ie on boundary of 8)
00602             tempBuf[i].reset();
00603         }
00604 
00605         // Write out the batch and collect the leftover rows in tempBuf
00606         if (LCS_FIXED == mode || LCS_VARIABLE == mode) {
00607             hash[i].prepareFixedOrVariableBatch(
00608                 (PBuffer) rowBlock[i], rowCnt);
00609             lcsBlockBuilder->putFixedVarBatch(
00610                 i, (uint16_t *) rowBlock[i], tempBuf[i].get());
00611             if (mode == LCS_FIXED) {
00612                 hash[i].clearFixedEntries();
00613             }
00614 
00615         } else {
00616             uint16_t numVals;
00617 
00618             // write orderVals to oVals and remap val ords in row array
00619             hash[i].prepareCompressedBatch(
00620                 (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals);
00621             lcsBlockBuilder->putCompressedBatch(
00622                 i, (PBuffer) rowBlock[i], tempBuf[i].get());
00623         }
00624 
00625         // setup next batch
00626         rowCnt = 0;
00627         hash[i].startNewBatch(!lastBatch ? count : 0);
00628     }
00629 
00630     //compensate for left over and rolled back rows
00631     if (!lastBatch) {
00632         lastRow -= count;
00633     }
00634     bool bStartNewBlock;
00635     bStartNewBlock = false;
00636 
00637     // If we couldn't even fit 8 values into the batch (and this is not the
00638     // final batch), then the block must be full.  putCompressedBatch()/
00639     // putFixedVarBatch() assumed that this was the last batch, so they wrote
00640     // out these rows in a small batch.  Roll back the entire batch (putting
00641     // rolled back results in tempBuf) and move to next block
00642     if (!lastBatch && origRowCnt < 8) {
00643         // rollback each batch
00644         for (i = 0; i < numColumns; i++) {
00645             lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get());
00646         }
00647         bStartNewBlock = true;
00648     }
00649 
00650     // Should we move to a new block?  Move if
00651     //    (a) bStartNewBlock (we need to move just to write the current batch)
00652     // or (b) lcsBlockBuilder->isEndOfBlock() (there isn't room to even start
00653     // the next batch)
00654     if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) {
00655         writeBlock();
00656         startNewBlock();
00657     }
00658 
00659     // Add leftOvers or rolled back values to new batch
00660     if (!lastBatch) {
00661         for (i = 0; i < numColumns; i++) {
00662             rowCnt = 0;
00663             for (j = 0, val = tempBuf[i].get(); j < count; j++) {
00664                 LcsHashValOrd vOrd;
00665                 bool undoInsert = false;
00666 
00667                 hash[i].insert(val, &vOrd, &undoInsert);
00668 
00669                 // If we have leftovers they should fit in the current block
00670                 // (because we moved to a new block above, if it was necessary)
00671                 assert(!undoInsert);
00672                 addValueOrdinal(i, vOrd.getValOrd());
00673                 rowCnt++;
00674                 val += maxValueSize[i];
00675             }
00676         }
00677     }
00678 
00679     for (i = 0; i < numColumns; i++) {
00680         if (tempBuf[i].get()) {
00681             tempBuf[i].reset();
00682         }
00683     }
00684 }
00685 
00686 void LcsClusterAppendExecStream::writeBlock()
00687 {
00688     if (indexBlockDirty) {
00689         // If the rowCnt is not zero, then the last batch was not on
00690         // a boundary of 8 so we need to write the last batch
00691         if (rowCnt) {
00692             writeBatch(true);
00693 
00694             // REVIEW jvs 28-Nov-2005:  it must be possible to eliminate
00695             // this circularity between writeBlock and writeBatch.
00696 
00697             // Handle corner case. writeBatch may have written this block
00698             // to the btree
00699             if (!indexBlockDirty) {
00700                 return;
00701             }
00702         }
00703 
00704         // Tell block builder we are done so it can wrap up writing to the
00705         // index block
00706         lcsBlockBuilder->endBlock();
00707 
00708         // Dump out the page contents to trace if appropriate
00709 
00710         indexBlockDirty = false;
00711     }
00712 }
00713 
00714 void LcsClusterAppendExecStream::allocArrays()
00715 {
00716     // allocate arrays only if they have not been allocated already
00717     if (arraysAlloced) {
00718         return;
00719     }
00720     arraysAlloced = true;
00721 
00722     // instantiate hashes
00723     hash.reset(new LcsHash[numColumns]);
00724 
00725     // allocate pointers for row, hash blocks, other arrays
00726     rowBlock.reset(new PBuffer[numColumns]);
00727     hashBlock.reset(new PBuffer[numColumns]);
00728 
00729     builderBlock.reset(new PBuffer[numColumns]);
00730 
00731     hashValOrd.reset(new LcsHashValOrd[numColumns]);
00732     tempBuf.reset(new boost::scoped_array<FixedBuffer>[numColumns]);
00733     maxValueSize.reset(new uint[numColumns]);
00734 }
00735 
00736 ExecStreamBufProvision
00737     LcsClusterAppendExecStream::getOutputBufProvision() const
00738 {
00739     return BUFPROV_PRODUCER;
00740 }
00741 
00742 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $");
00743 
00744 // End LcsClusterAppendExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1