LbmBitOpExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmBitOpExecStream.h"
00025 
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $");
00027 
00028 void LbmBitOpExecStream::prepare(LbmBitOpExecStreamParams const &params)
00029 {
00030     ConfluenceExecStream::prepare(params);
00031 
00032     // set dynanmic parameter ids
00033     rowLimitParamId = params.rowLimitParamId;
00034     startRidParamId = params.startRidParamId;
00035 
00036     // setup tupledatums for writing dynamic parameter values
00037     rowLimitDatum.pData = (PConstBuffer) &rowLimit;
00038     rowLimitDatum.cbData = sizeof(rowLimit);
00039     startRidDatum.pData = (PConstBuffer) &startRid;
00040     startRidDatum.cbData = sizeof(startRid);
00041 
00042     // initialize segment readers for reading bitmaps from input stream
00043     nInputs = inAccessors.size();
00044     segmentReaders.reset(new LbmSegmentReader[nInputs]);
00045     bitmapSegTuples.reset(new TupleData[nInputs]);
00046     for (uint i = 0; i < nInputs; i++) {
00047         bitmapSegTuples[i].compute(inAccessors[i]->getTupleDesc());
00048     }
00049 
00050     nFields = inAccessors[0]->getTupleDesc().size() - 3;
00051 }
00052 
00053 void LbmBitOpExecStream::open(bool restart)
00054 {
00055     ConfluenceExecStream::open(restart);
00056 
00057     uint nKeys = pOutAccessor->getTupleDesc().size() - 3;
00058 
00059     if (!restart) {
00060         // create dynamic parameters with the same type as the first bitmap
00061         // field, a RID, if this is the first time open is called
00062         if (opaqueToInt(rowLimitParamId) > 0) {
00063             pDynamicParamManager->createParam(
00064                 rowLimitParamId, pOutAccessor->getTupleDesc()[nKeys]);
00065         }
00066         if (opaqueToInt(startRidParamId) > 0) {
00067             pDynamicParamManager->createParam(
00068                 startRidParamId, pOutAccessor->getTupleDesc()[nKeys]);
00069         }
00070     }
00071 
00072     // need to allocate buffers if this is the very first open, or if a
00073     // previous close freed up the buffers; note that we don't check "restart"
00074     // in case the stream was closed early, and then reopened in restart mode
00075     if (!outputBuf) {
00076         // allocate output buffer; the output buffer size is based on the size
00077         // required for building a LbmEntry
00078         uint bitmapColSize = pOutAccessor->getTupleDesc()[nKeys + 1].cbStorage;
00079         uint outputBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00080         outputBuf.reset(new FixedBuffer[outputBufSize]);
00081 
00082         // initialize the writer to produce bitmap tuples; the second input
00083         // should be a bitmap input
00084         segmentWriter.init(
00085             outputBuf.get(), outputBufSize,
00086             inAccessors[1]->getTupleDesc(), true);
00087 
00088         // allocate a temporary buffer for the bit operation; the temporary
00089         // buffer should not be larger than what a LbmEntry supports
00090         bitmapBufSize = LbmEntry::getMaxBitmapSize(bitmapColSize);
00091         byteSegBuf.reset(new FixedBuffer[bitmapBufSize]);
00092         pByteSegBuf = byteSegBuf.get();
00093 
00094     } else {
00095         segmentWriter.reset();
00096     }
00097 
00098     startRid = LcsRid(0);
00099     rowLimit = 1;
00100     producePending = false;
00101     writeStartRidParamValue();
00102     if (opaqueToInt(rowLimitParamId) > 0) {
00103         pDynamicParamManager->writeParam(rowLimitParamId, rowLimitDatum);
00104     }
00105     for (uint i = 0; i < nInputs; i++) {
00106         segmentReaders[i].init(inAccessors[i], bitmapSegTuples[i]);
00107     }
00108 }
00109 
00110 ExecStreamResult LbmBitOpExecStream::producePendingOutput(uint iInput)
00111 {
00112     if (!produceTuple(outputTuple)) {
00113         return EXECRC_BUF_OVERFLOW;
00114     }
00115     // in the middle of adding segments when buffer overflow occurred;
00116     // go back and add the remaining segments
00117     if (!segmentWriter.isEmpty()) {
00118         segmentWriter.reset();
00119         if (!addSegments()) {
00120             return EXECRC_BUF_OVERFLOW;
00121         }
00122     }
00123     producePending = false;
00124     if (inAccessors[iInput]->getState() == EXECBUF_EOS) {
00125         pOutAccessor->markEOS();
00126         return EXECRC_EOS;
00127     }
00128 
00129     return EXECRC_YIELD;
00130 }
00131 
00132 ExecStreamResult LbmBitOpExecStream::readInput(
00133     uint iInput, LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00134 {
00135     ExecStreamResult rc = segmentReaders[iInput].advanceToRid(startRid);
00136 
00137     if (rc == EXECRC_EOS) {
00138         // write out the last pending segment
00139         if (! flush()) {
00140             return EXECRC_BUF_OVERFLOW;
00141         }
00142         pOutAccessor->markEOS();
00143         return EXECRC_EOS;
00144     } else if (rc != EXECRC_YIELD) {
00145         return rc;
00146     }
00147 
00148     segmentReaders[iInput].readCurrentByteSegment(
00149         currRid, currByteSeg, currLen);
00150     // segment read should never be larger than space available
00151     // for segments
00152     assert(currLen <= bitmapBufSize);
00153 
00154     return EXECRC_YIELD;
00155 }
00156 
00157 bool LbmBitOpExecStream::flush()
00158 {
00159     assert (!producePending);
00160 
00161     if (!segmentWriter.isEmpty()) {
00162         outputTuple = segmentWriter.produceSegmentTuple();
00163         segmentWriter.reset();
00164         if (!produceTuple(outputTuple)) {
00165             producePending = true;
00166         }
00167     }
00168     return !producePending;
00169  }
00170 
00171 bool LbmBitOpExecStream::addSegments()
00172 {
00173     while (addLen > 0) {
00174         if (segmentWriter.addSegment(addRid, addByteSeg, addLen)) {
00175             break;
00176         }
00177 
00178         outputTuple = segmentWriter.produceSegmentTuple();
00179         if (!produceTuple(outputTuple)) {
00180             producePending = true;
00181             return false;
00182         }
00183 
00184         // loop back and start creating a new segment for the remainder of
00185         // the segments that wouldn't fit
00186         segmentWriter.reset();
00187     }
00188 
00189     return true;
00190 }
00191 
00192 bool LbmBitOpExecStream::produceTuple(TupleData bitmapTuple)
00193 {
00194     assert(pOutAccessor->getTupleDesc().size() == bitmapTuple.size());
00195     return pOutAccessor->produceTuple(bitmapTuple);
00196 }
00197 
00198 void LbmBitOpExecStream::closeImpl()
00199 {
00200     ConfluenceExecStream::closeImpl();
00201     outputBuf.reset();
00202     byteSegBuf.reset();
00203 }
00204 
00205 void LbmBitOpExecStream::writeStartRidParamValue()
00206 {
00207     if (opaqueToInt(startRidParamId) > 0) {
00208         pDynamicParamManager->writeParam(startRidParamId, startRidDatum);
00209     }
00210 }
00211 
00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $");
00213 
00214 // End LbmBitOpExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1