SegInputStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/SegInputStream.h"
00026 
00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $");
00028 
00029 SharedSegInputStream SegInputStream::newSegInputStream(
00030     SegmentAccessor const &segmentAccessor,
00031     PageId beginPageId)
00032 {
00033     return SharedSegInputStream(
00034         new SegInputStream(segmentAccessor,beginPageId),
00035         ClosableObjectDestructor());
00036 }
00037 
00038 SegInputStream::SegInputStream(
00039     SegmentAccessor const &segmentAccessor,
00040     PageId beginPageId,
00041     uint cbExtraHeader)
00042     : SegStream(segmentAccessor,cbExtraHeader)
00043 {
00044     if (beginPageId == FIRST_LINEAR_PAGE_ID) {
00045         assert(
00046             getSegment()->getAllocationOrder() == Segment::LINEAR_ALLOCATION);
00047     }
00048     currPageId = beginPageId;
00049     shouldDeallocate = false;
00050 }
00051 
00052 void SegInputStream::startPrefetch()
00053 {
00054     pageIter.mapRange(segmentAccessor,currPageId);
00055 }
00056 
00057 void SegInputStream::endPrefetch()
00058 {
00059     pageIter.makeSingular();
00060 }
00061 
00062 void SegInputStream::lockBuffer()
00063 {
00064     pageLock.lockShared(currPageId);
00065     SegStreamNode const &node = pageLock.getNodeForRead();
00066     PConstBuffer pFirstByte =
00067         reinterpret_cast<PConstBuffer>(&node) + cbPageHeader;
00068     setBuffer(pFirstByte,node.cbData);
00069 }
00070 
00071 void SegInputStream::readNextBuffer()
00072 {
00073     nullifyBuffer();
00074     if (pageLock.isLocked()) {
00075         if (currPageId != NULL_PAGE_ID) {
00076             if (pageIter.isSingular()) {
00077                 currPageId = getSegment()->getPageSuccessor(currPageId);
00078             } else {
00079                 ++pageIter;
00080                 assert(*pageIter == getSegment()->getPageSuccessor(currPageId));
00081                 currPageId = *pageIter;
00082             }
00083         }
00084         if (shouldDeallocate) {
00085             pageLock.deallocateLockedPage();
00086         } else {
00087             pageLock.unlock();
00088         }
00089     }
00090     if (currPageId == NULL_PAGE_ID) {
00091         return;
00092     }
00093     lockBuffer();
00094 }
00095 
00096 void SegInputStream::readPrevBuffer()
00097 {
00098     assert(pageIter.isSingular());
00099     assert(
00100         getSegment()->getAllocationOrder() >= Segment::CONSECUTIVE_ALLOCATION);
00101     nullifyBuffer();
00102     if (Segment::getLinearBlockNum(currPageId) == 0) {
00103         return;
00104     }
00105     --currPageId;
00106     lockBuffer();
00107 }
00108 
00109 void SegInputStream::closeImpl()
00110 {
00111     pageIter.makeSingular();
00112     if (shouldDeallocate) {
00113         pageLock.unlock();
00114         while (currPageId != NULL_PAGE_ID) {
00115             PageId nextPageId = getSegment()->getPageSuccessor(currPageId);
00116             pageLock.deallocateUnlockedPage(currPageId);
00117             currPageId = nextPageId;
00118         }
00119     }
00120     SegStream::closeImpl();
00121 }
00122 
00123 void SegInputStream::getSegPos(SegStreamPosition &pos)
00124 {
00125     CompoundId::setPageId(pos.segByteId,currPageId);
00126     CompoundId::setByteOffset(pos.segByteId,getBytesConsumed());
00127     pos.cbOffset = cbOffset;
00128 }
00129 
00130 void SegInputStream::seekSegPos(SegStreamPosition const &pos)
00131 {
00132     assert(pageIter.isSingular());
00133     currPageId = CompoundId::getPageId(pos.segByteId);
00134     lockBuffer();
00135     uint cb = CompoundId::getByteOffset(pos.segByteId);
00136     if (cb == CompoundId::MAX_BYTE_OFFSET) {
00137         consumeReadPointer(getBytesAvailable());
00138     } else {
00139         consumeReadPointer(cb);
00140     }
00141 
00142     cbOffset = pos.cbOffset;
00143 }
00144 
00145 void SegInputStream::setDeallocate(
00146     bool shouldDeallocateInit)
00147 {
00148     shouldDeallocate = shouldDeallocateInit;
00149 }
00150 
00151 bool SegInputStream::isDeallocating()
00152 {
00153     return shouldDeallocate;
00154 }
00155 
00156 SharedByteStreamMarker SegInputStream::newMarker()
00157 {
00158     return SharedByteStreamMarker(new SegStreamMarker(*this));
00159 }
00160 
00161 void SegInputStream::mark(ByteStreamMarker &marker)
00162 {
00163     assert(&(marker.getStream()) == this);
00164 
00165     // memorize SegStream-specific info
00166     SegStreamMarker &segMarker =
00167         dynamic_cast<SegStreamMarker &>(marker);
00168     getSegPos(segMarker.segPos);
00169 }
00170 
00171 void SegInputStream::reset(ByteStreamMarker const &marker)
00172 {
00173     assert(&(marker.getStream()) == this);
00174 
00175     // use SegStream-specific info
00176     SegStreamMarker const &segMarker =
00177         dynamic_cast<SegStreamMarker const &>(marker);
00178 
00179     // disable prefetch during seek
00180     bool prefetch = !pageIter.isSingular();
00181     endPrefetch();
00182 
00183     seekSegPos(segMarker.segPos);
00184 
00185     // restore prefetch preference
00186     if (prefetch) {
00187         startPrefetch();
00188     }
00189 }
00190 
00191 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $");
00192 
00193 // End SegInputStream.cpp

Generated on Mon Jun 22 04:00:20 2009 for Fennel by  doxygen 1.5.1