SpillOutputStream Class Reference

SpillOutputStream implements the ByteOutputStream interface by starting with writes to a cache scratch page.

#include <SpillOutputStream.h>

Inheritance diagram for SpillOutputStream:

Base classes shown in the diagram: ByteOutputStream, ByteStream, ClosableObject.

Public Member Functions

virtual ~SpillOutputStream ()
SharedByteInputStream getInputStream (SeekPosition seekPosition=SEEK_STREAM_BEGIN)
 Obtains a ByteInputStream suitable for accessing the contents of this SpillOutputStream.
SharedSegment getSegment ()
 Obtains a reference to the underlying segment if this stream has spilled.
SharedSegOutputStream getSegOutputStream ()
 Obtains a reference to the underlying SegOutputStream if this stream has spilled.
virtual void setWriteLatency (WriteLatency writeLatency)
 Changes the write latency.
void writeBytes (void const *pData, uint cbRequested)
 Writes bytes to the stream.
PBuffer getWritePointer (uint cbRequested, uint *pcbActual=NULL)
 Copyless alternative for writing bytes to the stream.
void consumeWritePointer (uint cbUsed)
 Advances stream position after a call to getWritePointer.
void hardPageBreak ()
 Marks the current buffer as complete regardless of how much data it contains.
template<class T>
void writeValue (T const &value)
 Writes a fixed-size type to the stream.
FileSize getOffset () const
 
Returns:
current offset from beginning of stream

bool isClosed () const
 
Returns:
whether the object has been closed

void close ()
 Closes this object, releasing any allocated resources.

Static Public Member Functions

static SharedSpillOutputStream newSpillOutputStream (SharedSegmentFactory pSegmentFactory, SharedCacheAccessor pCacheAccessor, std::string spillFileName)
 Creates a new SpillOutputStream.

Protected Member Functions

void setBuffer (PBuffer pBuffer, uint cbBuffer)
 Sets the current buffer to be written.
uint getBytesAvailable () const
 
Returns:
number of bytes remaining to be written in current buffer


Protected Attributes

WriteLatency writeLatency
 Current write latency mode.
FileSize cbOffset
 Byte position in stream.
bool needsClose

Private Member Functions

virtual void flushBuffer (uint cbRequested)
 Must be implemented by derived class to flush buffered data.
virtual void closeImpl ()
 Must be implemented by derived class to release any resources.
void spill ()
void updatePage ()
 SpillOutputStream (SharedSegmentFactory, SharedCacheAccessor, std::string)

Private Attributes

SharedSegmentFactory pSegmentFactory
 Factory to use for creating spill segment if necessary.
SharedCacheAccessor pCacheAccessor
 CacheAccessor to use for accessing spill segment if necessary.
SharedSegOutputStream pSegOutputStream
 Spill segment output stream.
uint cbBuffer
 Total number of bytes locked in current page.
SegmentAccessor scratchAccessor
 Accessor for scratch segment.
SegPageLock scratchPageLock
 Page lock on scratch page for short streams.
std::string spillFileName
 Filename to assign to spill segment.

Detailed Description

SpillOutputStream implements the ByteOutputStream interface by starting with writes to a cache scratch page.

If this overflows, the contents are spilled to a new SegOutputStream to which all further writes are directed.

Definition at line 40 of file SpillOutputStream.h.
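The spill-on-overflow behavior described above can be illustrated with a small self-contained model. This is only a sketch under stated assumptions: ToySpillStream and its members are hypothetical names that do not exist in Fennel, a std::vector stands in for the cache scratch page, and a std::string stands in for the spill segment. Unlike the real class, which fills the scratch page completely before spilling, the toy spills as soon as a write does not fit.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for SpillOutputStream (not Fennel code): a fixed-size
// scratch buffer plays the role of the cache scratch page, and a std::string
// plays the role of the spill segment.
class ToySpillStream
{
    std::vector<char> scratch;   // "scratch page"
    std::size_t cbUsed = 0;      // bytes written to scratch so far
    std::string spillTarget;     // "spill segment"
    bool spilled = false;

public:
    explicit ToySpillStream(std::size_t cbScratch) : scratch(cbScratch)
    {
        assert(cbScratch > 0);
    }

    void writeBytes(void const *pData, std::size_t cb)
    {
        char const *p = static_cast<char const *>(pData);
        if (!spilled && cbUsed + cb <= scratch.size()) {
            // short stream: the data still fits in the scratch buffer
            std::memcpy(scratch.data() + cbUsed, p, cb);
            cbUsed += cb;
            return;
        }
        if (!spilled) {
            // overflow: carry over what was buffered, then switch for good
            spillTarget.assign(scratch.data(), cbUsed);
            spilled = true;
        }
        // long stream: all further writes go to the spill target
        spillTarget.append(p, cb);
    }

    bool hasSpilled() const { return spilled; }

    std::string contents() const
    {
        return spilled ? spillTarget : std::string(scratch.data(), cbUsed);
    }
};
```

With an 8-byte scratch buffer, writing 4 bytes leaves the toy stream in its short form; a following 6-byte write triggers the one-way switch to the spill target, and the contents read back as the concatenation of both writes.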


Constructor & Destructor Documentation

SpillOutputStream::SpillOutputStream ( SharedSegmentFactory  ,
SharedCacheAccessor  ,
std::string   
) [explicit, private]

Definition at line 45 of file SpillOutputStream.cpp.

References SegPageLock::accessSegment(), SegPageLock::allocatePage(), cbBuffer, SegPageLock::getPage(), CachePage::getWritableData(), pCacheAccessor, SegmentAccessor::pSegment, pSegmentFactory, scratchAccessor, scratchPageLock, and ByteOutputStream::setBuffer().

Referenced by newSpillOutputStream().

00049     : pSegmentFactory(pSegmentFactoryInit),
00050       pCacheAccessor(pCacheAccessorInit),
00051       spillFileName(spillFileNameInit)
00052 {
00053     // REVIEW:  this causes pCacheAccessor to lose its chance to intercept any
00054     // of the scratch calls
00055     scratchAccessor = pSegmentFactory->newScratchSegment(
00056         pCacheAccessor->getCache(),
00057         1);
00058     scratchPageLock.accessSegment(scratchAccessor);
00059     scratchPageLock.allocatePage();
00060     cbBuffer = scratchAccessor.pSegment->getUsablePageSize();
00061     setBuffer(
00062         scratchPageLock.getPage().getWritableData(),
00063         cbBuffer);
00064 }

SpillOutputStream::~SpillOutputStream (  )  [virtual]

Definition at line 66 of file SpillOutputStream.cpp.

00067 {
00068 }


Member Function Documentation

void SpillOutputStream::flushBuffer ( uint  cbRequested  )  [private, virtual]

Must be implemented by derived class to flush buffered data.

Parameters:
cbRequested if non-zero, the derived class should allocate a new buffer with at least the requested size and call setBuffer

Implements ByteOutputStream.

Definition at line 70 of file SpillOutputStream.cpp.

References cbBuffer, ByteOutputStream::getBytesAvailable(), SegPageLock::isLocked(), pSegOutputStream, scratchPageLock, ByteOutputStream::setBuffer(), and spill().

00071 {
00072     if (scratchPageLock.isLocked()) {
00073         assert(!pSegOutputStream);
00074         // grow from short to long
00075         spill();
00076     } else {
00077         assert(pSegOutputStream);
00078         assert(!scratchPageLock.isLocked());
00079         // already long
00080         assert(cbBuffer >= getBytesAvailable());
00081         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00082     }
00083     assert(pSegOutputStream);
00084     if (cbRequested) {
00085         PBuffer pBuffer =
00086             pSegOutputStream->getWritePointer(cbRequested,&cbBuffer);
00087         setBuffer(pBuffer,cbBuffer);
00088     } else {
00089         pSegOutputStream->hardPageBreak();
00090         cbBuffer = 0;
00091     }
00092 }

void SpillOutputStream::closeImpl (  )  [private, virtual]

Must be implemented by derived class to release any resources.

Reimplemented from ByteOutputStream.

Definition at line 94 of file SpillOutputStream.cpp.

References cbBuffer, ByteOutputStream::getBytesAvailable(), SegPageLock::isLocked(), pSegOutputStream, scratchPageLock, and SegPageLock::unlock().

00095 {
00096     if (scratchPageLock.isLocked()) {
00097         // discard contents
00098         scratchPageLock.unlock();
00099     } else {
00100         assert(pSegOutputStream);
00101         assert(!scratchPageLock.isLocked());
00102         // flush long log
00103         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00104         cbBuffer = 0;
00105         pSegOutputStream.reset();
00106     }
00107 }

void SpillOutputStream::spill (  )  [private]

Definition at line 123 of file SpillOutputStream.cpp.

References cbBuffer, DeviceMode::createNew, DeviceMode::direct, ByteOutputStream::getBytesAvailable(), SegPageLock::getCacheAccessor(), SegPageLock::getPage(), CachePage::getReadableData(), SegOutputStream::newSegOutputStream(), pCacheAccessor, pSegmentFactory, pSegOutputStream, scratchPageLock, spillFileName, SegPageLock::unlock(), and ByteOutputStream::writeLatency.

Referenced by flushBuffer().

00124 {
00125     DeviceMode devMode = DeviceMode::createNew;
00126     // TODO:  make this a parameter; for now it's always direct since this is
00127     // only used for log streams
00128     devMode.direct = true;
00129     SharedSegment pLongLogSegment =
00130         pSegmentFactory->newTempDeviceSegment(
00131             scratchPageLock.getCacheAccessor()->getCache(),
00132             devMode,
00133             spillFileName);
00134     SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor);
00135     pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor);
00136     pSegOutputStream->setWriteLatency(writeLatency);
00137     pSegOutputStream->writeBytes(
00138         scratchPageLock.getPage().getReadableData(),
00139         cbBuffer - getBytesAvailable());
00140     scratchPageLock.unlock();
00141 }

void SpillOutputStream::updatePage (  )  [private]

Definition at line 173 of file SpillOutputStream.cpp.

References cbBuffer, ByteOutputStream::getBytesAvailable(), and pSegOutputStream.

Referenced by getInputStream().

00174 {
00175     if (!cbBuffer) {
00176         return;
00177     }
00178     assert(cbBuffer > getBytesAvailable());
00179     uint cbConsumed = cbBuffer - getBytesAvailable();
00180     pSegOutputStream->consumeWritePointer(cbConsumed);
00181     cbBuffer -= cbConsumed;
00182     pSegOutputStream->updatePage();
00183 }

SharedSpillOutputStream SpillOutputStream::newSpillOutputStream ( SharedSegmentFactory  pSegmentFactory,
SharedCacheAccessor  pCacheAccessor,
std::string  spillFileName 
) [static]

Creates a new SpillOutputStream.

Parameters:
pSegmentFactory the SegmentFactory to use if the output stream spills
pCacheAccessor the CacheAccessor to use
spillFileName filename to assign to spill segment

Definition at line 35 of file SpillOutputStream.cpp.

References SpillOutputStream().

Referenced by LogicalTxn::LogicalTxn(), and SegStreamTest::testWriteSpillAndRead().

SharedByteInputStream SpillOutputStream::getInputStream ( SeekPosition  seekPosition = SEEK_STREAM_BEGIN  ) 

Obtains a ByteInputStream suitable for accessing the contents of this SpillOutputStream.

If spill has already occurred, then this is a SegInputStream, otherwise a ByteArrayInputStream.

Parameters:
seekPosition if SEEK_STREAM_BEGIN (the default), the input stream is initially positioned before the first byte of stream data; otherwise, after the last byte
Returns:
new ByteInputStream

Definition at line 143 of file SpillOutputStream.cpp.

References ByteStream::getOffset(), SegPageLock::getPage(), CachePage::getReadableData(), SegPageLock::isLocked(), ByteArrayInputStream::newByteArrayInputStream(), SegInputStream::newSegInputStream(), pCacheAccessor, pSegOutputStream, scratchPageLock, SEEK_STREAM_END, and updatePage().

00145 {
00146     if (scratchPageLock.isLocked()) {
00147         SharedByteInputStream pInputStream =
00148             ByteArrayInputStream::newByteArrayInputStream(
00149                 scratchPageLock.getPage().getReadableData(),
00150                 getOffset());
00151         if (seekPosition == SEEK_STREAM_END) {
00152             pInputStream->seekForward(getOffset());
00153         }
00154         return pInputStream;
00155     } else {
00156         assert(pSegOutputStream);
00157         updatePage();
00158         SharedSegment pSegment = pSegOutputStream->getSegment();
00159         SegStreamPosition endPos;
00160         if (seekPosition == SEEK_STREAM_END) {
00161             pSegOutputStream->getSegPos(endPos);
00162         }
00163         SegmentAccessor segmentAccessor(pSegment,pCacheAccessor);
00164         SharedSegInputStream pInputStream =
00165             SegInputStream::newSegInputStream(segmentAccessor);
00166         if (seekPosition == SEEK_STREAM_END) {
00167             pInputStream->seekSegPos(endPos);
00168         }
00169         return pInputStream;
00170     }
00171 }

SharedSegment SpillOutputStream::getSegment (  ) 

Obtains a reference to the underlying segment if this stream has spilled.

Returns:
the segment, or a null (empty) shared pointer if the stream has not spilled

Definition at line 185 of file SpillOutputStream.cpp.

References pSegOutputStream.

00186 {
00187     if (pSegOutputStream) {
00188         return pSegOutputStream->getSegment();
00189     } else {
00190         return SharedSegment();
00191     }
00192 }

SharedSegOutputStream SpillOutputStream::getSegOutputStream (  ) 

Obtains a reference to the underlying SegOutputStream if this stream has spilled.

Returns:
the stream, or a null (empty) shared pointer if the stream has not spilled

Definition at line 194 of file SpillOutputStream.cpp.

References pSegOutputStream.

00195 {
00196     return pSegOutputStream;
00197 }
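Because getSegment() and getSegOutputStream() may return an empty pointer, callers presumably need to test the result before dereferencing it. The following sketch shows that check in isolation. It assumes std::shared_ptr as a stand-in for Fennel's shared pointer types, and ToySegment, ToyStream, and describeBacking are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical types (not Fennel code) used to show the empty-pointer check.
struct ToySegment
{
    std::string name;
};
typedef std::shared_ptr<ToySegment> SharedToySegment;

class ToyStream
{
    SharedToySegment pSegment;  // stays empty until the stream spills

public:
    void spill(std::string const &name)
    {
        assert(!pSegment);  // in this model a stream spills at most once
        pSegment = std::make_shared<ToySegment>(ToySegment{name});
    }

    // Mirrors the documented contract: empty pointer if not spilled.
    SharedToySegment getSegment() const { return pSegment; }
};

// Caller-side pattern: test the pointer before using it.
inline std::string describeBacking(ToyStream const &stream)
{
    SharedToySegment pSegment = stream.getSegment();
    if (!pSegment) {
        return "scratch page";
    }
    return "segment " + pSegment->name;
}
```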

void SpillOutputStream::setWriteLatency ( WriteLatency  writeLatency  )  [virtual]

Changes the write latency.

May not be meaningful for all stream implementations.

Parameters:
writeLatency new WriteLatency setting

Reimplemented from ByteOutputStream.

Definition at line 109 of file SpillOutputStream.cpp.

References pSegOutputStream, and ByteOutputStream::setWriteLatency().

00110 {
00111     ByteOutputStream::setWriteLatency(writeLatency);
00112     if (pSegOutputStream) {
00113         pSegOutputStream->setWriteLatency(writeLatency);
00114     }
00115 }

void ByteOutputStream::setBuffer ( PBuffer  pBuffer,
uint  cbBuffer 
) [inline, protected, inherited]

Sets the current buffer to be written.

Parameters:
pBuffer start address of new buffer
cbBuffer number of bytes in buffer

Definition at line 162 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.

Referenced by ByteArrayOutputStream::ByteArrayOutputStream(), ByteArrayOutputStream::clear(), flushBuffer(), SegOutputStream::flushBuffer(), and SpillOutputStream().

00163 {
00164     pNextByte = pBuffer;
00165     cbWritable = cbBuffer;
00166 }

uint ByteOutputStream::getBytesAvailable (  )  const [inline, protected, inherited]

Returns:
number of bytes remaining to be written in current buffer

Definition at line 168 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable.

Referenced by closeImpl(), flushBuffer(), SegOutputStream::getBytesWrittenThisPage(), SegOutputStream::getSegPos(), spill(), and updatePage().

00169 {
00170     return cbWritable;
00171 }

void ByteOutputStream::writeBytes ( void const *  pData,
uint  cbRequested 
) [inherited]

Writes bytes to the stream.

Parameters:
pData source buffer containing bytes to be written
cbRequested number of bytes to write

Definition at line 35 of file ByteOutputStream.cpp.

References ByteStream::cbOffset, ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

Referenced by FtrsTableWriter::execute().

00036 {
00037     cbOffset += cb;
00038     if (!cbWritable) {
00039         flushBuffer(1);
00040     }
00041     for (;;) {
00042         assert(cbWritable);
00043         if (cb <= cbWritable) {
00044             memcpy(pNextByte,pData,cb);
00045             cbWritable -= cb;
00046             pNextByte += cb;
00047             return;
00048         }
00049         memcpy(pNextByte,pData,cbWritable);
00050         pData = static_cast<char const *>(pData) + cbWritable;
00051         cb -= cbWritable;
00052         cbWritable = 0;
00053         flushBuffer(1);
00054     }
00055 }
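The loop above copies as much as fits into the current buffer, asks flushBuffer for a new one, and repeats until the request is satisfied. A minimal model of that pattern follows, assuming fixed-size chunks in place of the buffers a real flushBuffer implementation would supply. ToyChunkedWriter and getChunks are hypothetical names, not part of Fennel.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model of the writeBytes loop (not Fennel code): each
// std::string in 'chunks' stands in for one buffer supplied by flushBuffer.
class ToyChunkedWriter
{
    std::size_t cbChunk;              // capacity of every buffer
    std::vector<std::string> chunks;  // completed buffers plus the current one
    std::size_t cbWritable = 0;       // space left in the current buffer

    // Completes the current buffer and starts a fresh, empty one.
    void flushBuffer()
    {
        chunks.emplace_back();
        chunks.back().reserve(cbChunk);
        cbWritable = cbChunk;
    }

public:
    explicit ToyChunkedWriter(std::size_t cbChunkInit) : cbChunk(cbChunkInit)
    {
        assert(cbChunk > 0);
    }

    void writeBytes(void const *pData, std::size_t cb)
    {
        char const *p = static_cast<char const *>(pData);
        if (!cbWritable) {
            flushBuffer();
        }
        for (;;) {
            if (cb <= cbWritable) {
                // the remainder fits: copy it and finish
                chunks.back().append(p, cb);
                cbWritable -= cb;
                return;
            }
            // fill the rest of the current buffer, then get a fresh one
            chunks.back().append(p, cbWritable);
            p += cbWritable;
            cb -= cbWritable;
            flushBuffer();
        }
    }

    std::vector<std::string> const &getChunks() const { return chunks; }
};
```

Writing ten bytes through a writer with 4-byte chunks produces three buffers holding 4, 4, and 2 bytes, which is the same splitting the listing above performs across successive calls to flushBuffer(1).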

PBuffer ByteOutputStream::getWritePointer ( uint  cbRequested,
uint * pcbActual = NULL 
) [inline, inherited]

Copyless alternative for writing bytes to the stream.

Provides direct access to the stream's internal buffer, but doesn't move the stream position (see consumeWritePointer).

Parameters:
cbRequested number of contiguous bytes to access; if fewer bytes are currently available in the buffer, the buffer is flushed and a new buffer is returned
pcbActual if non-NULL, receives actual number of contiguous writable bytes, which will always be greater than or equal to cbRequested
Returns:
pointer to cbActual bytes of writable buffer space

Definition at line 141 of file ByteOutputStream.h.

References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

Referenced by BTreeWriter::deleteCurrent(), BTreeWriter::insertTupleFromBuffer(), and SegOutputStream::SegOutputStream().

00143 {
00144     if (cbWritable < cbRequested) {
00145         flushBuffer(cbRequested);
00146         assert(cbWritable >= cbRequested);
00147     }
00148     if (pcbActual) {
00149         *pcbActual = cbWritable;
00150     }
00151     return pNextByte;
00152 }

void ByteOutputStream::consumeWritePointer ( uint  cbUsed  )  [inline, inherited]

Advances stream position after a call to getWritePointer.

Parameters:
cbUsed number of bytes to advance; must be less than or equal to the value of cbActual returned by the last call to getWritePointer

Definition at line 154 of file ByteOutputStream.h.

References ByteStream::cbOffset, ByteOutputStream::cbWritable, and ByteOutputStream::pNextByte.

Referenced by BTreeWriter::deleteCurrent(), and BTreeWriter::insertTupleFromBuffer().

00155 {
00156     assert(cbUsed <= cbWritable);
00157     cbWritable -= cbUsed;
00158     pNextByte += cbUsed;
00159     cbOffset += cbUsed;
00160 }
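getWritePointer and consumeWritePointer form a two-step protocol: the first exposes writable space without moving the stream position, and the second commits however many bytes the caller actually filled in. The sketch below models this with a single in-memory buffer. ToyBufferStream and writeGreeting are hypothetical names, and the toy never flushes, so a request must fit in the remaining space.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical single-buffer stream (not Fennel code) illustrating the
// two-step copyless write protocol.
class ToyBufferStream
{
    std::vector<unsigned char> buffer;
    std::size_t cbOffset = 0;  // stream position

public:
    explicit ToyBufferStream(std::size_t cb) : buffer(cb) {}

    // Step 1: expose writable space without moving the stream position.
    unsigned char *getWritePointer(
        std::size_t cbRequested, std::size_t *pcbActual = nullptr)
    {
        std::size_t cbWritable = buffer.size() - cbOffset;
        assert(cbWritable >= cbRequested);  // the toy cannot flush
        if (pcbActual) {
            *pcbActual = cbWritable;
        }
        return buffer.data() + cbOffset;
    }

    // Step 2: commit the bytes the caller actually filled in.
    void consumeWritePointer(std::size_t cbUsed)
    {
        assert(cbUsed <= buffer.size() - cbOffset);
        cbOffset += cbUsed;
    }

    std::size_t getOffset() const { return cbOffset; }
    unsigned char const *data() const { return buffer.data(); }
};

// Produces data in place in the stream's buffer, then commits only the bytes
// used. Returns the amount of contiguous space that was offered.
inline std::size_t writeGreeting(ToyBufferStream &stream)
{
    static char const msg[] = "hello";
    std::size_t const cbMsg = sizeof(msg) - 1;  // exclude the terminator
    std::size_t cbActual = 0;
    unsigned char *p = stream.getWritePointer(cbMsg, &cbActual);
    std::memcpy(p, msg, cbMsg);
    stream.consumeWritePointer(cbMsg);
    return cbActual;
}
```

The space offered (cbActual) can be larger than the request, so the caller decides how much to commit. In a real caller the memcpy would typically be replaced by code that marshals data directly into the returned pointer.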

void ByteOutputStream::hardPageBreak (  )  [inherited]

Marks the current buffer as complete regardless of how much data it contains.

The exact semantics are dependent on the buffering implementation.

Definition at line 62 of file ByteOutputStream.cpp.

References ByteOutputStream::cbWritable, ByteOutputStream::flushBuffer(), and ByteOutputStream::pNextByte.

00063 {
00064     flushBuffer(0);
00065     cbWritable = 0;
00066     pNextByte = NULL;
00067 }

template<class T>
void ByteOutputStream::writeValue ( T const &  value  )  [inline, inherited]

Writes a fixed-size type to the stream.

Parameters:
value value to write; type must be memcpy-safe

Definition at line 135 of file ByteOutputStream.h.

Referenced by FtrsTableWriter::describeIndex(), LogicalTxnTest::describeParticipant(), FtrsTableWriter::describeParticipant(), BTreeWriter::describeParticipant(), DatabaseTest::executeIncrementAction(), TupleProjection::writePersistent(), and TupleDescriptor::writePersistent().

00136     {
00137         writeBytes(&value,sizeof(value));
00138     }
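The "memcpy-safe" requirement can be checked at compile time in newer C++. The following sketch is an illustration rather than Fennel's implementation: ToyByteSink and readValueAt are hypothetical, and the static_assert on std::is_trivially_copyable (C++11) is an added safeguard that the original writeValue does not contain.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical byte sink (not Fennel code); only the writeValue pattern
// mirrors the listing above.
class ToyByteSink
{
    std::vector<unsigned char> bytes;

public:
    void writeBytes(void const *pData, std::size_t cb)
    {
        unsigned char const *p = static_cast<unsigned char const *>(pData);
        bytes.insert(bytes.end(), p, p + cb);
    }

    // Writes the object representation of a fixed-size value.
    template <class T>
    void writeValue(T const &value)
    {
        static_assert(
            std::is_trivially_copyable<T>::value,
            "writeValue requires a memcpy-safe type");
        writeBytes(&value, sizeof(value));
    }

    // Reads a value back from a byte offset; memcpy avoids alignment issues.
    template <class T>
    T readValueAt(std::size_t offset) const
    {
        static_assert(
            std::is_trivially_copyable<T>::value,
            "readValueAt requires a memcpy-safe type");
        assert(offset + sizeof(T) <= bytes.size());
        T value;
        std::memcpy(&value, bytes.data() + offset, sizeof(T));
        return value;
    }

    std::size_t size() const { return bytes.size(); }
};
```

With the check in place, passing a type such as std::string to writeValue fails to compile instead of silently writing pointer bytes to the stream.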

FileSize ByteStream::getOffset (  )  const [inline, inherited]

Returns:
current offset from beginning of stream

Definition at line 110 of file ByteStream.h.

References ByteStream::cbOffset.

Referenced by getInputStream(), and ByteInputStream::mark().

00111 {
00112     return cbOffset;
00113 }

bool ClosableObject::isClosed (  )  const [inline, inherited]

Returns:
whether the object has been closed

Definition at line 58 of file ClosableObject.h.

00059     {
00060         return !needsClose;
00061     }

void ClosableObject::close (  )  [inherited]

Closes this object, releasing any allocated resources.

Reimplemented in CollectExecStream, CorrelationJoinExecStream, LcsClusterAppendExecStream, and LcsClusterReplaceExecStream.

Definition at line 39 of file ClosableObject.cpp.

References ClosableObject::closeImpl(), and ClosableObject::needsClose.

Referenced by CacheImpl< PageT, VictimPolicyT >::allocatePages(), LcsRowScanBaseExecStream::closeImpl(), ExecStreamGraphImpl::closeImpl(), FlatFileBuffer::open(), ClosableObjectDestructor::operator()(), and Segment::~Segment().

00040 {
00041     if (!needsClose) {
00042         return;
00043     }
00044     needsClose = false;
00045     closeImpl();
00046 }
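The needsClose guard makes close() idempotent: closeImpl() runs on the first call only. The sketch below reproduces that pattern in isolation. ToyClosable and CountingResource are hypothetical names, and initializing needsClose to true is an assumption about what the ClosableObject constructor does.

```cpp
#include <cassert>

// Hypothetical re-creation of the close() pattern shown above (not Fennel
// code).
class ToyClosable
{
    bool needsClose = true;  // assumption: objects start out open

protected:
    // Derived classes release their resources here.
    virtual void closeImpl() = 0;

public:
    virtual ~ToyClosable() {}

    bool isClosed() const { return !needsClose; }

    void close()
    {
        if (!needsClose) {
            return;  // already closed: nothing to do
        }
        needsClose = false;
        closeImpl();
    }
};

// Counts how many times its resources were released.
class CountingResource : public ToyClosable
{
    int releaseCount = 0;

    void closeImpl() override { ++releaseCount; }

public:
    int getReleaseCount() const { return releaseCount; }
};
```

Calling close() twice on a CountingResource leaves the release count at one, which is why closeImpl() in SpillOutputStream can release the scratch page lock or the spill stream without guarding against repeated calls.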


Member Data Documentation

SharedSegmentFactory SpillOutputStream::pSegmentFactory [private]

Factory to use for creating spill segment if necessary.

Definition at line 46 of file SpillOutputStream.h.

Referenced by spill(), and SpillOutputStream().

SharedCacheAccessor SpillOutputStream::pCacheAccessor [private]

CacheAccessor to use for accessing spill segment if necessary.

Definition at line 51 of file SpillOutputStream.h.

Referenced by getInputStream(), spill(), and SpillOutputStream().

SharedSegOutputStream SpillOutputStream::pSegOutputStream [private]

Spill segment output stream.

Definition at line 56 of file SpillOutputStream.h.

Referenced by closeImpl(), flushBuffer(), getInputStream(), getSegment(), getSegOutputStream(), setWriteLatency(), spill(), and updatePage().

uint SpillOutputStream::cbBuffer [private]

Total number of bytes locked in current page.

Definition at line 61 of file SpillOutputStream.h.

Referenced by closeImpl(), flushBuffer(), spill(), SpillOutputStream(), and updatePage().

SegmentAccessor SpillOutputStream::scratchAccessor [private]

Accessor for scratch segment.

Definition at line 66 of file SpillOutputStream.h.

Referenced by SpillOutputStream().

SegPageLock SpillOutputStream::scratchPageLock [private]

Page lock on scratch page for short streams.

Definition at line 71 of file SpillOutputStream.h.

Referenced by closeImpl(), flushBuffer(), getInputStream(), spill(), and SpillOutputStream().

std::string SpillOutputStream::spillFileName [private]

Filename to assign to spill segment.

Definition at line 76 of file SpillOutputStream.h.

Referenced by spill().

WriteLatency ByteOutputStream::writeLatency [protected, inherited]

Current write latency mode.

Definition at line 51 of file ByteOutputStream.h.

Referenced by SegOutputStream::flushBuffer(), SegOutputStream::SegOutputStream(), ByteOutputStream::setWriteLatency(), and spill().

FileSize ByteStream::cbOffset [protected, inherited]

Byte position in stream.

Definition at line 41 of file ByteStream.h.

Referenced by ByteStream::ByteStream(), ByteInputStream::consumeReadPointer(), ByteOutputStream::consumeWritePointer(), ByteStream::getOffset(), SegOutputStream::getSegPos(), SegInputStream::getSegPos(), ByteInputStream::readBytes(), ByteInputStream::reset(), ByteArrayInputStream::resetArray(), ByteInputStream::seekBackward(), SegInputStream::seekSegPos(), and ByteOutputStream::writeBytes().

bool ClosableObject::needsClose [protected, inherited]

Definition at line 44 of file ClosableObject.h.

Referenced by SegStreamAllocation::beginWrite(), ExecStreamGraphImpl::clear(), ClosableObject::ClosableObject(), ClosableObject::close(), FlatFileBuffer::open(), ExecStreamGraphImpl::open(), ExecStream::open(), and ClosableObject::~ClosableObject().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:47 2009 for Fennel by doxygen 1.5.1