SegOutputStream Class Reference

SegOutputStream implements the ByteOutputStream interface by writing data to pages allocated from a Segment. More...

#include <SegOutputStream.h>

Inheritance diagram for SegOutputStream:

SegStream ByteOutputStream ByteStream ByteStream ClosableObject ClosableObject CrcSegOutputStream List of all members.

Public Member Functions

PageId getFirstPageId () const
 Gets the first PageId allocated.
BlockNum getPageCount () const
 
Returns:
the number of pages allocated by this stream

void updatePage ()
 Updates current page header without unlocking; allows a SegInputStream to read the contents from the same thread (but another thread would be locked out).
virtual void getSegPos (SegStreamPosition &pos)
 Obtains the current stream position.
SharedSegment getSegment () const
 
Returns:
segment accessed by this stream

SegmentAccessor const & getSegmentAccessor () const
 
Returns:
segment accessor used by this stream

FileSize getOffset () const
 
Returns:
current offset from beginning of stream

bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any unallocated resources.
void writeBytes (void const *pData, uint cbRequested)
 Writes bytes to the stream.
PBuffer getWritePointer (uint cbRequested, uint *pcbActual=NULL)
 Copyless alternative for writing bytes to the stream.
void consumeWritePointer (uint cbUsed)
 Advances stream position after a call to getWritePointer.
void hardPageBreak ()
 Marks the current buffer as complete regardless of how much data it contains.
virtual void setWriteLatency (WriteLatency writeLatency)
 Changes the write latency.
template<class T>
void writeValue (T const &value)
 Writes a fixed-size type to the stream.

Static Public Member Functions

static SharedSegOutputStream newSegOutputStream (SegmentAccessor const &segmentAccessor)
 Creates a new SegOutputStream.

Protected Member Functions

uint getBytesWrittenThisPage () const
 
Returns:
number of bytes already written to current page

virtual void flushBuffer (uint cbRequested)
 Must be implemented by derived class to flush buffered data.
virtual void closeImpl ()
 Must be implemented by derived class to release any resources.
 SegOutputStream (SegmentAccessor const &, uint cbExtraHeader=0)
virtual void writeExtraHeaders (SegStreamNode &node)
 Hook for subclasses; default is to do nothing.
void setBuffer (PBuffer pBuffer, uint cbBuffer)
 Sets the current buffer to be written.
uint getBytesAvailable () const
 
Returns:
number of bytes remaining to be written in current buffer


Protected Attributes

PageId firstPageId
 First PageId allocated.
PageId lastPageId
 Last PageId allocated.
uint cbMaxPageData
 Maximum number of data bytes which can be stored per page (does not include header).
BlockNum nPagesAllocated
 Counter for getPageCount().
SegmentAccessor segmentAccessor
 Accessor for segment containing stream data.
SegStreamLock pageLock
 Lock held on current page, if any.
uint cbPageHeader
 Number of bytes in page header.
FileSize cbOffset
 Byte position in stream.
bool needsClose
WriteLatency writeLatency
 Current write latency mode.

Detailed Description

SegOutputStream implements the ByteOutputStream interface by writing data to pages allocated from a Segment.

The Segment must support the get/setPageSuccessor interface for chaining the pages together.

Definition at line 37 of file SegOutputStream.h.


Constructor & Destructor Documentation

SegOutputStream::SegOutputStream ( SegmentAccessor const &  ,
uint  cbExtraHeader = 0 
) [explicit, protected]

Definition at line 37 of file SegOutputStream.cpp.

References cbMaxPageData, SegStream::cbPageHeader, firstPageId, SegStream::getSegment(), ByteOutputStream::getWritePointer(), lastPageId, nPagesAllocated, NULL_PAGE_ID, WRITE_LAZY, and ByteOutputStream::writeLatency.

Referenced by newSegOutputStream().

00040     : SegStream(segmentAccessor,cbExtraHeader)
00041 {
00042     firstPageId = NULL_PAGE_ID;
00043     lastPageId = NULL_PAGE_ID;
00044     nPagesAllocated = 0;
00045     cbMaxPageData = getSegment()->getUsablePageSize() - cbPageHeader;
00046     writeLatency = WRITE_LAZY;
00047     // force allocation of first page
00048     getWritePointer(1);
00049 }


Member Function Documentation

uint SegOutputStream::getBytesWrittenThisPage (  )  const [protected]

Returns:
number of bytes already written to current page

Definition at line 110 of file SegOutputStream.cpp.

References cbMaxPageData, and ByteOutputStream::getBytesAvailable().

Referenced by getSegPos(), and updatePage().

00111 {
00112     return cbMaxPageData - getBytesAvailable();
00113 }

void SegOutputStream::flushBuffer ( uint  cbRequested  )  [protected, virtual]

Must be implemented by derived class to flush buffered data.

Parameters:
cbRequested if non-zero, the derived class should allocate a new buffer with at least the requested size and call setBuffer

Implements ByteOutputStream.

Definition at line 75 of file SegOutputStream.cpp.

References SegNodeLock< Node >::allocatePage(), cbMaxPageData, SegStream::cbPageHeader, firstPageId, SegPageLock::getCacheAccessor(), SegNodeLock< Node >::getNodeForWrite(), SegPageLock::getPage(), SegStream::getSegment(), SegPageLock::isLocked(), lastPageId, nPagesAllocated, NULL_PAGE_ID, SegStream::pageLock, ByteOutputStream::setBuffer(), SegPageLock::unlock(), updatePage(), WRITE_EAGER_ASYNC, WRITE_LAZY, and ByteOutputStream::writeLatency.

00076 {
00077     assert(cbRequested <= cbMaxPageData);
00078     updatePage();
00079     if (pageLock.isLocked()) {
00080         if (writeLatency != WRITE_LAZY) {
00081             pageLock.getCacheAccessor()->flushPage(
00082                 pageLock.getPage(),
00083                 writeLatency == WRITE_EAGER_ASYNC);
00084         }
00085         pageLock.unlock();
00086     }
00087     if (!cbRequested) {
00088         return;
00089     }
00090     PageId pageId = pageLock.allocatePage();
00091     ++nPagesAllocated;
00092     if (firstPageId == NULL_PAGE_ID) {
00093         firstPageId = pageId;
00094     } else {
00095         getSegment()->setPageSuccessor(lastPageId,pageId);
00096     }
00097     lastPageId = pageId;
00098     SegStreamNode &node = pageLock.getNodeForWrite();
00099     setBuffer(
00100         reinterpret_cast<PBuffer>(&node)+cbPageHeader,
00101         cbMaxPageData);
00102 }

void SegOutputStream::closeImpl (  )  [protected, virtual]

Must be implemented by derived class to release any resources.

Reimplemented from ByteOutputStream.

Definition at line 104 of file SegOutputStream.cpp.

References SegStream::closeImpl(), and ByteOutputStream::closeImpl().

00105 {
00106     ByteOutputStream::closeImpl();
00107     SegStream::closeImpl();
00108 }

void SegOutputStream::writeExtraHeaders ( SegStreamNode node  )  [protected, virtual]

Hook for subclasses; default is to do nothing.

Parameters:
node the node being flushed

Reimplemented in CrcSegOutputStream.

Definition at line 71 of file SegOutputStream.cpp.

Referenced by updatePage().

00072 {
00073 }

SharedSegOutputStream SegOutputStream::newSegOutputStream ( SegmentAccessor const &  segmentAccessor  )  [static]

Creates a new SegOutputStream.

Parameters:
segmentAccessor accessor for the segment in which to store the data
Returns:
shared_ptr to new SegOutputStream

Definition at line 29 of file SegOutputStream.cpp.

References SegStream::segmentAccessor, and SegOutputStream().

Referenced by LhxPartitionWriter::open(), SpillOutputStream::spill(), ExternalSortRunAccessor::storeRun(), BTreeTest::testBulkLoad(), BTreeReadersTest::testReaders(), SegStreamTest::testWriteSeg(), VariableBuildLevel::VariableBuildLevel(), and SegBufferWriter::write().

00031 {
00032     return SharedSegOutputStream(
00033         new SegOutputStream(segmentAccessor),
00034         ClosableObjectDestructor());
00035 }

PageId SegOutputStream::getFirstPageId (  )  const

Gets the first PageId allocated.

For non-linear segments, this is required in order to be able to read the data back via SegInputStream.

Returns:
the first PageId allocated, or NULL_PAGE_ID if no data has been written to the stream yet

Definition at line 51 of file SegOutputStream.cpp.

References firstPageId.

00052 {
00053     return firstPageId;
00054 }

BlockNum SegOutputStream::getPageCount (  )  const

Returns:
the number of pages allocated by this stream

Definition at line 56 of file SegOutputStream.cpp.

References nPagesAllocated.

00057 {
00058     return nPagesAllocated;
00059 }

void SegOutputStream::updatePage (  ) 

Updates current page header without unlocking; allows a SegInputStream to read the contents from the same thread (but another thread would be locked out).

Definition at line 61 of file SegOutputStream.cpp.

References SegStreamNode::cbData, getBytesWrittenThisPage(), SegNodeLock< Node >::getNodeForWrite(), SegPageLock::isLocked(), SegStream::pageLock, and writeExtraHeaders().

Referenced by flushBuffer().

00062 {
00063     if (!pageLock.isLocked()) {
00064         return;
00065     }
00066     SegStreamNode &node = pageLock.getNodeForWrite();
00067     node.cbData = getBytesWrittenThisPage();
00068     writeExtraHeaders(node);
00069 }

void SegOutputStream::getSegPos ( SegStreamPosition pos  )  [virtual]

Obtains the current stream position.

Parameters:
pos receives the position

Implements SegStream.

Definition at line 115 of file SegOutputStream.cpp.

References ByteStream::cbOffset, SegStreamPosition::cbOffset, ByteOutputStream::getBytesAvailable(), getBytesWrittenThisPage(), lastPageId, CompoundId::MAX_BYTE_OFFSET, SegStreamPosition::segByteId, CompoundId::setByteOffset(), and CompoundId::setPageId().

00116 {
00117     CompoundId::setPageId(pos.segByteId,lastPageId);
00118     if (getBytesAvailable()) {
00119         CompoundId::setByteOffset(pos.segByteId,getBytesWrittenThisPage());
00120     } else {
00121         // after a hard page break, use a special sentinel value to indicate
00122         // the last byte on the page
00123         CompoundId::setByteOffset(
00124             pos.segByteId,CompoundId::MAX_BYTE_OFFSET);
00125     }
00126     pos.cbOffset = cbOffset;
00127 }

SharedSegment SegStream::getSegment (  )  const [inherited]

Returns:
segment accessed by this stream

Definition at line 43 of file SegStream.cpp.

References SegmentAccessor::pSegment, and SegStream::segmentAccessor.

Referenced by SegInputStream::closeImpl(), flushBuffer(), SegInputStream::readNextBuffer(), SegInputStream::readPrevBuffer(), SegInputStream::SegInputStream(), and SegOutputStream().

00044 {
00045     return segmentAccessor.pSegment;
00046 }

SegmentAccessor const & SegStream::getSegmentAccessor (  )  const [inherited]

Returns:
segment accessor used by this stream

Definition at line 48 of file SegStream.cpp.

References SegStream::segmentAccessor.

00049 {
00050     return segmentAccessor;
00051 }

FileSize ByteStream::getOffset (  )  const [inline, inherited]

Returns:
current offset from beginning of stream

Definition at line 110 of file ByteStream.h.

References ByteStream::cbOffset.

Referenced by SpillOutputStream::getInputStream(), and ByteInputStream::mark().

00111 {
00112     return cbOffset;
00113 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any unallocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }

void ByteOutputStream::setBuffer ( PBuffer  pBuffer,
uint  cbBuffer 
) [inline, protected, inherited]

Sets the current buffer to be written.

Parameters:
pBuffer receives start address of new buffer
cbBuffer number of bytes in buffer

Definition at line 162 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.

Referenced by ByteArrayOutputStream::ByteArrayOutputStream(), ByteArrayOutputStream::clear(), SpillOutputStream::flushBuffer(), flushBuffer(), and SpillOutputStream::SpillOutputStream().

00163 {
00164     pNextByte = pBuffer;
00165     cbWritable = cbBuffer;
00166 }

uint ByteOutputStream::getBytesAvailable (  )  const [inline, protected, inherited]

Returns:
number of bytes remaining to be written in current buffer

Definition at line 168 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable.

Referenced by SpillOutputStream::closeImpl(), SpillOutputStream::flushBuffer(), getBytesWrittenThisPage(), getSegPos(), SpillOutputStream::spill(), and SpillOutputStream::updatePage().

00169 {
00170     return cbWritable;
00171 }

void ByteOutputStream::writeBytes ( void const *  pData,
uint  cbRequested 
) [inherited]

Writes bytes to the stream.

Parameters:
pData source buffer containing bytes to be written
cbRequested number of bytes to write

Definition at line 35 of file ByteOutputStream.cpp.

References ByteStream::cbOffset, ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

Referenced by FtrsTableWriter::execute().

00036 {
00037     cbOffset += cb;
00038     if (!cbWritable) {
00039         flushBuffer(1);
00040     }
00041     for (;;) {
00042         assert(cbWritable);
00043         if (cb <= cbWritable) {
00044             memcpy(pNextByte,pData,cb);
00045             cbWritable -= cb;
00046             pNextByte += cb;
00047             return;
00048         }
00049         memcpy(pNextByte,pData,cbWritable);
00050         pData = static_cast<char const *>(pData) + cbWritable;
00051         cb -= cbWritable;
00052         cbWritable = 0;
00053         flushBuffer(1);
00054     }
00055 }

PBuffer ByteOutputStream::getWritePointer ( uint  cbRequested,
uint pcbActual = NULL 
) [inline, inherited]

Copyless alternative for writing bytes to the stream.

Provides direct access to the stream's internal buffer, but doesn't move the stream position (see consumeWritePointer).

Parameters:
cbRequested number of contiguous bytes to access; if fewer bytes are currently available in the buffer, the buffer is flushed and a new buffer is returned
pcbActual if non-NULL, receives actual number of contiguous writable bytes, which will always be greater than or equal to cbRequested
Returns:
pointer to cbActual bytes of writable buffer space

Definition at line 141 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

Referenced by BTreeWriter::deleteCurrent(), BTreeWriter::insertTupleFromBuffer(), and SegOutputStream().

00143 {
00144     if (cbWritable < cbRequested) {
00145         flushBuffer(cbRequested);
00146         assert(cbWritable >= cbRequested);
00147     }
00148     if (pcbActual) {
00149         *pcbActual = cbWritable;
00150     }
00151     return pNextByte;
00152 }

void ByteOutputStream::consumeWritePointer ( uint  cbUsed  )  [inline, inherited]

Advances stream position after a call to getWritePointer.

Parameters:
cbUsed number of bytes to advance; must be less than or equal to the value of cbActual returned by the last call to getWritePointer

Definition at line 154 of file ByteOutputStream.h.

References ByteStream::cbOffset, ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.

Referenced by BTreeWriter::deleteCurrent(), and BTreeWriter::insertTupleFromBuffer().

00155 {
00156     assert(cbUsed <= cbWritable);
00157     cbWritable -= cbUsed;
00158     pNextByte += cbUsed;
00159     cbOffset += cbUsed;
00160 }

void ByteOutputStream::hardPageBreak (  )  [inherited]

Marks the current buffer as complete regardless of how much data it contains.

The exact semantics are dependent on the buffering implementation.

Definition at line 62 of file ByteOutputStream.cpp.

References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

00063 {
00064     flushBuffer(0);
00065     cbWritable = 0;
00066     pNextByte = NULL;
00067 }

void ByteOutputStream::setWriteLatency ( WriteLatency  writeLatency  )  [virtual, inherited]

Changes the write latency.

May not be meaningful for all stream implementations.

Parameters:
writeLatency new WriteLatency setting

Reimplemented in SpillOutputStream.

Definition at line 69 of file ByteOutputStream.cpp.

References ByteOutputStream::writeLatency.

Referenced by SpillOutputStream::setWriteLatency().

00070 {
00071     writeLatency = writeLatencyInit;
00072 }

template<class T>
void ByteOutputStream::writeValue ( T const &  value  )  [inline, inherited]

Writes a fixed-size type to the stream.

Parameters:
value value to read; type must be memcpy-safe

Definition at line 135 of file ByteOutputStream.h.

Referenced by FtrsTableWriter::describeIndex(), LogicalTxnTest::describeParticipant(), FtrsTableWriter::describeParticipant(), BTreeWriter::describeParticipant(), DatabaseTest::executeIncrementAction(), TupleProjection::writePersistent(), and TupleDescriptor::writePersistent().

00136     {
00137         writeBytes(&value,sizeof(value));
00138     }


Member Data Documentation

PageId SegOutputStream::firstPageId [protected]

First PageId allocated.

Definition at line 44 of file SegOutputStream.h.

Referenced by flushBuffer(), getFirstPageId(), and SegOutputStream().

PageId SegOutputStream::lastPageId [protected]

Last PageId allocated.

Definition at line 49 of file SegOutputStream.h.

Referenced by flushBuffer(), getSegPos(), SegOutputStream(), and CrcSegOutputStream::writeExtraHeaders().

uint SegOutputStream::cbMaxPageData [protected]

Maximum number of data bytes which can be stored per page (does not include header).

Definition at line 55 of file SegOutputStream.h.

Referenced by flushBuffer(), getBytesWrittenThisPage(), and SegOutputStream().

BlockNum SegOutputStream::nPagesAllocated [protected]

Counter for getPageCount().

Definition at line 60 of file SegOutputStream.h.

Referenced by flushBuffer(), getPageCount(), and SegOutputStream().

SegmentAccessor SegStream::segmentAccessor [protected, inherited]

Accessor for segment containing stream data.

Definition at line 97 of file SegStream.h.

Referenced by SegStream::closeImpl(), SegStreamAllocation::endWrite(), SegStream::getSegment(), SegStream::getSegmentAccessor(), CrcSegInputStream::lockBufferParanoid(), CrcSegInputStream::newCrcSegInputStream(), CrcSegOutputStream::newCrcSegOutputStream(), SegInputStream::newSegInputStream(), newSegOutputStream(), and SegInputStream::startPrefetch().

SegStreamLock SegStream::pageLock [protected, inherited]

Lock held on current page, if any.

Definition at line 102 of file SegStream.h.

Referenced by SegStream::closeImpl(), SegInputStream::closeImpl(), flushBuffer(), SegInputStream::lockBuffer(), CrcSegInputStream::lockBuffer(), CrcSegInputStream::lockBufferParanoid(), SegInputStream::readNextBuffer(), and updatePage().

uint SegStream::cbPageHeader [protected, inherited]

Number of bytes in page header.

Definition at line 107 of file SegStream.h.

Referenced by flushBuffer(), SegInputStream::lockBuffer(), CrcSegInputStream::lockBufferParanoid(), SegOutputStream(), and SegStream::SegStream().

FileSize ByteStream::cbOffset [protected, inherited]

Byte position in stream.

Definition at line 41 of file ByteStream.h.

Referenced by ByteStream::ByteStream(), ByteInputStream::consumeReadPointer(), ByteOutputStream::consumeWritePointer(), ByteStream::getOffset(), getSegPos(), SegInputStream::getSegPos(), ByteInputStream::readBytes(), ByteInputStream::reset(), ByteArrayInputStream::resetArray(), ByteInputStream::seekBackward(), SegInputStream::seekSegPos(), and ByteOutputStream::writeBytes().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().

WriteLatency ByteOutputStream::writeLatency [protected, inherited]

Current write latency mode.

Definition at line 51 of file ByteOutputStream.h.

Referenced by flushBuffer(), SegOutputStream(), ByteOutputStream::setWriteLatency(), and SpillOutputStream::spill().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:46 2009 for Fennel by  doxygen 1.5.1