SegBufferExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SegBufferExecStream.cpp#19 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/SegBufferExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ExecStreamGraphImpl.h"
00028 #include "fennel/exec/SegBufferReader.h"
00029 #include "fennel/exec/SegBufferWriter.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferExecStream.cpp#19 $");
00032 
00033 void SegBufferExecStream::prepare(SegBufferExecStreamParams const &params)
00034 {
00035     ConduitExecStream::prepare(params);
00036     bufferSegmentAccessor = params.scratchAccessor;
00037 
00038     multipass = params.multipass;
00039     firstPageId = NULL_PAGE_ID;
00040 }
00041 
00042 void SegBufferExecStream::getResourceRequirements(
00043     ExecStreamResourceQuantity &minQuantity,
00044     ExecStreamResourceQuantity &optQuantity)
00045 {
00046     ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00047 
00048     // set aside 1 page for I/O
00049     minQuantity.nCachePages += 1;
00050 
00051     optQuantity = minQuantity;
00052 }
00053 
00054 void SegBufferExecStream::open(bool restart)
00055 {
00056     assert(pInAccessor);
00057     assert(pInAccessor->getProvision() == BUFPROV_CONSUMER);
00058 
00059     assert(pOutAccessor);
00060     assert(pOutAccessor->getProvision() == BUFPROV_PRODUCER);
00061 
00062     // TODO jvs 1-June-2006:  generalize SegStreamAllocation to handle
00063     // the multipass usage requirements here
00064 
00065     if (restart) {
00066         pOutAccessor->clear();
00067         if (multipass) {
00068             if (pSegBufferReader) {
00069                 // reread from beginning
00070                 openBufferForRead(false);
00071             } else {
00072                 // nothing was ever buffered, so treat this the
00073                 // same as first open
00074                 ConduitExecStream::open(restart);
00075             }
00076         } else {
00077             // for a single-pass buffer, a restart means forget any buffered
00078             // contents
00079             destroyBuffer();
00080             ConduitExecStream::open(restart);
00081         }
00082     } else {
00083         ConduitExecStream::open(restart);
00084     }
00085 }
00086 
00087 void SegBufferExecStream::closeImpl()
00088 {
00089     destroyBuffer();
00090     ConduitExecStream::closeImpl();
00091 }
00092 
00093 void SegBufferExecStream::destroyBuffer()
00094 {
00095     if (pSegBufferWriter || (multipass && (firstPageId != NULL_PAGE_ID))) {
00096         // this is to make sure that buffer storage gets deallocated in all
00097         // cases
00098         openBufferForRead(true);
00099     }
00100     pSegBufferReader.reset();
00101     firstPageId = NULL_PAGE_ID;
00102 }
00103 
00104 void SegBufferExecStream::openBufferForRead(bool destroy)
00105 {
00106     if (firstPageId == NULL_PAGE_ID) {
00107         firstPageId = pSegBufferWriter->getFirstPageId();
00108         pSegBufferWriter.reset();
00109         pSegBufferReader =
00110             SegBufferReader::newSegBufferReader(
00111                 pOutAccessor,
00112                 bufferSegmentAccessor,
00113                 firstPageId);
00114     }
00115     pSegBufferReader->open(destroy);
00116 }
00117 
00118 ExecStreamResult SegBufferExecStream::execute(ExecStreamQuantum const &)
00119 {
00120     if (!pSegBufferReader) {
00121         if (!pSegBufferWriter) {
00122             pSegBufferWriter =
00123                 SegBufferWriter::newSegBufferWriter(
00124                     pInAccessor,
00125                     bufferSegmentAccessor,
00126                     false);
00127         }
00128         ExecStreamResult rc = pSegBufferWriter->write();
00129         if (rc != EXECRC_EOS) {
00130             return rc;
00131         }
00132 
00133         ExecStreamGraphImpl &graphImpl =
00134             dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00135         graphImpl.closeProducers(getStreamId());
00136         openBufferForRead(!multipass);
00137     }
00138 
00139     return pSegBufferReader->read();
00140 }
00141 
00142 ExecStreamBufProvision SegBufferExecStream::getOutputBufProvision() const
00143 {
00144     return BUFPROV_PRODUCER;
00145 }
00146 
00147 ExecStreamBufProvision SegBufferExecStream::getInputBufProvision() const
00148 {
00149     return BUFPROV_CONSUMER;
00150 }
00151 
00152 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferExecStream.cpp#19 $");
00153 
00154 // End SegBufferExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1