SegBufferWriter.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SegBufferWriter.cpp#1 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/SegBufferWriter.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/segment/SegOutputStream.h"
00028 #include "fennel/segment/SegInputStream.h"
00029 
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriter.cpp#1 $");
00031 
00032 SharedSegBufferWriter SegBufferWriter::newSegBufferWriter(
00033     SharedExecStreamBufAccessor &pInAccessor,
00034     SegmentAccessor const &bufferSegmentAccessor,
00035     bool destroyOnClose)
00036 {
00037     return SharedSegBufferWriter(
00038         new SegBufferWriter(pInAccessor, bufferSegmentAccessor, destroyOnClose),
00039         ClosableObjectDestructor());
00040 }
00041 
00042 SegBufferWriter::SegBufferWriter(
00043     SharedExecStreamBufAccessor &pInAccessorInit,
00044     SegmentAccessor const &bufferSegmentAccessorInit,
00045     bool destroyOnCloseInit)
00046     : pInAccessor(pInAccessorInit),
00047         bufferSegmentAccessor(bufferSegmentAccessorInit),
00048         destroyOnClose(destroyOnCloseInit)
00049 {
00050     firstPageId = NULL_PAGE_ID;
00051 }
00052 
00053 ExecStreamResult SegBufferWriter::write()
00054 {
00055     if (!pByteOutputStream) {
00056         pByteOutputStream =
00057             SegOutputStream::newSegOutputStream(bufferSegmentAccessor);
00058         firstPageId = pByteOutputStream->getFirstPageId();
00059     }
00060 
00061     ExecStreamBufState inState = pInAccessor->getState();
00062     switch (inState) {
00063     case EXECBUF_NONEMPTY:
00064     case EXECBUF_OVERFLOW:
00065         pByteOutputStream->consumeWritePointer(
00066             pInAccessor->getConsumptionAvailable());
00067         pByteOutputStream->hardPageBreak();
00068         pInAccessor->consumeData(pInAccessor->getConsumptionEnd());
00069         if (pInAccessor->getState() == EXECBUF_EOS) {
00070             return EXECRC_BUF_UNDERFLOW;
00071         }
00072         // else fall through intentionally
00073     case EXECBUF_EMPTY:
00074         {
00075             uint cb;
00076             PBuffer pBuffer = pByteOutputStream->getWritePointer(1,&cb);
00077             pInAccessor->provideBufferForProduction(
00078                 pBuffer,
00079                 pBuffer + cb,
00080                 false);
00081         }
00082         return EXECRC_BUF_UNDERFLOW;
00083     case EXECBUF_UNDERFLOW:
00084         return EXECRC_BUF_UNDERFLOW;
00085     case EXECBUF_EOS:
00086         pByteOutputStream.reset();
00087         return EXECRC_EOS;
00088     default:
00089         permAssert(false);
00090     }
00091 }
00092 
00093 PageId SegBufferWriter::getFirstPageId()
00094 {
00095     return firstPageId;
00096 }
00097 
00098 void SegBufferWriter::closeImpl()
00099 {
00100     pByteOutputStream.reset();
00101     if (destroyOnClose) {
00102         SharedSegInputStream pByteInputStream =
00103             SegInputStream::newSegInputStream(
00104                 bufferSegmentAccessor,
00105                 firstPageId);
00106         pByteInputStream->setDeallocate(true);
00107         pByteInputStream.reset();
00108     }
00109 }
00110 
00111 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriter.cpp#1 $");
00112 
00113 // End SegBufferWriter.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1