SpillOutputStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/SpillOutputStream.h"
00026 #include "fennel/segment/SegOutputStream.h"
00027 #include "fennel/segment/SegInputStream.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 #include "fennel/common/ByteArrayInputStream.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00032 
00033 // TODO:  support multiple scratch pages
00034 
00035 SharedSpillOutputStream SpillOutputStream::newSpillOutputStream(
00036     SharedSegmentFactory pSegmentFactory,
00037     SharedCacheAccessor pCacheAccessor,
00038     std::string spillFileName)
00039 {
00040     return SharedSpillOutputStream(
00041         new SpillOutputStream(pSegmentFactory,pCacheAccessor,spillFileName),
00042         ClosableObjectDestructor());
00043 }
00044 
00045 SpillOutputStream::SpillOutputStream(
00046     SharedSegmentFactory pSegmentFactoryInit,
00047     SharedCacheAccessor pCacheAccessorInit,
00048     std::string spillFileNameInit)
00049     : pSegmentFactory(pSegmentFactoryInit),
00050       pCacheAccessor(pCacheAccessorInit),
00051       spillFileName(spillFileNameInit)
00052 {
00053     // REVIEW:  this causes pCacheAccessor to lose its chance to intercept any
00054     // of the scratch calls
00055     scratchAccessor = pSegmentFactory->newScratchSegment(
00056         pCacheAccessor->getCache(),
00057         1);
00058     scratchPageLock.accessSegment(scratchAccessor);
00059     scratchPageLock.allocatePage();
00060     cbBuffer = scratchAccessor.pSegment->getUsablePageSize();
00061     setBuffer(
00062         scratchPageLock.getPage().getWritableData(),
00063         cbBuffer);
00064 }
00065 
00066 SpillOutputStream::~SpillOutputStream()
00067 {
00068 }
00069 
00070 void SpillOutputStream::flushBuffer(uint cbRequested)
00071 {
00072     if (scratchPageLock.isLocked()) {
00073         assert(!pSegOutputStream);
00074         // grow from short to long
00075         spill();
00076     } else {
00077         assert(pSegOutputStream);
00078         assert(!scratchPageLock.isLocked());
00079         // already long
00080         assert(cbBuffer >= getBytesAvailable());
00081         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00082     }
00083     assert(pSegOutputStream);
00084     if (cbRequested) {
00085         PBuffer pBuffer =
00086             pSegOutputStream->getWritePointer(cbRequested,&cbBuffer);
00087         setBuffer(pBuffer,cbBuffer);
00088     } else {
00089         pSegOutputStream->hardPageBreak();
00090         cbBuffer = 0;
00091     }
00092 }
00093 
00094 void SpillOutputStream::closeImpl()
00095 {
00096     if (scratchPageLock.isLocked()) {
00097         // discard contents
00098         scratchPageLock.unlock();
00099     } else {
00100         assert(pSegOutputStream);
00101         assert(!scratchPageLock.isLocked());
00102         // flush long log
00103         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00104         cbBuffer = 0;
00105         pSegOutputStream.reset();
00106     }
00107 }
00108 
00109 void SpillOutputStream::setWriteLatency(WriteLatency writeLatency)
00110 {
00111     ByteOutputStream::setWriteLatency(writeLatency);
00112     if (pSegOutputStream) {
00113         pSegOutputStream->setWriteLatency(writeLatency);
00114     }
00115 }
00116 
00117 // REVIEW:  due to page size discrepancies, spill will screw up any
00118 // LogicalTxnParticipant relying on getWritePointer/getReadPointer.  Should
00119 // either warn about this or fix it.  Also, after spill, could cut out the
00120 // middleman by having LogicalTxn reference pSegOutputStream directly and
00121 // discard the SpillOutputStream.
00122 
00123 void SpillOutputStream::spill()
00124 {
00125     DeviceMode devMode = DeviceMode::createNew;
00126     // TODO:  make this a parameter; for now it's always direct since this is
00127     // only used for log streams
00128     devMode.direct = true;
00129     SharedSegment pLongLogSegment =
00130         pSegmentFactory->newTempDeviceSegment(
00131             scratchPageLock.getCacheAccessor()->getCache(),
00132             devMode,
00133             spillFileName);
00134     SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor);
00135     pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor);
00136     pSegOutputStream->setWriteLatency(writeLatency);
00137     pSegOutputStream->writeBytes(
00138         scratchPageLock.getPage().getReadableData(),
00139         cbBuffer - getBytesAvailable());
00140     scratchPageLock.unlock();
00141 }
00142 
00143 SharedByteInputStream SpillOutputStream::getInputStream(
00144     SeekPosition seekPosition)
00145 {
00146     if (scratchPageLock.isLocked()) {
00147         SharedByteInputStream pInputStream =
00148             ByteArrayInputStream::newByteArrayInputStream(
00149                 scratchPageLock.getPage().getReadableData(),
00150                 getOffset());
00151         if (seekPosition == SEEK_STREAM_END) {
00152             pInputStream->seekForward(getOffset());
00153         }
00154         return pInputStream;
00155     } else {
00156         assert(pSegOutputStream);
00157         updatePage();
00158         SharedSegment pSegment = pSegOutputStream->getSegment();
00159         SegStreamPosition endPos;
00160         if (seekPosition == SEEK_STREAM_END) {
00161             pSegOutputStream->getSegPos(endPos);
00162         }
00163         SegmentAccessor segmentAccessor(pSegment,pCacheAccessor);
00164         SharedSegInputStream pInputStream =
00165             SegInputStream::newSegInputStream(segmentAccessor);
00166         if (seekPosition == SEEK_STREAM_END) {
00167             pInputStream->seekSegPos(endPos);
00168         }
00169         return pInputStream;
00170     }
00171 }
00172 
00173 void SpillOutputStream::updatePage()
00174 {
00175     if (!cbBuffer) {
00176         return;
00177     }
00178     assert(cbBuffer > getBytesAvailable());
00179     uint cbConsumed = cbBuffer - getBytesAvailable();
00180     pSegOutputStream->consumeWritePointer(cbConsumed);
00181     cbBuffer -= cbConsumed;
00182     pSegOutputStream->updatePage();
00183 }
00184 
00185 SharedSegment SpillOutputStream::getSegment()
00186 {
00187     if (pSegOutputStream) {
00188         return pSegOutputStream->getSegment();
00189     } else {
00190         return SharedSegment();
00191     }
00192 }
00193 
00194 SharedSegOutputStream SpillOutputStream::getSegOutputStream()
00195 {
00196     return pSegOutputStream;
00197 }
00198 
00199 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00200 
00201 // End SpillOutputStream.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1