00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/SegInputStream.h"
00026
00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $");
00028
00029 SharedSegInputStream SegInputStream::newSegInputStream(
00030 SegmentAccessor const &segmentAccessor,
00031 PageId beginPageId)
00032 {
00033 return SharedSegInputStream(
00034 new SegInputStream(segmentAccessor,beginPageId),
00035 ClosableObjectDestructor());
00036 }
00037
00038 SegInputStream::SegInputStream(
00039 SegmentAccessor const &segmentAccessor,
00040 PageId beginPageId,
00041 uint cbExtraHeader)
00042 : SegStream(segmentAccessor,cbExtraHeader)
00043 {
00044 if (beginPageId == FIRST_LINEAR_PAGE_ID) {
00045 assert(
00046 getSegment()->getAllocationOrder() == Segment::LINEAR_ALLOCATION);
00047 }
00048 currPageId = beginPageId;
00049 shouldDeallocate = false;
00050 }
00051
00052 void SegInputStream::startPrefetch()
00053 {
00054 pageIter.mapRange(segmentAccessor,currPageId);
00055 }
00056
00057 void SegInputStream::endPrefetch()
00058 {
00059 pageIter.makeSingular();
00060 }
00061
00062 void SegInputStream::lockBuffer()
00063 {
00064 pageLock.lockShared(currPageId);
00065 SegStreamNode const &node = pageLock.getNodeForRead();
00066 PConstBuffer pFirstByte =
00067 reinterpret_cast<PConstBuffer>(&node) + cbPageHeader;
00068 setBuffer(pFirstByte,node.cbData);
00069 }
00070
00071 void SegInputStream::readNextBuffer()
00072 {
00073 nullifyBuffer();
00074 if (pageLock.isLocked()) {
00075 if (currPageId != NULL_PAGE_ID) {
00076 if (pageIter.isSingular()) {
00077 currPageId = getSegment()->getPageSuccessor(currPageId);
00078 } else {
00079 ++pageIter;
00080 assert(*pageIter == getSegment()->getPageSuccessor(currPageId));
00081 currPageId = *pageIter;
00082 }
00083 }
00084 if (shouldDeallocate) {
00085 pageLock.deallocateLockedPage();
00086 } else {
00087 pageLock.unlock();
00088 }
00089 }
00090 if (currPageId == NULL_PAGE_ID) {
00091 return;
00092 }
00093 lockBuffer();
00094 }
00095
00096 void SegInputStream::readPrevBuffer()
00097 {
00098 assert(pageIter.isSingular());
00099 assert(
00100 getSegment()->getAllocationOrder() >= Segment::CONSECUTIVE_ALLOCATION);
00101 nullifyBuffer();
00102 if (Segment::getLinearBlockNum(currPageId) == 0) {
00103 return;
00104 }
00105 --currPageId;
00106 lockBuffer();
00107 }
00108
00109 void SegInputStream::closeImpl()
00110 {
00111 pageIter.makeSingular();
00112 if (shouldDeallocate) {
00113 pageLock.unlock();
00114 while (currPageId != NULL_PAGE_ID) {
00115 PageId nextPageId = getSegment()->getPageSuccessor(currPageId);
00116 pageLock.deallocateUnlockedPage(currPageId);
00117 currPageId = nextPageId;
00118 }
00119 }
00120 SegStream::closeImpl();
00121 }
00122
00123 void SegInputStream::getSegPos(SegStreamPosition &pos)
00124 {
00125 CompoundId::setPageId(pos.segByteId,currPageId);
00126 CompoundId::setByteOffset(pos.segByteId,getBytesConsumed());
00127 pos.cbOffset = cbOffset;
00128 }
00129
00130 void SegInputStream::seekSegPos(SegStreamPosition const &pos)
00131 {
00132 assert(pageIter.isSingular());
00133 currPageId = CompoundId::getPageId(pos.segByteId);
00134 lockBuffer();
00135 uint cb = CompoundId::getByteOffset(pos.segByteId);
00136 if (cb == CompoundId::MAX_BYTE_OFFSET) {
00137 consumeReadPointer(getBytesAvailable());
00138 } else {
00139 consumeReadPointer(cb);
00140 }
00141
00142 cbOffset = pos.cbOffset;
00143 }
00144
00145 void SegInputStream::setDeallocate(
00146 bool shouldDeallocateInit)
00147 {
00148 shouldDeallocate = shouldDeallocateInit;
00149 }
00150
00151 bool SegInputStream::isDeallocating()
00152 {
00153 return shouldDeallocate;
00154 }
00155
00156 SharedByteStreamMarker SegInputStream::newMarker()
00157 {
00158 return SharedByteStreamMarker(new SegStreamMarker(*this));
00159 }
00160
00161 void SegInputStream::mark(ByteStreamMarker &marker)
00162 {
00163 assert(&(marker.getStream()) == this);
00164
00165
00166 SegStreamMarker &segMarker =
00167 dynamic_cast<SegStreamMarker &>(marker);
00168 getSegPos(segMarker.segPos);
00169 }
00170
00171 void SegInputStream::reset(ByteStreamMarker const &marker)
00172 {
00173 assert(&(marker.getStream()) == this);
00174
00175
00176 SegStreamMarker const &segMarker =
00177 dynamic_cast<SegStreamMarker const &>(marker);
00178
00179
00180 bool prefetch = !pageIter.isSingular();
00181 endPrefetch();
00182
00183 seekSegPos(segMarker.segPos);
00184
00185
00186 if (prefetch) {
00187 startPrefetch();
00188 }
00189 }
00190
00191 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $");
00192
00193