#include <SpillOutputStream.h>
Public Member Functions | |
virtual | ~SpillOutputStream () |
SharedByteInputStream | getInputStream (SeekPosition seekPosition=SEEK_STREAM_BEGIN) |
Obtains a ByteInputStream suitable for accessing the contents of this SpillOutputStream. | |
SharedSegment | getSegment () |
Obtains a reference to the underlying segment if this stream has spilled. | |
SharedSegOutputStream | getSegOutputStream () |
Obtains a reference to the underlying SegOutputStream if this stream has spilled. | |
virtual void | setWriteLatency (WriteLatency writeLatency) |
Changes the write latency. | |
void | writeBytes (void const *pData, uint cbRequested) |
Writes bytes to the stream. | |
PBuffer | getWritePointer (uint cbRequested, uint *pcbActual=NULL) |
Copyless alternative for writing bytes to the stream. | |
void | consumeWritePointer (uint cbUsed) |
Advances stream position after a call to getWritePointer. | |
void | hardPageBreak () |
Marks the current buffer as complete regardless of how much data it contains. | |
template<class T> | |
void | writeValue (T const &value) |
Writes a fixed-size type to the stream. | |
FileSize | getOffset () const |
bool | isClosed () const |
void | close () |
Closes this object, releasing any resources it still holds. | |
Static Public Member Functions | |
static SharedSpillOutputStream | newSpillOutputStream (SharedSegmentFactory pSegmentFactory, SharedCacheAccessor pCacheAccessor, std::string spillFileName) |
Creates a new SpillOutputStream. | |
Protected Member Functions | |
void | setBuffer (PBuffer pBuffer, uint cbBuffer) |
Sets the current buffer to be written. | |
uint | getBytesAvailable () const |
Protected Attributes | |
WriteLatency | writeLatency |
Current write latency mode. | |
FileSize | cbOffset |
Byte position in stream. | |
bool | needsClose |
Private Member Functions | |
virtual void | flushBuffer (uint cbRequested) |
Must be implemented by derived class to flush buffered data. | |
virtual void | closeImpl () |
Must be implemented by derived class to release any resources. | |
void | spill () |
void | updatePage () |
SpillOutputStream (SharedSegmentFactory, SharedCacheAccessor, std::string) | |
Private Attributes | |
SharedSegmentFactory | pSegmentFactory |
Factory to use for creating spill segment if necessary. | |
SharedCacheAccessor | pCacheAccessor |
CacheAccessor to use for accessing spill segment if necessary. | |
SharedSegOutputStream | pSegOutputStream |
Spill segment output stream. | |
uint | cbBuffer |
Total number of bytes locked in current page. | |
SegmentAccessor | scratchAccessor |
Accessor for scratch segment. | |
SegPageLock | scratchPageLock |
Page lock on scratch page for short streams. | |
std::string | spillFileName |
Filename to assign to spill segment. |
SpillOutputStream initially writes into a single scratch page. If that page overflows, the contents are spilled to a new SegOutputStream, to which all further writes are directed.
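The spill behaviour described above can be illustrated with a minimal standalone sketch. It does not use Fennel: `SpillSketch`, its members, and the use of `std::vector` for both the scratch page and the spill target are assumptions made for illustration. Unlike the real class, which fills the scratch page before spilling, this sketch spills as soon as a write would not fit.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for SpillOutputStream: a fixed-size "scratch page"
// whose contents move to growable "long" storage on overflow.
class SpillSketch {
public:
    explicit SpillSketch(std::size_t scratchSize)
        : scratch(scratchSize), used(0), spilled(false) {}

    void writeBytes(const void *pData, std::size_t cb) {
        if (cb == 0) {
            return;
        }
        const char *p = static_cast<const char *>(pData);
        if (!spilled && used + cb > scratch.size()) {
            spill();  // grow from short to long
        }
        if (spilled) {
            // After the spill, every write goes to the long storage.
            longStore.insert(longStore.end(), p, p + cb);
        } else {
            std::memcpy(scratch.data() + used, p, cb);
            used += cb;
        }
    }

    bool hasSpilled() const { return spilled; }

    std::string contents() const {
        return spilled
            ? std::string(longStore.begin(), longStore.end())
            : std::string(scratch.data(), used);
    }

private:
    void spill() {
        // Copy what was written so far, then stop using the scratch page.
        longStore.assign(
            scratch.begin(),
            scratch.begin() + static_cast<std::ptrdiff_t>(used));
        used = 0;
        spilled = true;
    }

    std::vector<char> scratch;
    std::size_t used;
    bool spilled;
    std::vector<char> longStore;
};
```

A short stream never leaves the scratch buffer; only a stream that outgrows it pays for the second storage area, which is the point of the design.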
Definition at line 40 of file SpillOutputStream.h.
SpillOutputStream::SpillOutputStream | ( | SharedSegmentFactory, SharedCacheAccessor, std::string | ) | [explicit, private] |
Definition at line 45 of file SpillOutputStream.cpp.
References SegPageLock::accessSegment(), SegPageLock::allocatePage(), cbBuffer, SegPageLock::getPage(), CachePage::getWritableData(), pCacheAccessor, SegmentAccessor::pSegment, pSegmentFactory, scratchAccessor, scratchPageLock, and ByteOutputStream::setBuffer().
Referenced by newSpillOutputStream().
    : pSegmentFactory(pSegmentFactoryInit),
      pCacheAccessor(pCacheAccessorInit),
      spillFileName(spillFileNameInit)
{
    // REVIEW: this causes pCacheAccessor to lose its chance to intercept any
    // of the scratch calls
    scratchAccessor = pSegmentFactory->newScratchSegment(
        pCacheAccessor->getCache(),
        1);
    scratchPageLock.accessSegment(scratchAccessor);
    scratchPageLock.allocatePage();
    cbBuffer = scratchAccessor.pSegment->getUsablePageSize();
    setBuffer(
        scratchPageLock.getPage().getWritableData(),
        cbBuffer);
}
SpillOutputStream::~SpillOutputStream | ( | ) | [virtual] |
void SpillOutputStream::flushBuffer | ( | uint | cbRequested | ) | [private, virtual] |
Must be implemented by derived class to flush buffered data.
cbRequested | if non-zero, the derived class should allocate a new buffer with at least the requested size and call setBuffer |
Implements ByteOutputStream.
Definition at line 70 of file SpillOutputStream.cpp.
References cbBuffer, ByteOutputStream::getBytesAvailable(), SegPageLock::isLocked(), pSegOutputStream, scratchPageLock, ByteOutputStream::setBuffer(), and spill().
{
    if (scratchPageLock.isLocked()) {
        assert(!pSegOutputStream);
        // grow from short to long
        spill();
    } else {
        assert(pSegOutputStream);
        assert(!scratchPageLock.isLocked());
        // already long
        assert(cbBuffer >= getBytesAvailable());
        pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
    }
    assert(pSegOutputStream);
    if (cbRequested) {
        PBuffer pBuffer =
            pSegOutputStream->getWritePointer(cbRequested,&cbBuffer);
        setBuffer(pBuffer,cbBuffer);
    } else {
        pSegOutputStream->hardPageBreak();
        cbBuffer = 0;
    }
}
void SpillOutputStream::closeImpl | ( | ) | [private, virtual] |
Must be implemented by derived class to release any resources.
Reimplemented from ByteOutputStream.
Definition at line 94 of file SpillOutputStream.cpp.
References cbBuffer, ByteOutputStream::getBytesAvailable(), SegPageLock::isLocked(), pSegOutputStream, scratchPageLock, and SegPageLock::unlock().
{
    if (scratchPageLock.isLocked()) {
        // discard contents
        scratchPageLock.unlock();
    } else {
        assert(pSegOutputStream);
        assert(!scratchPageLock.isLocked());
        // flush long log
        pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
        cbBuffer = 0;
        pSegOutputStream.reset();
    }
}
void SpillOutputStream::spill | ( | ) | [private] |
Definition at line 123 of file SpillOutputStream.cpp.
References cbBuffer, DeviceMode::createNew, DeviceMode::direct, ByteOutputStream::getBytesAvailable(), SegPageLock::getCacheAccessor(), SegPageLock::getPage(), CachePage::getReadableData(), SegOutputStream::newSegOutputStream(), pCacheAccessor, pSegmentFactory, pSegOutputStream, scratchPageLock, spillFileName, SegPageLock::unlock(), and ByteOutputStream::writeLatency.
Referenced by flushBuffer().
{
    DeviceMode devMode = DeviceMode::createNew;
    // TODO: make this a parameter; for now it's always direct since this is
    // only used for log streams
    devMode.direct = true;
    SharedSegment pLongLogSegment =
        pSegmentFactory->newTempDeviceSegment(
            scratchPageLock.getCacheAccessor()->getCache(),
            devMode,
            spillFileName);
    SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor);
    pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor);
    pSegOutputStream->setWriteLatency(writeLatency);
    pSegOutputStream->writeBytes(
        scratchPageLock.getPage().getReadableData(),
        cbBuffer - getBytesAvailable());
    scratchPageLock.unlock();
}
void SpillOutputStream::updatePage | ( | ) | [private] |
Definition at line 173 of file SpillOutputStream.cpp.
References cbBuffer, ByteOutputStream::getBytesAvailable(), and pSegOutputStream.
Referenced by getInputStream().
{
    if (!cbBuffer) {
        return;
    }
    assert(cbBuffer > getBytesAvailable());
    uint cbConsumed = cbBuffer - getBytesAvailable();
    pSegOutputStream->consumeWritePointer(cbConsumed);
    cbBuffer -= cbConsumed;
    pSegOutputStream->updatePage();
}
SharedSpillOutputStream SpillOutputStream::newSpillOutputStream | ( | SharedSegmentFactory pSegmentFactory, SharedCacheAccessor pCacheAccessor, std::string spillFileName | ) | [static] |
Creates a new SpillOutputStream.
pSegmentFactory | the SegmentFactory to use if the output stream spills | |
pCacheAccessor | the CacheAccessor to use | |
spillFileName | filename to assign to spill segment |
Definition at line 35 of file SpillOutputStream.cpp.
References SpillOutputStream().
Referenced by LogicalTxn::LogicalTxn(), and SegStreamTest::testWriteSpillAndRead().
{
    return SharedSpillOutputStream(
        new SpillOutputStream(pSegmentFactory,pCacheAccessor,spillFileName),
        ClosableObjectDestructor());
}
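The factory above hands the new object to a shared pointer together with a custom destructor functor. The general pattern can be sketched with `std::shared_ptr` as follows. `ClosableSketch`, `CloseThenDelete`, and `newClosableSketch` are hypothetical names, and the close-then-delete behaviour of the functor is an assumption made for illustration rather than a statement about ClosableObjectDestructor.

```cpp
#include <cassert>
#include <memory>

// Hypothetical closable resource; it only counts how often it is closed.
class ClosableSketch {
public:
    explicit ClosableSketch(int *pCloseCountInit)
        : pCloseCount(pCloseCountInit), needsClose(true) {}

    void close() {
        if (!needsClose) {
            return;
        }
        needsClose = false;
        ++*pCloseCount;
    }

    bool isClosed() const { return !needsClose; }

private:
    int *pCloseCount;
    bool needsClose;
};

// Assumed deleter behaviour: close the object first, then delete it.
struct CloseThenDelete {
    void operator()(ClosableSketch *p) const {
        p->close();
        delete p;
    }
};

// Factory in the style of newSpillOutputStream: callers never see a raw
// pointer, so the object is always closed before it is destroyed.
std::shared_ptr<ClosableSketch> newClosableSketch(int *pCloseCount) {
    return std::shared_ptr<ClosableSketch>(
        new ClosableSketch(pCloseCount),
        CloseThenDelete());
}
```

Keeping the constructor private and exposing only such a factory means no caller can create an instance that escapes the deleter.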
SharedByteInputStream SpillOutputStream::getInputStream | ( | SeekPosition | seekPosition = SEEK_STREAM_BEGIN | ) |
Obtains a ByteInputStream suitable for accessing the contents of this SpillOutputStream.
If spill has already occurred, then this is a SegInputStream, otherwise a ByteArrayInputStream.
seekPosition | if SEEK_STREAM_BEGIN (the default), the input stream is initially positioned before the first byte of stream data; otherwise, after the last byte |
Definition at line 143 of file SpillOutputStream.cpp.
References ByteStream::getOffset(), SegPageLock::getPage(), CachePage::getReadableData(), SegPageLock::isLocked(), ByteArrayInputStream::newByteArrayInputStream(), SegInputStream::newSegInputStream(), pCacheAccessor, pSegOutputStream, scratchPageLock, SEEK_STREAM_END, and updatePage().
{
    if (scratchPageLock.isLocked()) {
        SharedByteInputStream pInputStream =
            ByteArrayInputStream::newByteArrayInputStream(
                scratchPageLock.getPage().getReadableData(),
                getOffset());
        if (seekPosition == SEEK_STREAM_END) {
            pInputStream->seekForward(getOffset());
        }
        return pInputStream;
    } else {
        assert(pSegOutputStream);
        updatePage();
        SharedSegment pSegment = pSegOutputStream->getSegment();
        SegStreamPosition endPos;
        if (seekPosition == SEEK_STREAM_END) {
            pSegOutputStream->getSegPos(endPos);
        }
        SegmentAccessor segmentAccessor(pSegment,pCacheAccessor);
        SharedSegInputStream pInputStream =
            SegInputStream::newSegInputStream(segmentAccessor);
        if (seekPosition == SEEK_STREAM_END) {
            pInputStream->seekSegPos(endPos);
        }
        return pInputStream;
    }
}
SharedSegment SpillOutputStream::getSegment | ( | ) |
Obtains a reference to the underlying segment if this stream has spilled.
Definition at line 185 of file SpillOutputStream.cpp.
References pSegOutputStream.
{
    if (pSegOutputStream) {
        return pSegOutputStream->getSegment();
    } else {
        return SharedSegment();
    }
}
SharedSegOutputStream SpillOutputStream::getSegOutputStream | ( | ) |
Obtains a reference to the underlying SegOutputStream if this stream has spilled.
Definition at line 194 of file SpillOutputStream.cpp.
References pSegOutputStream.
{
    return pSegOutputStream;
}
void SpillOutputStream::setWriteLatency | ( | WriteLatency | writeLatency | ) | [virtual] |
Changes the write latency.
May not be meaningful for all stream implementations.
writeLatency | new WriteLatency setting |
Reimplemented from ByteOutputStream.
Definition at line 109 of file SpillOutputStream.cpp.
References pSegOutputStream, and ByteOutputStream::setWriteLatency().
{
    ByteOutputStream::setWriteLatency(writeLatency);
    if (pSegOutputStream) {
        pSegOutputStream->setWriteLatency(writeLatency);
    }
}
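void ByteOutputStream::setBuffer | ( | PBuffer pBuffer, uint cbBuffer | ) | [inline, protected, inherited] |
Sets the current buffer to be written.
pBuffer | start address of new buffer | |
cbBuffer | number of bytes in buffer |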
Definition at line 162 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.
Referenced by ByteArrayOutputStream::ByteArrayOutputStream(), ByteArrayOutputStream::clear(), flushBuffer(), SegOutputStream::flushBuffer(), and SpillOutputStream().
{
    pNextByte = pBuffer;
    cbWritable = cbBuffer;
}
uint ByteOutputStream::getBytesAvailable | ( | ) | const [inline, protected, inherited] |
Definition at line 168 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable.
Referenced by closeImpl(), flushBuffer(), SegOutputStream::getBytesWrittenThisPage(), SegOutputStream::getSegPos(), spill(), and updatePage().
{
    return cbWritable;
}
void ByteOutputStream::writeBytes | ( | void const * pData, uint cbRequested | ) | [inherited] |
Writes bytes to the stream.
pData | source buffer containing bytes to be written | |
cbRequested | number of bytes to write |
Definition at line 35 of file ByteOutputStream.cpp.
References ByteStream::cbOffset, ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
Referenced by FtrsTableWriter::execute().
{
    cbOffset += cb;
    if (!cbWritable) {
        flushBuffer(1);
    }
    for (;;) {
        assert(cbWritable);
        if (cb <= cbWritable) {
            memcpy(pNextByte,pData,cb);
            cbWritable -= cb;
            pNextByte += cb;
            return;
        }
        memcpy(pNextByte,pData,cbWritable);
        pData = static_cast<char const *>(pData) + cbWritable;
        cb -= cbWritable;
        cbWritable = 0;
        flushBuffer(1);
    }
}
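The loop above copies as much as fits into the current buffer, asks for a fresh buffer, and repeats until the request is satisfied. A standalone sketch of the same chunking idea follows. `PagedWriterSketch` and its use of one `std::string` per page are assumptions for illustration; unlike the listing, the sketch only requests a new page when there are bytes left to copy.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical paged writer: data is split across fixed-size pages, and a
// new page is started whenever the current one has no room left.
class PagedWriterSketch {
public:
    explicit PagedWriterSketch(std::size_t pageSizeInit)
        : pageSize(pageSizeInit), cbWritable(0), cbOffset(0) {
        assert(pageSize > 0);
    }

    void writeBytes(const void *pData, std::size_t cb) {
        const char *p = static_cast<const char *>(pData);
        cbOffset += cb;
        while (cb > 0) {
            if (cbWritable == 0) {
                flushBuffer();
            }
            // Copy whatever fits into the current page.
            std::size_t n = cb < cbWritable ? cb : cbWritable;
            pages.back().append(p, n);
            cbWritable -= n;
            p += n;
            cb -= n;
        }
    }

    std::size_t getOffset() const { return cbOffset; }
    const std::vector<std::string> &getPages() const { return pages; }

private:
    // Stand-in for flushBuffer: start a new, empty page.
    void flushBuffer() {
        pages.emplace_back();
        pages.back().reserve(pageSize);
        cbWritable = pageSize;
    }

    std::size_t pageSize;
    std::size_t cbWritable;
    std::size_t cbOffset;
    std::vector<std::string> pages;
};
```

With a page size of 4, writing ten bytes produces pages of 4, 4, and 2 bytes, and a later two-byte write fills the last page instead of starting a new one.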
PBuffer ByteOutputStream::getWritePointer | ( | uint cbRequested, uint * pcbActual = NULL | ) | [inline, inherited] |
Copyless alternative for writing bytes to the stream.
Provides direct access to the stream's internal buffer, but doesn't move the stream position (see consumeWritePointer).
cbRequested | number of contiguous bytes to access; if fewer bytes are currently available in the buffer, the buffer is flushed and a new buffer is returned | |
pcbActual | if non-NULL, receives actual number of contiguous writable bytes, which will always be greater than or equal to cbRequested |
Definition at line 141 of file ByteOutputStream.h.
References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
Referenced by BTreeWriter::deleteCurrent(), BTreeWriter::insertTupleFromBuffer(), and SegOutputStream::SegOutputStream().
{
    if (cbWritable < cbRequested) {
        flushBuffer(cbRequested);
        assert(cbWritable >= cbRequested);
    }
    if (pcbActual) {
        *pcbActual = cbWritable;
    }
    return pNextByte;
}
void ByteOutputStream::consumeWritePointer | ( | uint | cbUsed | ) | [inline, inherited] |
Advances stream position after a call to getWritePointer.
cbUsed | number of bytes to advance; must be less than or equal to the value of cbActual returned by the last call to getWritePointer |
Definition at line 154 of file ByteOutputStream.h.
References ByteStream::cbOffset, ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.
Referenced by BTreeWriter::deleteCurrent(), and BTreeWriter::insertTupleFromBuffer().
{
    assert(cbUsed <= cbWritable);
    cbWritable -= cbUsed;
    pNextByte += cbUsed;
    cbOffset += cbUsed;
}
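getWritePointer and consumeWritePointer form a two-step protocol: reserve contiguous space, fill it in place, then commit the number of bytes actually used. The sketch below shows that protocol on a simple growable buffer. `WritePointerSketch` is a hypothetical class; growing a `std::vector` stands in for flushBuffer, which means a pointer obtained earlier becomes invalid after the next reservation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical buffer exposing a reserve/commit write interface.
class WritePointerSketch {
public:
    explicit WritePointerSketch(std::size_t capacity)
        : buffer(capacity), cbUsed(0) {}

    // Returns a pointer to at least cbRequested contiguous writable bytes.
    // The stream position does not move until consumeWritePointer is called.
    char *getWritePointer(
        std::size_t cbRequested, std::size_t *pcbActual = nullptr) {
        if (buffer.size() - cbUsed < cbRequested) {
            buffer.resize(cbUsed + cbRequested);  // stand-in for flushBuffer
        }
        if (pcbActual) {
            *pcbActual = buffer.size() - cbUsed;
        }
        return buffer.data() + cbUsed;
    }

    // Commits cb bytes that the caller wrote through the returned pointer.
    void consumeWritePointer(std::size_t cb) {
        assert(cb <= buffer.size() - cbUsed);
        cbUsed += cb;
    }

    std::size_t getOffset() const { return cbUsed; }
    const char *data() const { return buffer.data(); }

private:
    std::vector<char> buffer;
    std::size_t cbUsed;
};
```

A caller may reserve more than it ends up needing and commit only the bytes it actually wrote, which avoids the intermediate copy that writeBytes requires.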
void ByteOutputStream::hardPageBreak | ( | ) | [inherited] |
Marks the current buffer as complete regardless of how much data it contains.
The exact semantics are dependent on the buffering implementation.
Definition at line 62 of file ByteOutputStream.cpp.
References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.
{
    flushBuffer(0);
    cbWritable = 0;
    pNextByte = NULL;
}
void ByteOutputStream::writeValue | ( | T const & | value | ) | [inline, inherited] |
Writes a fixed-size type to the stream.
value | value to write; type must be memcpy-safe |
Definition at line 135 of file ByteOutputStream.h.
Referenced by FtrsTableWriter::describeIndex(), LogicalTxnTest::describeParticipant(), FtrsTableWriter::describeParticipant(), BTreeWriter::describeParticipant(), DatabaseTest::executeIncrementAction(), TupleProjection::writePersistent(), and TupleDescriptor::writePersistent().
{
    writeBytes(&value,sizeof(value));
}
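The "memcpy-safe" requirement can be checked at compile time. The following sketch shows one way to do so with `std::is_trivially_copyable`; `ValueWriterSketch` and `readValueAt` are hypothetical, and the `static_assert` is an addition for illustration that does not appear in the listing above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical byte sink with a typed convenience writer.
class ValueWriterSketch {
public:
    void writeBytes(const void *pData, std::size_t cb) {
        const char *p = static_cast<const char *>(pData);
        bytes.insert(bytes.end(), p, p + cb);
    }

    // Writes the raw bytes of a fixed-size value.
    template <class T>
    void writeValue(const T &value) {
        static_assert(
            std::is_trivially_copyable<T>::value,
            "T must be memcpy-safe");
        writeBytes(&value, sizeof(value));
    }

    // Reads a value back from a byte offset, for checking the round trip.
    template <class T>
    T readValueAt(std::size_t offset) const {
        static_assert(
            std::is_trivially_copyable<T>::value,
            "T must be memcpy-safe");
        assert(offset + sizeof(T) <= bytes.size());
        T value;
        std::memcpy(&value, bytes.data() + offset, sizeof(T));
        return value;
    }

    std::size_t size() const { return bytes.size(); }

private:
    std::vector<char> bytes;
};
```

Passing a type such as `std::string` to `writeValue` in this sketch fails to compile, rather than silently writing pointer values into the stream.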
FileSize ByteStream::getOffset | ( | ) | const [inline, inherited] |
Definition at line 110 of file ByteStream.h.
References ByteStream::cbOffset.
Referenced by getInputStream(), and ByteInputStream::mark().
{
    return cbOffset;
}
bool ClosableObject::isClosed | ( | ) | const [inline, inherited] |
Definition at line 58 of file ClosableObject.h.
{
    return !needsClose;
}
void ClosableObject::close | ( | ) | [inherited] |
Closes this object, releasing any resources it still holds.
Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.
Definition at line 39 of file ClosableObject.cpp.
References ClosableObject::closeImpl(), and ClosableObject::needsClose.
Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().
{
    if (!needsClose) {
        return;
    }
    needsClose = false;
    closeImpl();
}
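The listing above guards closeImpl() with the needsClose flag, so repeated calls to close() release resources only once. A standalone sketch of that arrangement, with a non-virtual close() delegating to a virtual closeImpl(), might look like this. `ClosableBase` and `CountingResource` are hypothetical classes, not Fennel types.

```cpp
#include <cassert>

// Hypothetical base class: close() is the public entry point and calls the
// derived class's closeImpl() at most once.
class ClosableBase {
public:
    ClosableBase() : needsClose(true) {}
    virtual ~ClosableBase() {}

    bool isClosed() const { return !needsClose; }

    void close() {
        if (!needsClose) {
            return;  // already closed; nothing to do
        }
        needsClose = false;
        closeImpl();
    }

protected:
    virtual void closeImpl() = 0;
    bool needsClose;
};

// Derived class that counts how many times its resources are released.
class CountingResource : public ClosableBase {
public:
    CountingResource() : releases(0) {}
    int releases;

protected:
    void closeImpl() override { ++releases; }
};
```

Because the flag is cleared before closeImpl() runs, a close() call made from inside closeImpl() also returns immediately.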
SharedSegmentFactory SpillOutputStream::pSegmentFactory [private] |
Factory to use for creating spill segment if necessary.
Definition at line 46 of file SpillOutputStream.h.
Referenced by spill(), and SpillOutputStream().
SharedCacheAccessor SpillOutputStream::pCacheAccessor [private] |
CacheAccessor to use for accessing spill segment if necessary.
Definition at line 51 of file SpillOutputStream.h.
Referenced by getInputStream(), spill(), and SpillOutputStream().
SharedSegOutputStream SpillOutputStream::pSegOutputStream [private] |
Spill segment output stream.
Definition at line 56 of file SpillOutputStream.h.
Referenced by closeImpl(), flushBuffer(), getInputStream(), getSegment(), getSegOutputStream(), setWriteLatency(), spill(), and updatePage().
uint SpillOutputStream::cbBuffer [private] |
Total number of bytes locked in current page.
Definition at line 61 of file SpillOutputStream.h.
Referenced by closeImpl(), flushBuffer(), spill(), SpillOutputStream(), and updatePage().
SegmentAccessor SpillOutputStream::scratchAccessor [private] |
Accessor for scratch segment.
Definition at line 66 of file SpillOutputStream.h.
Referenced by SpillOutputStream().
SegPageLock SpillOutputStream::scratchPageLock [private] |
Page lock on scratch page for short streams.
Definition at line 71 of file SpillOutputStream.h.
Referenced by closeImpl(), flushBuffer(), getInputStream(), spill(), and SpillOutputStream().
std::string SpillOutputStream::spillFileName [private] |
Filename to assign to spill segment.
Definition at line 76 of file SpillOutputStream.h.
Referenced by spill().
WriteLatency ByteOutputStream::writeLatency [protected, inherited] |
Current write latency mode.
Definition at line 51 of file ByteOutputStream.h.
Referenced by SegOutputStream::flushBuffer(), SegOutputStream::SegOutputStream(), ByteOutputStream::setWriteLatency(), and spill().
FileSize ByteStream::cbOffset [protected, inherited] |
Byte position in stream.
Definition at line 41 of file ByteStream.h.
Referenced by ByteStream::ByteStream(), ByteInputStream::consumeReadPointer(), ByteOutputStream::consumeWritePointer(), ByteStream::getOffset(), SegOutputStream::getSegPos(), SegInputStream::getSegPos(), ByteInputStream::readBytes(), ByteInputStream::reset(), ByteArrayInputStream::resetArray(), ByteInputStream::seekBackward(), SegInputStream::seekSegPos(), and ByteOutputStream::writeBytes().
bool ClosableObject::needsClose [protected, inherited] |
Definition at line 44 of file ClosableObject.h.
Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().