LbmChopperExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h"
00025 
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $");
00027 
00028 LbmChopperExecStream::LbmChopperExecStream()
00029 {
00030     ridLimitParamId = DynamicParamId(0);
00031 }
00032 
00033 void LbmChopperExecStream::prepare(LbmChopperExecStreamParams const &params)
00034 {
00035     ConfluenceExecStream::prepare(params);
00036 
00037     // set dynamic parameter ids
00038     ridLimitParamId = params.ridLimitParamId;
00039     assert(opaqueToInt(ridLimitParamId) > 0);
00040 
00041     // initialize reader
00042     inputTuple.compute(inAccessors[0]->getTupleDesc());
00043 
00044     // output buffer will come from scratch segment
00045     SegmentAccessor scratchAccessor = params.scratchAccessor;
00046     writerPageLock.accessSegment(scratchAccessor);
00047     pageSize = scratchAccessor.pSegment->getUsablePageSize();
00048 }
00049 
00050 void LbmChopperExecStream::getResourceRequirements(
00051     ExecStreamResourceQuantity &minQuantity,
00052     ExecStreamResourceQuantity &optQuantity)
00053 {
00054     ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00055 
00056     // one page for the writer
00057     minQuantity.nCachePages += 1;
00058     optQuantity.nCachePages += 1;
00059 }
00060 
00061 void LbmChopperExecStream::open(bool restart)
00062 {
00063     ConfluenceExecStream::open(restart);
00064 
00065     if (!restart) {
00066         uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage;
00067         uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00068         writerPageLock.allocatePage();
00069         PBuffer writerBuf = writerPageLock.getPage().getWritableData();
00070         segmentWriter.init(
00071             writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false);
00072     } else {
00073         segmentWriter.reset();
00074     }
00075 
00076     state = LBM_STATE_READ;
00077     writePending = false;
00078     producePending = false;
00079     segmentReader.init(inAccessors[0], inputTuple);
00080 }
00081 
00082 ExecStreamResult LbmChopperExecStream::execute(
00083     ExecStreamQuantum const &quantum)
00084 {
00085     ridLimit = *reinterpret_cast<RecordNum const *>(
00086         pDynamicParamManager->getParam(ridLimitParamId).getDatum().pData);
00087 
00088     uint nTuples = 0;
00089     ExecStreamResult status;
00090     while (nTuples < quantum.nTuplesMax) {
00091         switch (state) {
00092         case LBM_STATE_READ:
00093             status = readSegment();
00094             if (status == EXECRC_EOS) {
00095                 // flush any remaining data as last tuple(s)
00096                 if (! segmentWriter.isEmpty()) {
00097                     producePending = true;
00098                     state = LBM_STATE_PRODUCE;
00099                     continue;
00100                 }
00101                 state = LBM_STATE_DONE;
00102                 continue;
00103             }
00104             if (status != EXECRC_YIELD) {
00105                 return status;
00106             }
00107             state = LBM_STATE_WRITE;
00108             continue;
00109         case LBM_STATE_WRITE:
00110             if (! writeSegment()) {
00111                 producePending = true;
00112                 state = LBM_STATE_PRODUCE;
00113                 continue;
00114             }
00115             nTuples++;
00116             state = LBM_STATE_READ;
00117             continue;
00118         case LBM_STATE_PRODUCE:
00119             if (! produceTuple()) {
00120                 return EXECRC_BUF_OVERFLOW;
00121             }
00122             state = writePending ? LBM_STATE_WRITE : LBM_STATE_READ;
00123             continue;
00124         case LBM_STATE_DONE:
00125             pOutAccessor->markEOS();
00126             return EXECRC_EOS;
00127         default:
00128             assert(false);
00129         }
00130     }
00131     return EXECRC_QUANTUM_EXPIRED;
00132 }
00133 
00134 void LbmChopperExecStream::closeImpl()
00135 {
00136     ConfluenceExecStream::closeImpl();
00137 }
00138 
00139 ExecStreamResult LbmChopperExecStream::readSegment()
00140 {
00141     if (writePending) {
00142         return EXECRC_YIELD;
00143     }
00144     ExecStreamResult status = segmentReader.readSegmentAndAdvance(
00145         inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len);
00146     if (status == EXECRC_YIELD) {
00147         writePending = true;
00148     }
00149     return status;
00150 }
00151 
00152 bool LbmChopperExecStream::writeSegment()
00153 {
00154     assert(writePending = true);
00155     LcsRid startRid = inputSegment.getSrid();
00156     LcsRid endRid = inputSegment.getEndRid();
00157     assert(opaqueToInt(endRid - startRid) <= ridLimit);
00158 
00159     // if appending to previous segments, ensure that the current segment
00160     // follows the previous segment, and that it would not exceed the
00161     // rid limit for the tuple being written
00162     bool firstWrite = segmentWriter.isEmpty();
00163     if (! firstWrite) {
00164         if (startRid < currentEndRid) {
00165             return false;
00166         }
00167         if (opaqueToInt(endRid - currentSrid) > ridLimit) {
00168             return false;
00169         }
00170     }
00171 
00172     // try to add segment to writer
00173     PBuffer byteSeg = inputSegment.byteSeg - (inputSegment.len - 1);
00174     if (segmentWriter.addSegment(
00175             startRid,
00176             byteSeg,
00177             inputSegment.len))
00178     {
00179         writePending = false;
00180         if (firstWrite) {
00181             currentSrid = startRid;
00182         }
00183         currentEndRid = endRid;
00184         return true;
00185     }
00186     return false;
00187 }
00188 
00189 bool LbmChopperExecStream::produceTuple()
00190 {
00191     assert(producePending);
00192 
00193     TupleData outputTuple = segmentWriter.produceSegmentTuple();
00194     if (pOutAccessor->produceTuple(outputTuple)) {
00195         segmentWriter.reset();
00196         producePending = false;
00197         return true;
00198     }
00199     return false;
00200 }
00201 
00202 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $");
00203 
00204 // End LbmChopperExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1