#include <SegOutputStream.h>
Public Member Functions | |
PageId | getFirstPageId () const |
Gets the first PageId allocated. | |
BlockNum | getPageCount () const |
| |
void | updatePage () |
Updates current page header without unlocking; allows a SegInputStream to read the contents from the same thread (but another thread would be locked out). | |
virtual void | getSegPos (SegStreamPosition &pos) |
Obtains the current stream position. | |
SharedSegment | getSegment () const |
| |
SegmentAccessor const & | getSegmentAccessor () const |
| |
FileSize | getOffset () const |
| |
bool | isClosed () const |
| |
void | close () |
Closes this object, releasing any allocated resources. | |
void | writeBytes (void const *pData, uint cbRequested) |
Writes bytes to the stream. | |
PBuffer | getWritePointer (uint cbRequested, uint *pcbActual=NULL) |
Copyless alternative for writing bytes to the stream. | |
void | consumeWritePointer (uint cbUsed) |
Advances stream position after a call to getWritePointer. | |
void | hardPageBreak () |
Marks the current buffer as complete regardless of how much data it contains. | |
virtual void | setWriteLatency (WriteLatency writeLatency) |
Changes the write latency. | |
template<class T> | |
void | writeValue (T const &value) |
Writes a fixed-size type to the stream. | |
Static Public Member Functions | |
static SharedSegOutputStream | newSegOutputStream (SegmentAccessor const &segmentAccessor) |
Creates a new SegOutputStream. | |
Protected Member Functions | |
uint | getBytesWrittenThisPage () const |
| |
virtual void | flushBuffer (uint cbRequested) |
Must be implemented by derived class to flush buffered data. | |
virtual void | closeImpl () |
Must be implemented by derived class to release any resources. | |
SegOutputStream (SegmentAccessor const &, uint cbExtraHeader=0) | |
virtual void | writeExtraHeaders (SegStreamNode &node) |
Hook for subclasses; default is to do nothing. | |
void | setBuffer (PBuffer pBuffer, uint cbBuffer) |
Sets the current buffer to be written. | |
uint | getBytesAvailable () const |
| |
Protected Attributes | |
PageId | firstPageId |
First PageId allocated. | |
PageId | lastPageId |
Last PageId allocated. | |
uint | cbMaxPageData |
Maximum number of data bytes which can be stored per page (does not include header). | |
BlockNum | nPagesAllocated |
Counter for getPageCount(). | |
SegmentAccessor | segmentAccessor |
Accessor for segment containing stream data. | |
SegStreamLock | pageLock |
Lock held on current page, if any. | |
uint | cbPageHeader |
Number of bytes in page header. | |
FileSize | cbOffset |
Byte position in stream. | |
bool | needsClose |
WriteLatency | writeLatency |
Current write latency mode. |
The Segment must support the get/setPageSuccessor interface for chaining the pages together.
Definition at line 37 of file SegOutputStream.h.
SegOutputStream::SegOutputStream | ( | SegmentAccessor const & | , | |
uint | cbExtraHeader = 0 | |||
) | [explicit, protected] |
Definition at line 37 of file SegOutputStream.cpp.
References cbMaxPageData, SegStream::cbPageHeader, firstPageId, SegStream::getSegment(), ByteOutputStream::getWritePointer(), lastPageId, nPagesAllocated, NULL_PAGE_ID, WRITE_LAZY, and ByteOutputStream::writeLatency.
Referenced by newSegOutputStream().
00040     : SegStream(segmentAccessor,cbExtraHeader)
00041 {
00042     firstPageId = NULL_PAGE_ID;
00043     lastPageId = NULL_PAGE_ID;
00044     nPagesAllocated = 0;
00045     cbMaxPageData = getSegment()->getUsablePageSize() - cbPageHeader;
00046     writeLatency = WRITE_LAZY;
00047     // force allocation of first page
00048     getWritePointer(1);
00049 }
uint SegOutputStream::getBytesWrittenThisPage | ( | ) | const [protected] |
Definition at line 110 of file SegOutputStream.cpp.
References cbMaxPageData, and ByteOutputStream::getBytesAvailable().
Referenced by getSegPos(), and updatePage().
00111 {
00112     return cbMaxPageData - getBytesAvailable();
00113 }
void SegOutputStream::flushBuffer | ( | uint | cbRequested | ) | [protected, virtual] |
Must be implemented by derived class to flush buffered data.
cbRequested | if non-zero, the derived class should allocate a new buffer with at least the requested size and call setBuffer |
Implements ByteOutputStream.
Definition at line 75 of file SegOutputStream.cpp.
References SegNodeLock< Node >::allocatePage(), cbMaxPageData, SegStream::cbPageHeader, firstPageId, SegPageLock::getCacheAccessor(), SegNodeLock< Node >::getNodeForWrite(), SegPageLock::getPage(), SegStream::getSegment(), SegPageLock::isLocked(), lastPageId, nPagesAllocated, NULL_PAGE_ID, SegStream::pageLock, ByteOutputStream::setBuffer(), SegPageLock::unlock(), updatePage(), WRITE_EAGER_ASYNC, WRITE_LAZY, and ByteOutputStream::writeLatency.
00076 {
00077     assert(cbRequested <= cbMaxPageData);
00078     updatePage();
00079     if (pageLock.isLocked()) {
00080         if (writeLatency != WRITE_LAZY) {
00081             pageLock.getCacheAccessor()->flushPage(
00082                 pageLock.getPage(),
00083                 writeLatency == WRITE_EAGER_ASYNC);
00084         }
00085         pageLock.unlock();
00086     }
00087     if (!cbRequested) {
00088         return;
00089     }
00090     PageId pageId = pageLock.allocatePage();
00091     ++nPagesAllocated;
00092     if (firstPageId == NULL_PAGE_ID) {
00093         firstPageId = pageId;
00094     } else {
00095         getSegment()->setPageSuccessor(lastPageId,pageId);
00096     }
00097     lastPageId = pageId;
00098     SegStreamNode &node = pageLock.getNodeForWrite();
00099     setBuffer(
00100         reinterpret_cast<PBuffer>(&node)+cbPageHeader,
00101         cbMaxPageData);
00102 }
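The allocation and chaining half of flushBuffer() can be modeled without any of Fennel's locking or caching. The following is only a sketch under stated assumptions: ToyPageId, TOY_NULL_PAGE_ID, ToySegment, and ToyPageChain are hypothetical stand-ins invented for illustration, and page ids are simply indexes into a vector.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical page id type and sentinel; Fennel's PageId and
// NULL_PAGE_ID are defined elsewhere and may look different.
typedef long ToyPageId;
const ToyPageId TOY_NULL_PAGE_ID = -1;

// Toy segment: page ids are indexes, successor[i] is the page after i.
struct ToySegment {
    std::vector<ToyPageId> successor;

    ToyPageId allocatePage() {
        successor.push_back(TOY_NULL_PAGE_ID);
        return static_cast<ToyPageId>(successor.size() - 1);
    }
    void setPageSuccessor(ToyPageId pageId, ToyPageId next) {
        successor[static_cast<std::size_t>(pageId)] = next;
    }
    ToyPageId getPageSuccessor(ToyPageId pageId) const {
        return successor[static_cast<std::size_t>(pageId)];
    }
};

// Bookkeeping that mirrors the second half of flushBuffer().
struct ToyPageChain {
    ToySegment segment;
    ToyPageId firstPageId = TOY_NULL_PAGE_ID;
    ToyPageId lastPageId = TOY_NULL_PAGE_ID;
    unsigned nPagesAllocated = 0;

    ToyPageId appendPage() {
        ToyPageId pageId = segment.allocatePage();
        ++nPagesAllocated;
        if (firstPageId == TOY_NULL_PAGE_ID) {
            firstPageId = pageId;  // very first page of the stream
        } else {
            segment.setPageSuccessor(lastPageId, pageId);  // link old tail
        }
        lastPageId = pageId;
        return pageId;
    }
};
```

A reader that starts at firstPageId and follows getPageSuccessor() until it reaches the null sentinel visits the pages in the order they were written, which is why the segment has to support the successor interface.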
void SegOutputStream::closeImpl | ( | ) | [protected, virtual] |
Must be implemented by derived class to release any resources.
Reimplemented from ByteOutputStream.
Definition at line 104 of file SegOutputStream.cpp.
References SegStream::closeImpl(), and ByteOutputStream::closeImpl().
00105 {
00106     ByteOutputStream::closeImpl();
00107     SegStream::closeImpl();
00108 }
void SegOutputStream::writeExtraHeaders | ( | SegStreamNode & | node | ) | [protected, virtual] |
Hook for subclasses; default is to do nothing.
node | the node being flushed |
Reimplemented in CrcSegOutputStream.
Definition at line 71 of file SegOutputStream.cpp.
Referenced by updatePage().
SharedSegOutputStream SegOutputStream::newSegOutputStream | ( | SegmentAccessor const & | segmentAccessor | ) | [static] |
Creates a new SegOutputStream.
segmentAccessor | accessor for the segment in which to store the data |
Definition at line 29 of file SegOutputStream.cpp.
References SegStream::segmentAccessor, and SegOutputStream().
Referenced by LhxPartitionWriter::open(), SpillOutputStream::spill(), ExternalSortRunAccessor::storeRun(), BTreeTest::testBulkLoad(), BTreeReadersTest::testReaders(), SegStreamTest::testWriteSeg(), VariableBuildLevel::VariableBuildLevel(), and SegBufferWriter::write().
00031 {
00032     return SharedSegOutputStream(
00033         new SegOutputStream(segmentAccessor),
00034         ClosableObjectDestructor());
00035 }
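The factory pairs the new object with a ClosableObjectDestructor, so the stream is closed when the last shared reference goes away. A rough, self-contained sketch of that pattern follows. It is not Fennel's code: ToyStream, CloseThenDelete, and newToyStream are hypothetical names, std::shared_ptr is assumed in place of whatever smart pointer SharedSegOutputStream really is, and the "close, then delete" order inside the deleter is an assumption.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a closable stream; not a Fennel class.
struct ToyStream {
    explicit ToyStream(int *pCloseCountInit) : pCloseCount(pCloseCountInit) {}
    void close() {
        ++*pCloseCount;  // stands in for releasing pages, locks, etc.
    }
private:
    int *pCloseCount;
};

// Hypothetical deleter: close the object first, then free it.
struct CloseThenDelete {
    void operator()(ToyStream *p) const {
        p->close();
        delete p;
    }
};

// Factory in the style of newSegOutputStream().
inline std::shared_ptr<ToyStream> newToyStream(int *pCloseCount) {
    return std::shared_ptr<ToyStream>(
        new ToyStream(pCloseCount), CloseThenDelete());
}
```

With this arrangement callers never call delete themselves; dropping the last shared pointer is enough to trigger the close.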
PageId SegOutputStream::getFirstPageId | ( | ) | const |
Gets the first PageId allocated.
For non-linear segments, this is required in order to be able to read the data back via SegInputStream.
Definition at line 51 of file SegOutputStream.cpp.
References firstPageId.
00052 {
00053     return firstPageId;
00054 }
BlockNum SegOutputStream::getPageCount | ( | ) | const |
Definition at line 56 of file SegOutputStream.cpp.
References nPagesAllocated.
00057 {
00058     return nPagesAllocated;
00059 }
void SegOutputStream::updatePage | ( | ) |
Updates current page header without unlocking; allows a SegInputStream to read the contents from the same thread (but another thread would be locked out).
Definition at line 61 of file SegOutputStream.cpp.
References SegStreamNode::cbData, getBytesWrittenThisPage(), SegNodeLock< Node >::getNodeForWrite(), SegPageLock::isLocked(), SegStream::pageLock, and writeExtraHeaders().
Referenced by flushBuffer().
00062 {
00063     if (!pageLock.isLocked()) {
00064         return;
00065     }
00066     SegStreamNode &node = pageLock.getNodeForWrite();
00067     node.cbData = getBytesWrittenThisPage();
00068     writeExtraHeaders(node);
00069 }
void SegOutputStream::getSegPos | ( | SegStreamPosition & | pos | ) | [virtual] |
Obtains the current stream position.
pos | receives the position |
Implements SegStream.
Definition at line 115 of file SegOutputStream.cpp.
References ByteStream::cbOffset, SegStreamPosition::cbOffset, ByteOutputStream::getBytesAvailable(), getBytesWrittenThisPage(), lastPageId, CompoundId::MAX_BYTE_OFFSET, SegStreamPosition::segByteId, CompoundId::setByteOffset(), and CompoundId::setPageId().
00116 {
00117     CompoundId::setPageId(pos.segByteId,lastPageId);
00118     if (getBytesAvailable()) {
00119         CompoundId::setByteOffset(pos.segByteId,getBytesWrittenThisPage());
00120     } else {
00121         // after a hard page break, use a special sentinel value to indicate
00122         // the last byte on the page
00123         CompoundId::setByteOffset(
00124             pos.segByteId,CompoundId::MAX_BYTE_OFFSET);
00125     }
00126     pos.cbOffset = cbOffset;
00127 }
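The byte-offset choice in getSegPos() reduces to a small piece of arithmetic. The sketch below isolates it; TOY_MAX_BYTE_OFFSET and toyByteOffset are hypothetical names, and the sentinel's numeric value is an assumption because the real CompoundId::MAX_BYTE_OFFSET is not shown on this page.

```cpp
#include <cassert>

// Hypothetical sentinel; the real CompoundId::MAX_BYTE_OFFSET value
// is not given in this reference page.
const unsigned TOY_MAX_BYTE_OFFSET = 0xFFFFFFFFu;

// Byte offset to record for the current page.
// cbMaxPageData: data capacity of one page (header excluded).
// cbAvailable:   bytes still writable in the current buffer.
inline unsigned toyByteOffset(unsigned cbMaxPageData, unsigned cbAvailable) {
    if (cbAvailable) {
        // same arithmetic as getBytesWrittenThisPage()
        return cbMaxPageData - cbAvailable;
    }
    // nothing left to write on this page (for example after a hard
    // page break), so record the sentinel instead of a real offset
    return TOY_MAX_BYTE_OFFSET;
}
```

The sentinel is needed because, once no bytes are available, the subtraction would no longer say how much of the page was actually filled before the break.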
SharedSegment SegStream::getSegment | ( | ) | const [inherited] |
Definition at line 43 of file SegStream.cpp.
References SegmentAccessor::pSegment, and SegStream::segmentAccessor.
Referenced by SegInputStream::closeImpl(), flushBuffer(), SegInputStream::readNextBuffer(), SegInputStream::readPrevBuffer(), SegInputStream::SegInputStream(), and SegOutputStream().
00044 {
00045     return segmentAccessor.pSegment;
00046 }
SegmentAccessor const & SegStream::getSegmentAccessor | ( | ) | const [inherited] |
Definition at line 48 of file SegStream.cpp.
References SegStream::segmentAccessor.
00049 {
00050     return segmentAccessor;
00051 }
FileSize ByteStream::getOffset | ( | ) | const [inline, inherited] |
Definition at line 110 of file ByteStream.h.
References ByteStream::cbOffset.
Referenced by SpillOutputStream::getInputStream(), and ByteInputStream::mark().
00111 {
00112     return cbOffset;
00113 }
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
00059 {
00060     return !needsClose;
00061 }
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any allocated resources.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
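The close() listing makes closing idempotent: the needsClose flag is cleared before the closeImpl() hook runs, so repeated calls do nothing. A minimal sketch of that guard, with hypothetical class names and the assumption that an object starts out needing a close:

```cpp
#include <cassert>

// Hypothetical base class modeled on the close() listing above.
class ToyClosableObject {
public:
    virtual ~ToyClosableObject() {}

    bool isClosed() const { return !needsClose; }

    void close() {
        if (!needsClose) {
            return;          // second and later calls are no-ops
        }
        needsClose = false;  // clear the flag before running the hook
        closeImpl();
    }

protected:
    virtual void closeImpl() = 0;
    bool needsClose = true;  // assumption: objects start out open
};

// Hypothetical subclass that counts how often its hook runs.
class CountingClosable : public ToyClosableObject {
public:
    int nCloseImplCalls = 0;

protected:
    void closeImpl() override { ++nCloseImplCalls; }
};
```

Subclasses only supply closeImpl(); callers always go through close(), which is what lets a destructor-style helper call it safely on an object that was already closed by hand.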
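void ByteOutputStream::setBuffer | ( | PBuffer | pBuffer, | |
uint | cbBuffer | |||
) | [inline, protected, inherited] |
Sets the current buffer to be written.
pBuffer | start address of new buffer | |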
cbBuffer | number of bytes in buffer |
Definition at line 162 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.
Referenced by ByteArrayOutputStream::ByteArrayOutputStream(), ByteArrayOutputStream::clear(), SpillOutputStream::flushBuffer(), flushBuffer(), and SpillOutputStream::SpillOutputStream().
00163 {
00164     pNextByte = pBuffer;
00165     cbWritable = cbBuffer;
00166 }
uint ByteOutputStream::getBytesAvailable | ( | ) | const [inline, protected, inherited] |
Definition at line 168 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable.
Referenced by SpillOutputStream::closeImpl(), SpillOutputStream::flushBuffer(), getBytesWrittenThisPage(), getSegPos(), SpillOutputStream::spill(), and SpillOutputStream::updatePage().
00169 {
00170     return cbWritable;
00171 }
void ByteOutputStream::writeBytes | ( | void const * | pData, | |
uint | cbRequested | |||
) | [inherited] |
Writes bytes to the stream.
pData | source buffer containing bytes to be written | |
cbRequested | number of bytes to write |
Definition at line 35 of file ByteOutputStream.cpp.
References ByteStream::cbOffset, ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
Referenced by FtrsTableWriter::execute().
00036 {
00037     cbOffset += cb;
00038     if (!cbWritable) {
00039         flushBuffer(1);
00040     }
00041     for (;;) {
00042         assert(cbWritable);
00043         if (cb <= cbWritable) {
00044             memcpy(pNextByte,pData,cb);
00045             cbWritable -= cb;
00046             pNextByte += cb;
00047             return;
00048         }
00049         memcpy(pNextByte,pData,cbWritable);
00050         pData = static_cast<char const *>(pData) + cbWritable;
00051         cb -= cbWritable;
00052         cbWritable = 0;
00053         flushBuffer(1);
00054     }
00055 }
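The loop in writeBytes() splits one logical write across as many buffers as it takes. The sketch below reproduces that control flow against an in-memory toy; ToyPagedStream and its contents() helper are hypothetical, and pages are plain vectors rather than locked cache pages.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical in-memory stream with fixed-size pages; not a Fennel class.
class ToyPagedStream {
public:
    explicit ToyPagedStream(std::size_t cbPageInit) : cbPage(cbPageInit) {
        assert(cbPage > 0);
    }

    // Same control flow as the writeBytes() listing above.
    void writeBytes(void const *pData, std::size_t cb) {
        cbOffset += cb;
        if (!cbWritable) {
            flushBuffer();
        }
        for (;;) {
            if (cb <= cbWritable) {
                std::memcpy(pNextByte, pData, cb);
                cbWritable -= cb;
                pNextByte += cb;
                return;
            }
            // fill the rest of this page, then continue on a fresh one
            std::memcpy(pNextByte, pData, cbWritable);
            pData = static_cast<char const *>(pData) + cbWritable;
            cb -= cbWritable;
            cbWritable = 0;
            flushBuffer();
        }
    }

    std::size_t getPageCount() const { return pages.size(); }
    std::size_t getOffset() const { return cbOffset; }

    // Concatenates the bytes written so far, for checking.
    std::string contents() const {
        std::string s;
        for (std::size_t i = 0; i < pages.size(); ++i) {
            bool isLast = (i + 1 == pages.size());
            s.append(pages[i].data(), isLast ? cbPage - cbWritable : cbPage);
        }
        return s;
    }

private:
    // Starts a new page; the toy simply keeps old pages in memory.
    void flushBuffer() {
        pages.push_back(std::vector<char>(cbPage));
        pNextByte = pages.back().data();
        cbWritable = cbPage;
    }

    std::size_t cbPage;
    std::vector<std::vector<char> > pages;
    char *pNextByte = nullptr;
    std::size_t cbWritable = 0;
    std::size_t cbOffset = 0;
};
```

Writing ten bytes into four-byte pages, for instance, fills two pages completely and leaves two bytes on a third, while the stream offset advances by the full ten up front.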
PBuffer ByteOutputStream::getWritePointer | ( | uint | cbRequested, | |
uint * | pcbActual = NULL | |||
) | [inline, inherited] |
Copyless alternative for writing bytes to the stream.
Provides direct access to the stream's internal buffer, but doesn't move the stream position (see consumeWritePointer).
cbRequested | number of contiguous bytes to access; if fewer bytes are currently available in the buffer, the buffer is flushed and a new buffer is returned | |
pcbActual | if non-NULL, receives actual number of contiguous writable bytes, which will always be greater than or equal to cbRequested |
Definition at line 141 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
Referenced by BTreeWriter::deleteCurrent(), BTreeWriter::insertTupleFromBuffer(), and SegOutputStream().
00143 {
00144     if (cbWritable < cbRequested) {
00145         flushBuffer(cbRequested);
00146         assert(cbWritable >= cbRequested);
00147     }
00148     if (pcbActual) {
00149         *pcbActual = cbWritable;
00150     }
00151     return pNextByte;
00152 }
void ByteOutputStream::consumeWritePointer | ( | uint | cbUsed | ) | [inline, inherited] |
Advances stream position after a call to getWritePointer.
cbUsed | number of bytes to advance; must be less than or equal to the value of cbActual returned by the last call to getWritePointer |
Definition at line 154 of file ByteOutputStream.h.
References ByteStream::cbOffset, ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.
Referenced by BTreeWriter::deleteCurrent(), and BTreeWriter::insertTupleFromBuffer().
00155 {
00156     assert(cbUsed <= cbWritable);
00157     cbWritable -= cbUsed;
00158     pNextByte += cbUsed;
00159     cbOffset += cbUsed;
00160 }
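getWritePointer() and consumeWritePointer() form a two-step protocol: reserve contiguous space, fill it in place, then commit the number of bytes actually used. The following sketch shows the protocol against a toy stream. ToyCopylessStream is a hypothetical class, and the "flush" is reduced to starting a fresh in-memory buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stream with fixed-size buffers; not a Fennel class.
class ToyCopylessStream {
public:
    explicit ToyCopylessStream(std::size_t cbBufferInit)
        : cbBuffer(cbBufferInit) {}

    // Returns a pointer to at least cbRequested contiguous writable
    // bytes. Does not move the stream position.
    char *getWritePointer(
        std::size_t cbRequested, std::size_t *pcbActual = nullptr) {
        assert(cbRequested <= cbBuffer);
        if (cbWritable < cbRequested) {
            // abandon the tail of the current buffer and start a new one
            buffers.push_back(std::vector<char>(cbBuffer));
            pNextByte = buffers.back().data();
            cbWritable = cbBuffer;
        }
        if (pcbActual) {
            *pcbActual = cbWritable;
        }
        return pNextByte;
    }

    // Commits cbUsed bytes written through the last returned pointer.
    void consumeWritePointer(std::size_t cbUsed) {
        assert(cbUsed <= cbWritable);
        cbWritable -= cbUsed;
        pNextByte += cbUsed;
        cbOffset += cbUsed;
    }

    std::size_t getOffset() const { return cbOffset; }
    std::size_t getBufferCount() const { return buffers.size(); }

private:
    std::size_t cbBuffer;
    std::vector<std::vector<char> > buffers;
    char *pNextByte = nullptr;
    std::size_t cbWritable = 0;
    std::size_t cbOffset = 0;
};
```

Note the trade-off this exposes: when the remaining space is smaller than the request, the tail of the current buffer is skipped so that the caller always gets one contiguous region.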
void ByteOutputStream::hardPageBreak | ( | ) | [inherited] |
Marks the current buffer as complete regardless of how much data it contains.
The exact semantics are dependent on the buffering implementation.
Definition at line 62 of file ByteOutputStream.cpp.
References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
00063 {
00064     flushBuffer(0);
00065     cbWritable = 0;
00066     pNextByte = NULL;
00067 }
void ByteOutputStream::setWriteLatency | ( | WriteLatency | writeLatency | ) | [virtual, inherited] |
Changes the write latency.
May not be meaningful for all stream implementations.
writeLatency | new WriteLatency setting |
Reimplemented in SpillOutputStream.
Definition at line 69 of file ByteOutputStream.cpp.
References ByteOutputStream::writeLatency.
Referenced by SpillOutputStream::setWriteLatency().
00070 {
00071     writeLatency = writeLatencyInit;
00072 }
void ByteOutputStream::writeValue | ( | T const & | value | ) | [inline, inherited] |
Writes a fixed-size type to the stream.
value | value to write; type must be memcpy-safe |
Definition at line 135 of file ByteOutputStream.h.
Referenced by FtrsTableWriter::describeIndex(), LogicalTxnTest::describeParticipant(), FtrsTableWriter::describeParticipant(), BTreeWriter::describeParticipant(), DatabaseTest::executeIncrementAction(), TupleProjection::writePersistent(), and TupleDescriptor::writePersistent().
00136 {
00137     writeBytes(&value,sizeof(value));
00138 }
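writeValue() is a thin template over writeBytes() that copies the object's raw bytes. A small sketch of the same idea follows; ToyByteSink is a hypothetical class, and the static_assert is an addition that approximates "memcpy-safe" with std::is_trivially_copyable, which the original listing does not check.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical byte sink; not a Fennel class.
class ToyByteSink {
public:
    void writeBytes(void const *pData, std::size_t cb) {
        char const *p = static_cast<char const *>(pData);
        bytes.insert(bytes.end(), p, p + cb);
    }

    template <class T>
    void writeValue(T const &value) {
        // assumption: "memcpy-safe" is approximated by trivially copyable
        static_assert(
            std::is_trivially_copyable<T>::value,
            "writeValue requires a memcpy-safe type");
        writeBytes(&value, sizeof(value));
    }

    std::vector<char> bytes;
};
```

Because the bytes are copied as they sit in memory, a value written this way can only be read back by code that agrees on the type's size, layout, and byte order.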
PageId SegOutputStream::firstPageId [protected] |
First PageId allocated.
Definition at line 44 of file SegOutputStream.h.
Referenced by flushBuffer(), getFirstPageId(), and SegOutputStream().
PageId SegOutputStream::lastPageId [protected] |
Last PageId allocated.
Definition at line 49 of file SegOutputStream.h.
Referenced by flushBuffer(), getSegPos(), SegOutputStream(), and CrcSegOutputStream::writeExtraHeaders().
uint SegOutputStream::cbMaxPageData [protected] |
Maximum number of data bytes which can be stored per page (does not include header).
Definition at line 55 of file SegOutputStream.h.
Referenced by flushBuffer(), getBytesWrittenThisPage(), and SegOutputStream().
BlockNum SegOutputStream::nPagesAllocated [protected] |
Counter for getPageCount().
Definition at line 60 of file SegOutputStream.h.
Referenced by flushBuffer(), getPageCount(), and SegOutputStream().
SegmentAccessor SegStream::segmentAccessor [protected, inherited] |
Accessor for segment containing stream data.
Definition at line 97 of file SegStream.h.
Referenced by SegStream::closeImpl(), SegStreamAllocation::endWrite(), SegStream::getSegment(), SegStream::getSegmentAccessor(), CrcSegInputStream::lockBufferParanoid(), CrcSegInputStream::newCrcSegInputStream(), CrcSegOutputStream::newCrcSegOutputStream(), SegInputStream::newSegInputStream(), newSegOutputStream(), and SegInputStream::startPrefetch().
SegStreamLock SegStream::pageLock [protected, inherited] |
Lock held on current page, if any.
Definition at line 102 of file SegStream.h.
Referenced by SegStream::closeImpl(), SegInputStream::closeImpl(), flushBuffer(), SegInputStream::lockBuffer(), CrcSegInputStream::lockBuffer(), CrcSegInputStream::lockBufferParanoid(), SegInputStream::readNextBuffer(), and updatePage().
uint SegStream::cbPageHeader [protected, inherited] |
Number of bytes in page header.
Definition at line 107 of file SegStream.h.
Referenced by flushBuffer(), SegInputStream::lockBuffer(), CrcSegInputStream::lockBufferParanoid(), SegOutputStream(), and SegStream::SegStream().
FileSize ByteStream::cbOffset [protected, inherited] |
Byte position in stream.
Definition at line 41 of file ByteStream.h.
Referenced by ByteStream::ByteStream(), ByteInputStream::consumeReadPointer(), ByteOutputStream::consumeWritePointer(), ByteStream::getOffset(), getSegPos(), SegInputStream::getSegPos(), ByteInputStream::readBytes(), ByteInputStream::reset(), ByteArrayInputStream::resetArray(), ByteInputStream::seekBackward(), SegInputStream::seekSegPos(), and ByteOutputStream::writeBytes().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().
WriteLatency ByteOutputStream::writeLatency [protected, inherited] |
Current write latency mode.
Definition at line 51 of file ByteOutputStream.h.
Referenced by flushBuffer(), SegOutputStream(), ByteOutputStream::setWriteLatency(), and SpillOutputStream::spill().