SegBufferReaderExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SegBufferReaderExecStream.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/SegBufferReaderExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/DynamicParam.h"
00028 #include "fennel/exec/SegBufferReader.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferReaderExecStream.cpp#2 $");
00031 
00032 void SegBufferReaderExecStream::prepare(
00033     SegBufferReaderExecStreamParams const &params)
00034 {
00035     ConduitExecStream::prepare(params);
00036     bufferSegmentAccessor = params.scratchAccessor;
00037     readerRefCountParamId = params.readerRefCountParamId;
00038     paramIncremented = false;
00039 
00040     assert(pInAccessor->getTupleDesc().size() == 1);
00041     inputTuple.compute(pInAccessor->getTupleDesc());
00042 }
00043 
00044 void SegBufferReaderExecStream::getResourceRequirements(
00045     ExecStreamResourceQuantity &minQuantity,
00046     ExecStreamResourceQuantity &optQuantity)
00047 {
00048     ConduitExecStream::getResourceRequirements(minQuantity, optQuantity);
00049 
00050     // set aside 1 page for I/O
00051     minQuantity.nCachePages += 1;
00052     optQuantity = minQuantity;
00053 }
00054 
00055 void SegBufferReaderExecStream::open(bool restart)
00056 {
00057     assert(pOutAccessor);
00058     assert(pOutAccessor->getProvision() == BUFPROV_PRODUCER);
00059 
00060     if (!restart) {
00061         firstBufferPageId = NULL_PAGE_ID;
00062         pDynamicParamManager->incrementCounterParam(readerRefCountParamId);
00063         paramIncremented = true;
00064         ConduitExecStream::open(restart);
00065     } else {
00066         pOutAccessor->clear();
00067         if (pSegBufferReader) {
00068             // reread from beginning
00069             pSegBufferReader->open(false);
00070         } else {
00071             // the buffered data hasn't been read yet, so treat this the
00072             // same as first open
00073             ConduitExecStream::open(restart);
00074         }
00075     }
00076 }
00077 
00078 void SegBufferReaderExecStream::closeImpl()
00079 {
00080     // If this stream was never opened, then it's possible that the parameter
00081     // does not exist.  So, only decrement if we know we've done an increment.
00082     if (paramIncremented) {
00083         pDynamicParamManager->decrementCounterParam(readerRefCountParamId);
00084         paramIncremented = false;
00085     }
00086     pSegBufferReader.reset();
00087     ConduitExecStream::closeImpl();
00088 }
00089 
00090 ExecStreamResult SegBufferReaderExecStream::execute(ExecStreamQuantum const &)
00091 {
00092     // Retrieve the first pageId of the buffered data from the writer stream,
00093     // if it hasn't already been retrieved, and then setup a buffer reader.
00094     if (firstBufferPageId == NULL_PAGE_ID) {
00095         if (!pInAccessor->demandData()) {
00096             return EXECRC_BUF_UNDERFLOW;
00097         }
00098         pInAccessor->unmarshalTuple(inputTuple);
00099         firstBufferPageId =
00100             *reinterpret_cast<PageId const *> (inputTuple[0].pData);
00101         pInAccessor->consumeTuple();
00102         pSegBufferReader =
00103             SegBufferReader::newSegBufferReader(
00104                 pOutAccessor,
00105                 bufferSegmentAccessor,
00106                 firstBufferPageId);
00107         pSegBufferReader->open(false);
00108     }
00109 
00110     // Read the buffered data
00111     return pSegBufferReader->read();
00112 }
00113 
00114 ExecStreamBufProvision SegBufferReaderExecStream::getOutputBufProvision() const
00115 {
00116     return BUFPROV_PRODUCER;
00117 }
00118 
00119 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferReaderExecStream.cpp#2 $")
00120 
00121 // End SegBufferReaderExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1