LbmSplicerExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmSplicerExecStream.cpp#25 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/common/FennelResource.h"
00024 #include "fennel/segment/SegmentFactory.h"
00025 #include "fennel/btree/BTreeBuilder.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/lucidera/bitmap/LbmGeneratorExecStream.h"
00028 #include "fennel/lucidera/bitmap/LbmSplicerExecStream.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmSplicerExecStream.cpp#25 $");
00031 
00032 void LbmSplicerExecStream::prepare(LbmSplicerExecStreamParams const &params)
00033 {
00034     DiffluenceExecStream::prepare(params);
00035     scratchAccessor = params.scratchAccessor;
00036 
00037     // Setup btree accessed by splicer
00038     assert(params.bTreeParams.size() <= 2);
00039     assert(params.bTreeParams[0].pRootMap == NULL);
00040     BTreeExecStream::copyParamsToDescriptor(
00041         writeBTreeDesc,
00042         params.bTreeParams[0],
00043         params.pCacheAccessor);
00044 
00045     insertRowCountParamId = params.insertRowCountParamId;
00046     computeRowCount = (opaqueToInt(insertRowCountParamId) == 0);
00047     writeRowCountParamId = params.writeRowCountParamId;
00048 
00049     bitmapTupleDesc = writeBTreeDesc.tupleDescriptor;
00050     bTreeTupleData.compute(bitmapTupleDesc);
00051     tempBTreeTupleData.compute(bitmapTupleDesc);
00052     inputTuple.compute(bitmapTupleDesc);
00053     nIdxKeys = writeBTreeDesc.keyProjection.size() - 1;
00054 
00055     // if the rowcount needs to be computed, then the input contains singleton
00056     // rids; so setup a special tupleData to receive that input
00057     if (computeRowCount) {
00058         assert(nIdxKeys == 0);
00059         assert(pInAccessor->getTupleDesc().size() == 1);
00060         singletonTuple.compute(pInAccessor->getTupleDesc());
00061     } else {
00062         assert(
00063             writeBTreeDesc.tupleDescriptor == pInAccessor->getTupleDesc());
00064     }
00065 
00066     uint minEntrySize;
00067     LbmEntry::getSizeBounds(
00068         bitmapTupleDesc,
00069         writeBTreeDesc.segmentAccessor.pSegment->getUsablePageSize(),
00070         minEntrySize,
00071         maxEntrySize);
00072 
00073     // setup output tuple
00074     outputTuple.compute(outAccessors[0]->getTupleDesc());
00075     outputTuple[0].pData = (PConstBuffer) &numRowsLoaded;
00076     assert(outputTuple.size() == 1);
00077 
00078     // constraint checking
00079     uniqueKey = false;
00080     if (params.bTreeParams.size() >= 2) {
00081         uniqueKey = true;
00082         BTreeExecStream::copyParamsToDescriptor(
00083             deletionBTreeDesc,
00084             params.bTreeParams[1],
00085             params.pCacheAccessor);
00086         deletionTuple.compute(deletionBTreeDesc.tupleDescriptor);
00087 
00088         TupleDescriptor currUniqueKeyDesc;
00089         for (uint i = 0; i < nIdxKeys; i++) {
00090             currUniqueKeyDesc.push_back(bitmapTupleDesc[i]);
00091         }
00092         currUniqueKey.computeAndAllocate(currUniqueKeyDesc);
00093 
00094         // setup violation output
00095         if (outAccessors.size() > 1) {
00096             violationAccessor = outAccessors[1];
00097             violationTuple.compute(violationAccessor->getTupleDesc());
00098         }
00099     }
00100 
00101     createNewIndex = params.createNewIndex;
00102     // If the index is going to be dynamically created, save the original
00103     // root pageId of the index so we can use it later to version the
00104     // index root page.
00105     if (createNewIndex) {
00106         origRootPageId = writeBTreeDesc.rootPageId;
00107     }
00108 }
00109 
00110 void LbmSplicerExecStream::open(bool restart)
00111 {
00112     DiffluenceExecStream::open(restart);
00113 
00114     if (!restart) {
00115         bitmapBuffer.reset(new FixedBuffer[maxEntrySize]);
00116         mergeBuffer.reset(new FixedBuffer[maxEntrySize]);
00117         pCurrentEntry = SharedLbmEntry(new LbmEntry());
00118         pCurrentEntry->init(
00119             bitmapBuffer.get(), mergeBuffer.get(), maxEntrySize,
00120             bitmapTupleDesc);
00121 
00122         newIndexCreated = false;
00123         emptyTable = false;
00124         emptyTableUnknown = true;
00125         bTreeWriter = SharedBTreeWriter(
00126             new BTreeWriter(writeBTreeDesc, scratchAccessor, emptyTable));
00127         bTreeWriterMoved = true;
00128 
00129         if (opaqueToInt(writeRowCountParamId) > 0) {
00130             pDynamicParamManager->createParam(
00131                 writeRowCountParamId,
00132                 outAccessors[0]->getTupleDesc()[0]);
00133         }
00134 
00135         if (uniqueKey) {
00136             SharedBTreeReader deletionBTreeReader = SharedBTreeReader(
00137                 new BTreeReader(deletionBTreeDesc));
00138             deletionReader.init(deletionBTreeReader, deletionTuple);
00139         }
00140 
00141         // If the index is going to be dynamically created, the underlying
00142         // segment associated with the index needs to be a snapshot segment.
00143         // Retrieve the snapshot segment.  This needs to be done at open time
00144         // because the segment changes across transaction boundaries.
00145         if (createNewIndex) {
00146             pSnapshotSegment =
00147                 SegmentFactory::getSnapshotSegment(
00148                     writeBTreeDesc.segmentAccessor.pSegment);
00149             assert(pSnapshotSegment != NULL);
00150         }
00151     }
00152     isDone = false;
00153     currEntry = false;
00154     currExistingEntry = false;
00155     numRowsLoaded = 0;
00156 
00157     currValidation = false;
00158     firstValidation = true;
00159 }
00160 
00161 void LbmSplicerExecStream::getResourceRequirements(
00162     ExecStreamResourceQuantity &minQuantity,
00163     ExecStreamResourceQuantity &optQuantity)
00164 {
00165     DiffluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00166 
00167     // btree pages
00168     minQuantity.nCachePages += 5;
00169     if (uniqueKey) {
00170         minQuantity.nCachePages += 5;
00171     }
00172 
00173     optQuantity = minQuantity;
00174 }
00175 
00176 bool LbmSplicerExecStream::isEmpty()
00177 {
00178     if (emptyTableUnknown) {
00179         if (bTreeWriter->searchFirst() == false) {
00180             bTreeWriter->endSearch();
00181             emptyTable = true;
00182             // switch writer to monotonic now that we know the table
00183             // is empty
00184             bTreeWriter.reset();
00185             bTreeWriter = SharedBTreeWriter(
00186                 new BTreeWriter(writeBTreeDesc, scratchAccessor, true));
00187         } else {
00188             emptyTable = false;
00189         }
00190         emptyTableUnknown = false;
00191     }
00192     return emptyTable;
00193 }
00194 
00195 ExecStreamResult LbmSplicerExecStream::execute(ExecStreamQuantum const &quantum)
00196 {
00197     if (isDone) {
00198         // Version the index roots if the index was dynamically created
00199         if (newIndexCreated) {
00200             pSnapshotSegment->versionPage(
00201                 origRootPageId,
00202                 writeBTreeDesc.rootPageId);
00203         }
00204 
00205         for (uint i = 0; i < outAccessors.size(); i++) {
00206             outAccessors[i]->markEOS();
00207         }
00208         return EXECRC_EOS;
00209     }
00210 
00211     // no more input; write out last bitmap entry and produce final row count
00212     // which is either stored in a dynamic parameter set upstream or is
00213     // computed by splicer
00214 
00215     if (pInAccessor->getState() == EXECBUF_EOS) {
00216         if (currEntry) {
00217             insertBitmapEntry();
00218         }
00219         if (!computeRowCount) {
00220             numRowsLoaded = *reinterpret_cast<RecordNum const *>(
00221                 pDynamicParamManager->getParam(
00222                     insertRowCountParamId).getDatum().pData);
00223         }
00224         if (opaqueToInt(writeRowCountParamId) > 0) {
00225             TupleDatum rowCountDatum;
00226             rowCountDatum.pData = (PConstBuffer) &numRowsLoaded;
00227             rowCountDatum.cbData = sizeof(numRowsLoaded);
00228             pDynamicParamManager->writeParam(
00229                 writeRowCountParamId,
00230                 rowCountDatum);
00231         }
00232         bool rc = outAccessors[0]->produceTuple(outputTuple);
00233         assert(rc);
00234         isDone = true;
00235         return EXECRC_BUF_OVERFLOW;
00236     }
00237 
00238     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00239         ExecStreamResult rc = getValidatedTuple();
00240         if (rc != EXECRC_YIELD) {
00241             return rc;
00242         }
00243 
00244         if (uniqueRequired(inputTuple)) {
00245             if (currEntry) {
00246                 // Write out the current entry before we insert the unique
00247                 // key.
00248                 insertBitmapEntry();
00249                 currEntry = false;
00250             }
00251             upsertSingleton(inputTuple);
00252         } else if (!currEntry) {
00253             // if the key already exists in the index, splice the
00254             // entry just read to the existing btree entry
00255             if (existingEntry(inputTuple)) {
00256                 spliceEntry(inputTuple);
00257             }
00258         } else {
00259             // Compare the key values of the currentEntry with the
00260             // input tuple.  If they're the same, try splicing with
00261             // currentEntry.  Otherwise, write out currentEntry and
00262             // set currentEntry to the new input
00263             int keyComp = pCurrentEntry->compareEntry(
00264                 inputTuple, bitmapTupleDesc);
00265             assert(keyComp <= 0);
00266             if (keyComp == 0) {
00267                 // If we're in the mode where we're splicing in random
00268                 // singleton entries, first make sure there isn't a "better"
00269                 // entry in the btree for the entry we're trying to splice in.
00270                 // A better entry is one whose startRID is closer to the
00271                 // singleton rid we're trying to splice in.
00272                 if (computeRowCount) {
00273                     findBetterEntry(inputTuple);
00274                 }
00275                 spliceEntry(inputTuple);
00276             } else {
00277                 insertBitmapEntry();
00278                 if (existingEntry(inputTuple)) {
00279                     spliceEntry(inputTuple);
00280                 }
00281             }
00282         }
00283         pInAccessor->consumeTuple();
00284     }
00285 
00286     return EXECRC_QUANTUM_EXPIRED;
00287 }
00288 
00289 void LbmSplicerExecStream::closeImpl()
00290 {
00291     if (bTreeWriter) {
00292         bTreeWriter->endSearch();
00293     }
00294     deletionReader.endSearch();
00295     DiffluenceExecStream::closeImpl();
00296     bitmapBuffer.reset();
00297     mergeBuffer.reset();
00298     pCurrentEntry.reset();
00299     bTreeWriter.reset();
00300 }
00301 
00302 bool LbmSplicerExecStream::existingEntry(TupleData const &bitmapEntry)
00303 {
00304     if (!isEmpty()) {
00305         // if entry already exists in the btree, then the current bitmap
00306         // entry becomes that existing btree entry
00307         if (findBTreeEntry(bitmapEntry, bTreeTupleData)) {
00308             currExistingEntry = true;
00309             createNewBitmapEntry(bTreeTupleData);
00310             bTreeWriterMoved = false;
00311             return true;
00312         }
00313     }
00314 
00315     // set current bitmap entry to new entry
00316     currExistingEntry = false;
00317     createNewBitmapEntry(bitmapEntry);
00318     return false;
00319 }
00320 
00321 bool LbmSplicerExecStream::findMatchingBTreeEntry(
00322     TupleData const &bitmapEntry,
00323     TupleData &bTreeTupleData,
00324     bool leastUpper)
00325 {
00326     bool match =
00327         bTreeWriter->searchForKey(
00328             bitmapEntry,
00329             DUP_SEEK_BEGIN,
00330             leastUpper);
00331     bTreeWriter->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00332     return match;
00333 }
00334 
00335 bool LbmSplicerExecStream::findBTreeEntry(
00336     TupleData const &bitmapEntry, TupleData &bTreeTupleData)
00337 {
00338     // First do a greatest lower bound lookup into the btree, searching on
00339     // both the actual key index values and the startRid
00340     bool match =
00341         findMatchingBTreeEntry(bitmapEntry, bTreeTupleData, (nIdxKeys > 0));
00342 
00343     if (match == false) {
00344         if (nIdxKeys == 0) {
00345             // If there are no index keys, then we are splicing individual
00346             // rids.  In that case, we should always be splicing into the
00347             // best btree entry available.  First see if the greatest lower
00348             // bound entry overlaps the rid we're looking for.  If it doesn't,
00349             // try the next entry.  If that doesn't overlap, go back to the
00350             // greatest lower bound entry so we can splice the new rid to
00351             // the end of that entry.
00352             LcsRid newRid = *reinterpret_cast<LcsRid const *>
00353                 (bitmapEntry[0].pData);
00354             if (!ridOverlaps(newRid, bTreeTupleData, false)) {
00355                 match = bTreeWriter->searchNext();
00356                 if (match) {
00357                     bTreeWriter->getTupleAccessorForRead().unmarshal(
00358                         bTreeTupleData);
00359                     if (!ridOverlaps(newRid, bTreeTupleData, true)) {
00360                         match = bTreeWriter->searchForKey(
00361                             bitmapEntry, DUP_SEEK_BEGIN, false);
00362                         assert(match == false);
00363                         bTreeWriter->getTupleAccessorForRead().unmarshal(
00364                             bTreeTupleData);
00365                     }
00366                 }
00367             }
00368             match = true;
00369 
00370         } else {
00371             // In the case where we have actual index keys, we've done a
00372             // least upper bound search to locate the entry.  See if
00373             // the keys without the startRid match.  If they do, then we've
00374             // located a singleton rid that overlaps with the entry we're
00375             // trying to splice.  If so, that is the entry we want to splice
00376             // into.  Otherwise, the desired entry may be in front of the
00377             // one we've located.  Therefore, we need to do a greatest lower
00378             // bound search to locate that previous entry (since we don't have
00379             // a BTreeReader::searchPrev method), and then compare the keys
00380             // to see if we have a match.
00381             if (!bTreeWriter->isSingular()) {
00382                 int keyComp =
00383                     bitmapTupleDesc.compareTuplesKey(
00384                         bTreeTupleData,
00385                         bitmapEntry,
00386                         nIdxKeys);
00387                 if (keyComp == 0) {
00388                     assert(
00389                         LbmSegment::roundToByteBoundary(
00390                             *reinterpret_cast<LcsRid const *>(
00391                                 bTreeTupleData[nIdxKeys].pData)) ==
00392                         LbmSegment::roundToByteBoundary(
00393                             *reinterpret_cast<LcsRid const *>(
00394                                 bitmapEntry[nIdxKeys].pData)));
00395                     return true;
00396                 }
00397             }
00398 
00399             // Position to the previous entry by doing a glb search
00400             match =
00401                 bTreeWriter->searchForKey(bitmapEntry, DUP_SEEK_BEGIN, false);
00402             assert(match == false);
00403             bTreeWriter->getTupleAccessorForRead().unmarshal(bTreeTupleData);
00404             int keyComp =
00405                 bitmapTupleDesc.compareTuplesKey(
00406                     bTreeTupleData,
00407                     bitmapEntry,
00408                     nIdxKeys);
00409             if (keyComp == 0) {
00410                 match = true;
00411             }
00412         }
00413     }
00414     return match;
00415 }
00416 
00417 bool LbmSplicerExecStream::ridOverlaps(
00418     LcsRid rid,
00419     TupleData &bitmapTupleData,
00420     bool firstByte)
00421 {
00422     // Convert singletons to the rid range representing all bits in the byte
00423     // corresponding to the singleton rid
00424     LcsRid startRid =
00425         LbmSegment::roundToByteBoundary(
00426             *reinterpret_cast<LcsRid const *>(bitmapTupleData[0].pData));
00427     uint rowCount;
00428     if (firstByte) {
00429        rowCount = LbmSegment::LbmOneByteSize;
00430     } else {
00431         rowCount = LbmEntry::getRowCount(bitmapTupleData);
00432         if (rowCount == 1) {
00433             rowCount = LbmSegment::LbmOneByteSize;
00434         }
00435     }
00436     if (rid >= startRid && rid < startRid + rowCount) {
00437         return true;
00438     } else {
00439         return false;
00440     }
00441 }
00442 
00443 void LbmSplicerExecStream::findBetterEntry(TupleData const &bitmapEntry)
00444 {
00445     // If there is a better btree entry, write out the current entry and set
00446     // the current entry to the btree entry found.  The btree entry is "better"
00447     // if it's the entry that we should be splicing the new rid into.
00448     //
00449     // In other words, one of the following conditions must be true:
00450     //
00451     // 1) bTreeStartRid <= newRid < currentStartRid
00452     // 2) currentStartRid < bTreeStartRid <= newRid
00453     // 3) newRid <= bTreeStartRid < currentStartRid
00454     //
00455     // NOTE - condition 1 occurs when the current bitmap entry is split, and
00456     // the current entry becomes the right portion of that bitmap entry.
00457     // Also, conditions 1 and 3 can be combined into:
00458     //
00459     // currentStartRid > newRid && currentStartRid > bTreeStartRid
00460 
00461     assert(computeRowCount);
00462     if (!isEmpty()) {
00463         if (findBTreeEntry(bitmapEntry, bTreeTupleData)) {
00464             LcsRid bTreeRid =
00465                 LbmSegment::roundToByteBoundary(
00466                     *reinterpret_cast<LcsRid const *> (
00467                         bTreeTupleData[0].pData));
00468             LcsRid newRid = *reinterpret_cast<LcsRid const *>
00469                 (bitmapEntry[0].pData);
00470             LcsRid currRid =
00471                 LbmSegment::roundToByteBoundary(pCurrentEntry->getStartRID());
00472 
00473             if ((currRid > newRid && currRid > bTreeRid) ||
00474                 (newRid >= bTreeRid && bTreeRid > currRid))
00475             {
00476                 // If the current entry is a superset of the btree entry found,
00477                 // then ignore the btree entry, and continuing splicing into
00478                 // the current entry
00479                 uint rowCount = pCurrentEntry->getRowCount();
00480                 if (rowCount == 1) {
00481                     rowCount = LbmSegment::LbmOneByteSize;
00482                 }
00483                 if ((bTreeRid >= currRid) && (bTreeRid < currRid + rowCount)) {
00484                     return;
00485                 }
00486 
00487                 // Write out the current entry before we switch over to the
00488                 // new one
00489                 insertBitmapEntry();
00490                 currExistingEntry = true;
00491                 createNewBitmapEntry(bTreeTupleData);
00492             }
00493         }
00494     }
00495 }
00496 
00497 void LbmSplicerExecStream::spliceEntry(TupleData &bitmapEntry)
00498 {
00499     FENNEL_TRACE(TRACE_FINE, "splice two entries");
00500     FENNEL_TRACE(TRACE_FINE, pCurrentEntry->toString());
00501     FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(bitmapEntry));
00502 
00503     if (!pCurrentEntry->mergeEntry(bitmapEntry)) {
00504         insertBitmapEntry();
00505         createNewBitmapEntry(bitmapEntry);
00506     }
00507 }
00508 
00509 void LbmSplicerExecStream::insertBitmapEntry()
00510 {
00511     TupleData const &indexTuple = pCurrentEntry->produceEntryTuple();
00512 
00513     // implement btree updates as deletes followed by inserts
00514     if (currExistingEntry) {
00515         // when we're inserting in random singleton mode, we may have
00516         // repositioned in the btree, trying to find a better btree entry,
00517         // so we need to position back to the original btree entry before
00518         // we delete it. the btree may also reposition for validation
00519         if (bTreeWriterMoved) {
00520             for (uint i = 0; i < nIdxKeys; i++) {
00521                 tempBTreeTupleData[i] = indexTuple[i];
00522             }
00523         }
00524         if (computeRowCount || bTreeWriterMoved) {
00525             tempBTreeTupleData[nIdxKeys].pData =
00526                 (PConstBuffer) &currBTreeStartRid;
00527             bool match =
00528                 findMatchingBTreeEntry(
00529                     tempBTreeTupleData,
00530                     tempBTreeTupleData,
00531                     false);
00532             permAssert(match);
00533         }
00534         FENNEL_TRACE(TRACE_FINE, "delete Tuple from BTree");
00535         FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(bTreeTupleData));
00536 
00537         bTreeWriter->deleteCurrent();
00538         currExistingEntry = false;
00539     }
00540 
00541     FENNEL_TRACE(TRACE_FINE, "insert Tuple into BTree");
00542     FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(indexTuple));
00543 
00544     bTreeWriter->insertTupleData(indexTuple, DUP_FAIL);
00545 }
00546 
00547 void LbmSplicerExecStream::createNewBitmapEntry(TupleData const &bitmapEntry)
00548 {
00549     pCurrentEntry->setEntryTuple(bitmapEntry);
00550     currBTreeStartRid = *reinterpret_cast<LcsRid const *>
00551         (bitmapEntry[nIdxKeys].pData);
00552     currEntry = true;
00553 }
00554 
00555 void LbmSplicerExecStream::upsertSingleton(TupleData const &bitmapEntry)
00556 {
00557     if (!isEmpty()) {
00558         if (findBTreeEntry(bitmapEntry, bTreeTupleData)) {
00559             assert(LbmEntry::isSingleton(bTreeTupleData));
00560             bTreeWriter->deleteCurrent();
00561         }
00562     }
00563     bTreeWriter->insertTupleData(bitmapEntry, DUP_FAIL);
00564 }
00565 
00566 ExecStreamResult LbmSplicerExecStream::getValidatedTuple()
00567 {
00568     while (true) {
00569         if (!currValidation) {
00570             if (!pInAccessor->demandData()) {
00571                 return EXECRC_BUF_UNDERFLOW;
00572             }
00573 
00574             if (computeRowCount) {
00575                 pInAccessor->unmarshalTuple(singletonTuple);
00576                 inputTuple[0] = singletonTuple[0];
00577                 inputTuple[1].pData = NULL;
00578                 inputTuple[1].cbData = 0;
00579                 inputTuple[2].pData = NULL;
00580                 inputTuple[2].cbData = 0;
00581                 numRowsLoaded++;
00582             } else {
00583                 pInAccessor->unmarshalTuple(inputTuple);
00584             }
00585 
00586             FENNEL_TRACE(TRACE_FINE, "input Tuple from sorter");
00587             FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(inputTuple));
00588 
00589             // If we're creating a new index, we need to defer creating it until
00590             // we know that there are new input tuples.  Otherwise, there's no
00591             // point in creating it.
00592             if (createNewIndex && !newIndexCreated) {
00593                 newIndexCreated = true;
00594                 writeBTreeDesc.rootPageId = NULL_PAGE_ID;
00595                 BTreeBuilder builder(
00596                     writeBTreeDesc,
00597                     writeBTreeDesc.segmentAccessor.pSegment);
00598                 builder.createEmptyRoot();
00599                 writeBTreeDesc.rootPageId = builder.getRootPageId();
00600                 emptyTable = true;
00601                 emptyTableUnknown = false;
00602                 bTreeWriter = SharedBTreeWriter(
00603                     new BTreeWriter(
00604                         writeBTreeDesc,
00605                         scratchAccessor,
00606                         emptyTable));
00607             }
00608 
00609             if (!uniqueRequired(inputTuple)) {
00610                 return EXECRC_YIELD;
00611             }
00612 
00613             // count existing entries for key, if the key has not been seen yet
00614             if (firstValidation
00615                 || bitmapTupleDesc.compareTuplesKey(
00616                     inputTuple, currUniqueKey, nIdxKeys) != 0)
00617             {
00618                 firstValidation = false;
00619                 currUniqueKey.resetBuffer();
00620                 for (uint i = 0; i < nIdxKeys; i++) {
00621                     currUniqueKey[i].memCopyFrom(inputTuple[i]);
00622                 }
00623                 nKeyRows = countKeyRows(inputTuple);
00624             }
00625 
00626             // prepare to emit rids for key violations
00627             inputRidReader.init(inputTuple);
00628             nullUpsertRid = true;
00629             currValidation = true;
00630         }
00631 
00632         // if there were no undeleted values for the current key, we can
00633         // insert/update a single rid
00634         if (nKeyRows == 0) {
00635             assert(getUpsertRidPtr() == NULL);
00636             if (!createNewIndex) {
00637                 setUpsertRid(inputRidReader.getNext());
00638                 nKeyRows++;
00639             } else {
00640                 // Loop until we find a non-deleted rid.  Deleted rids only
00641                 // occur when rebuilding an existing index.
00642                 do {
00643                     LcsRid rid = inputRidReader.getNext();
00644                     if (!deletionReader.searchForRid(rid)) {
00645                         setUpsertRid(rid);
00646                         nKeyRows++;
00647                         break;
00648                     }
00649                 } while (inputRidReader.hasNext());
00650             }
00651         }
00652 
00653         // all other rids are rejected as duplicate keys, unless they're deleted
00654         // rids
00655         while (inputRidReader.hasNext()) {
00656             if (!violationTuple.size()) {
00657                 // if there is a possibility of violations, the splicer should
00658                 // have been initialized with a second output
00659                 permAssert(false);
00660             }
00661             LcsRid rid = inputRidReader.peek();
00662             if (createNewIndex && deletionReader.searchForRid(rid)) {
00663                 inputRidReader.advance();
00664                 continue;
00665             }
00666             violationTuple[0].pData = reinterpret_cast<PConstBuffer>(&rid);
00667             violationTuple[0].cbData = 8;
00668             if (!violationAccessor->produceTuple(violationTuple)) {
00669                 return EXECRC_BUF_OVERFLOW;
00670             }
00671             postViolation(inputTuple, violationTuple);
00672             inputRidReader.advance();
00673         }
00674         currValidation = false;
00675 
00676         if (getUpsertRidPtr() != NULL) {
00677             // since a rid was accepted, return it as a validated tuple
00678             inputTuple[nIdxKeys].pData =
00679                 reinterpret_cast<PConstBuffer>(getUpsertRidPtr());
00680             inputTuple[nIdxKeys].cbData = 8;
00681             inputTuple[nIdxKeys + 1].pData = NULL;
00682             inputTuple[nIdxKeys + 1].cbData = 0;
00683             inputTuple[nIdxKeys + 2].pData = NULL;
00684             inputTuple[nIdxKeys + 2].cbData = 0;
00685             return EXECRC_YIELD;
00686         }
00687 
00688         // every rid in the current tuple was either rejected or already
00689         // deleted, so try the next tuple
00690         pInAccessor->consumeTuple();
00691     }
00692 }
00693 
00694 bool LbmSplicerExecStream::uniqueRequired(const TupleData &tuple)
00695 {
00696     if (uniqueKey) {
00697         for (uint i = 0; i < nIdxKeys; i++) {
00698             if (tuple[i].isNull()) {
00699                 return false;
00700             }
00701         }
00702         return true;
00703     }
00704     return false;
00705 }
00706 
00707 uint LbmSplicerExecStream::countKeyRows(const TupleData &tuple)
00708 {
00709     assert(uniqueKey);
00710     if (isEmpty()) {
00711         return 0;
00712     }
00713 
00714     if (!findBTreeEntry(inputTuple, bTreeTupleData)) {
00715         return 0;
00716     }
00717     assert(LbmEntry::isSingleton(bTreeTupleData));
00718     LcsRid rid = LbmEntry::getStartRid(bTreeTupleData);
00719 
00720     if (deletionReader.searchForRid(rid)) {
00721         return 0;
00722     }
00723     return 1;
00724 }
00725 
00726 void LbmSplicerExecStream::postViolation(
00727     const TupleData &input, const TupleData &violation)
00728 {
00729     if (!errorTuple.size()) {
00730         for (uint i = 0; i < nIdxKeys + 1; i++) {
00731             errorDesc.push_back(bitmapTupleDesc[i]);
00732         }
00733         errorTuple.compute(errorDesc);
00734         errorMsg = FennelResource::instance().uniqueConstraintViolated();
00735     }
00736 
00737     for (uint i = 0; i < nIdxKeys; i++) {
00738         errorTuple[i] = input[i];
00739     }
00740     errorTuple[nIdxKeys] = violation[0];
00741 
00742     postError(ROW_ERROR, errorMsg, errorDesc, errorTuple, -1);
00743 }
00744 
00745 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmSplicerExecStream.cpp#25 $");
00746 
00747 // End LbmSplicerExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1