LbmUnionExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h"
00025 
00026 #include <math.h>
00027 
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $");
00029 
00030 void LbmUnionExecStream::prepare(LbmUnionExecStreamParams const &params)
00031 {
00032     ConfluenceExecStream::prepare(params);
00033     maxRid = params.maxRid;
00034 
00035     // set dynanmic parameter ids
00036     ridLimitParamId = params.ridLimitParamId;
00037     assert(opaqueToInt(ridLimitParamId) > 0);
00038 
00039     // optional parameters
00040     startRidParamId = params.startRidParamId;
00041     segmentLimitParamId = params.segmentLimitParamId;
00042 
00043     // setup tupledatums for writing dynamic parameter values
00044     ridLimitDatum.pData = (PConstBuffer) &ridLimit;
00045     ridLimitDatum.cbData = sizeof(ridLimit);
00046 
00047     assert(inAccessors[0]->getTupleDesc() == pOutAccessor->getTupleDesc());
00048 
00049     // initialize reader
00050     inputTuple.compute(inAccessors[0]->getTupleDesc());
00051 
00052     // output buffer will come from scratch segment
00053     scratchAccessor = params.scratchAccessor;
00054     workspacePageLock.accessSegment(scratchAccessor);
00055     writerPageLock.accessSegment(scratchAccessor);
00056     pageSize = scratchAccessor.pSegment->getUsablePageSize();
00057 }
00058 
00059 void LbmUnionExecStream::getResourceRequirements(
00060     ExecStreamResourceQuantity &minQuantity,
00061     ExecStreamResourceQuantity &optQuantity,
00062     ExecStreamResourceSettingType &optType)
00063 {
00064     ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00065 
00066     // at least 2 scratch pages for constructing output bitmap segments
00067     //   - 1 for workspace
00068     //   - 1 for writer
00069     minQuantity.nCachePages += 2;
00070     optQuantity.nCachePages += 2 + computeOptWorkspacePages(maxRid) + 1;
00071     optType = EXEC_RESOURCE_ESTIMATE;
00072 }
00073 
00074 void LbmUnionExecStream::setResourceAllocation(
00075     ExecStreamResourceQuantity &quantity)
00076 {
00077     ConfluenceExecStream::setResourceAllocation(quantity);
00078 
00079     // TODO: can we just grab all the remaining pages like this?
00080     nWorkspacePages = quantity.nCachePages - 1;
00081     ridLimit = computeRidLimit(nWorkspacePages);
00082 }
00083 
00084 void LbmUnionExecStream::open(bool restart)
00085 {
00086     ConfluenceExecStream::open(restart);
00087 
00088     if (!restart) {
00089         uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage;
00090         uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00091         writerPageLock.allocatePage();
00092         PBuffer writerBuf = writerPageLock.getPage().getWritableData();
00093         segmentWriter.init(
00094             writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false);
00095         // still have plenty of space for merging
00096         reverseArea = writerBuf + writerBufSize;
00097         reverseAreaSize =
00098             scratchAccessor.pSegment->getUsablePageSize() - writerBufSize;
00099 
00100         // allocate byte buffer for merging segments
00101         boost::shared_array<PBuffer> ppBuffers(new PBuffer[nWorkspacePages]);
00102         assert(ppBuffers != NULL);
00103         for (uint i = 0; i < nWorkspacePages; i++) {
00104             workspacePageLock.allocatePage();
00105             ppBuffers[i] = workspacePageLock.getPage().getWritableData();
00106             workspacePageLock.unlock();
00107         }
00108         ByteBuffer *pBuffer = new ByteBuffer();
00109         pBuffer->init(ppBuffers, nWorkspacePages, pageSize);
00110         SharedByteBuffer pWorkspaceBuffer(pBuffer);
00111         uint maxSegmentSize = LbmEntry::getMaxBitmapSize(bitmapColSize);
00112         workspace.init(pWorkspaceBuffer, maxSegmentSize);
00113 
00114         // create dynamic parameters
00115         pDynamicParamManager->createParam(
00116             ridLimitParamId, pOutAccessor->getTupleDesc()[0]);
00117         pDynamicParamManager->writeParam(ridLimitParamId, ridLimitDatum);
00118     } else {
00119         workspace.reset();
00120         segmentWriter.reset();
00121     }
00122 
00123     writePending = false;
00124     producePending = false;
00125     isDone = false;
00126     segmentReader.init(inAccessors[0], inputTuple);
00127 }
00128 
00129 ExecStreamResult LbmUnionExecStream::execute(
00130     ExecStreamQuantum const &quantum)
00131 {
00132     if (isDone) {
00133         pOutAccessor->markEOS();
00134         return EXECRC_EOS;
00135     }
00136 
00137     if (isConsumerSridSet()) {
00138         // avoid RIDs not required by the downstream consumer
00139         requestedSrid = (LcsRid) *reinterpret_cast<RecordNum const *>(
00140             pDynamicParamManager->getParam(startRidParamId).getDatum().pData);
00141         workspace.advanceToSrid(requestedSrid);
00142     }
00143     if (isSegmentLimitSet()) {
00144         segmentsRemaining = *reinterpret_cast<uint const *>(
00145             pDynamicParamManager->getParam(segmentLimitParamId)
00146             .getDatum().pData);
00147     }
00148 
00149     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00150         while (! producePending) {
00151             // yield control if segment limit is reached
00152             if (isSegmentLimitSet() && segmentsRemaining == 0) {
00153                 return EXECRC_QUANTUM_EXPIRED;
00154             }
00155 
00156             ExecStreamResult status = readSegment();
00157             if (status == EXECRC_EOS) {
00158                 // flush any remaining data as last tuple(s)
00159                 isDone = workspace.isEmpty() && segmentWriter.isEmpty();
00160                 if (! isDone) {
00161                     transferLast();
00162                     producePending = true;
00163                     break;
00164                 }
00165                 return EXECRC_EOS;
00166             }
00167             if (status != EXECRC_YIELD) {
00168                 return status;
00169             }
00170             if (! writeSegment()) {
00171                 producePending = (! segmentWriter.isEmpty());
00172             }
00173         }
00174 
00175         if (! produceTuple()) {
00176             return EXECRC_BUF_OVERFLOW;
00177         }
00178         producePending = false;
00179     }
00180     return EXECRC_QUANTUM_EXPIRED;
00181 }
00182 
00183 void LbmUnionExecStream::closeImpl()
00184 {
00185     ConfluenceExecStream::closeImpl();
00186 
00187     if (scratchAccessor.pSegment) {
00188         scratchAccessor.pSegment->deallocatePageRange(
00189             NULL_PAGE_ID, NULL_PAGE_ID);
00190     }
00191 }
00192 
00193 uint LbmUnionExecStream::computeOptWorkspacePages(LcsRid maxRid)
00194 {
00195     // TODO: come up with a better estimate once we have statistics
00196     return 2;
00197 }
00198 
00199 uint LbmUnionExecStream::computeRidLimit(uint nWorkspacePages)
00200 {
00201     // save a quarter page for building segments
00202     // based upon the idea that the largest segment could be
00203     // 1/8 of a page along with 1/8 of a page for "growing" a
00204     // segment before writing it out (not true as of 2006-03-08)
00205     uint bytes = (uint) ((nWorkspacePages - 0.25) * pageSize);
00206     return bytes * LbmSegment::LbmOneByteSize;
00207 }
00208 
00209 bool LbmUnionExecStream::isConsumerSridSet()
00210 {
00211     return (opaqueToInt(startRidParamId) > 0);
00212 }
00213 
00214 bool LbmUnionExecStream::isSegmentLimitSet()
00215 {
00216     return (opaqueToInt(segmentLimitParamId) > 0);
00217 }
00218 
00219 ExecStreamResult LbmUnionExecStream::readSegment()
00220 {
00221     if (writePending) {
00222         return EXECRC_YIELD;
00223     }
00224     ExecStreamResult status = segmentReader.readSegmentAndAdvance(
00225         inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len);
00226     if (status == EXECRC_YIELD) {
00227         writePending = true;
00228     }
00229     return status;
00230 }
00231 
00232 bool LbmUnionExecStream::writeSegment()
00233 {
00234     assert(writePending = true);
00235 
00236     // eagerly flush segments
00237     LcsRid currentSrid = segmentReader.getSrid();
00238     workspace.setProductionLimit(currentSrid);
00239     if (!transfer()) {
00240         return false;
00241     }
00242     if (workspace.isEmpty()) {
00243         workspace.advanceToSrid(currentSrid);
00244     }
00245 
00246     // flushing the workspace should make enough room for the next tuple
00247     bool success = workspace.addSegment(inputSegment);
00248     assert(success);
00249     writePending = false;
00250     return true;
00251 }
00252 
00253 void LbmUnionExecStream::transferLast()
00254 {
00255     workspace.removeLimit();
00256     transfer();
00257 }
00258 
00259 bool LbmUnionExecStream::transfer()
00260 {
00261     while (workspace.canProduce()) {
00262         if (isSegmentLimitSet() && segmentsRemaining == 0) {
00263             return false;
00264         }
00265 
00266         LbmByteSegment seg = workspace.getSegment();
00267         assert(seg.len < reverseAreaSize);
00268         PBuffer reverseStart = reverseArea + seg.len - 1;
00269         for (int i = 0; i < seg.len; i++) {
00270             reverseStart[-i] = seg.byteSeg[i];
00271         }
00272         LcsRid startRid = seg.getSrid();
00273         if (! segmentWriter.addSegment(startRid, reverseArea, seg.len)) {
00274             return false;
00275         }
00276         workspace.advancePastSegment();
00277 
00278         if (isSegmentLimitSet()) {
00279             segmentsRemaining--;
00280         }
00281     }
00282     return true;
00283 }
00284 
00285 bool LbmUnionExecStream::produceTuple()
00286 {
00287     assert(producePending);
00288     assert(! segmentWriter.isEmpty());
00289 
00290     outputTuple = segmentWriter.produceSegmentTuple();
00291     if (pOutAccessor->produceTuple(outputTuple)) {
00292         segmentWriter.reset();
00293         producePending = false;
00294         return true;
00295     }
00296     return false;
00297 }
00298 
00299 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $");
00300 
00301 // End LbmUnionExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1