00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/SpillOutputStream.h"
00026 #include "fennel/segment/SegOutputStream.h"
00027 #include "fennel/segment/SegInputStream.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 #include "fennel/common/ByteArrayInputStream.h"
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00032 
00033 
00034 
00035 SharedSpillOutputStream SpillOutputStream::newSpillOutputStream(
00036     SharedSegmentFactory pSegmentFactory,
00037     SharedCacheAccessor pCacheAccessor,
00038     std::string spillFileName)
00039 {
00040     return SharedSpillOutputStream(
00041         new SpillOutputStream(pSegmentFactory,pCacheAccessor,spillFileName),
00042         ClosableObjectDestructor());
00043 }
00044 
00045 SpillOutputStream::SpillOutputStream(
00046     SharedSegmentFactory pSegmentFactoryInit,
00047     SharedCacheAccessor pCacheAccessorInit,
00048     std::string spillFileNameInit)
00049     : pSegmentFactory(pSegmentFactoryInit),
00050       pCacheAccessor(pCacheAccessorInit),
00051       spillFileName(spillFileNameInit)
00052 {
00053     
00054     
00055     scratchAccessor = pSegmentFactory->newScratchSegment(
00056         pCacheAccessor->getCache(),
00057         1);
00058     scratchPageLock.accessSegment(scratchAccessor);
00059     scratchPageLock.allocatePage();
00060     cbBuffer = scratchAccessor.pSegment->getUsablePageSize();
00061     setBuffer(
00062         scratchPageLock.getPage().getWritableData(),
00063         cbBuffer);
00064 }
00065 
00066 SpillOutputStream::~SpillOutputStream()
00067 {
00068 }
00069 
00070 void SpillOutputStream::flushBuffer(uint cbRequested)
00071 {
00072     if (scratchPageLock.isLocked()) {
00073         assert(!pSegOutputStream);
00074         
00075         spill();
00076     } else {
00077         assert(pSegOutputStream);
00078         assert(!scratchPageLock.isLocked());
00079         
00080         assert(cbBuffer >= getBytesAvailable());
00081         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00082     }
00083     assert(pSegOutputStream);
00084     if (cbRequested) {
00085         PBuffer pBuffer =
00086             pSegOutputStream->getWritePointer(cbRequested,&cbBuffer);
00087         setBuffer(pBuffer,cbBuffer);
00088     } else {
00089         pSegOutputStream->hardPageBreak();
00090         cbBuffer = 0;
00091     }
00092 }
00093 
00094 void SpillOutputStream::closeImpl()
00095 {
00096     if (scratchPageLock.isLocked()) {
00097         
00098         scratchPageLock.unlock();
00099     } else {
00100         assert(pSegOutputStream);
00101         assert(!scratchPageLock.isLocked());
00102         
00103         pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00104         cbBuffer = 0;
00105         pSegOutputStream.reset();
00106     }
00107 }
00108 
00109 void SpillOutputStream::setWriteLatency(WriteLatency writeLatency)
00110 {
00111     ByteOutputStream::setWriteLatency(writeLatency);
00112     if (pSegOutputStream) {
00113         pSegOutputStream->setWriteLatency(writeLatency);
00114     }
00115 }
00116 
00117 
00118 
00119 
00120 
00121 
00122 
00123 void SpillOutputStream::spill()
00124 {
00125     DeviceMode devMode = DeviceMode::createNew;
00126     
00127     
00128     devMode.direct = true;
00129     SharedSegment pLongLogSegment =
00130         pSegmentFactory->newTempDeviceSegment(
00131             scratchPageLock.getCacheAccessor()->getCache(),
00132             devMode,
00133             spillFileName);
00134     SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor);
00135     pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor);
00136     pSegOutputStream->setWriteLatency(writeLatency);
00137     pSegOutputStream->writeBytes(
00138         scratchPageLock.getPage().getReadableData(),
00139         cbBuffer - getBytesAvailable());
00140     scratchPageLock.unlock();
00141 }
00142 
00143 SharedByteInputStream SpillOutputStream::getInputStream(
00144     SeekPosition seekPosition)
00145 {
00146     if (scratchPageLock.isLocked()) {
00147         SharedByteInputStream pInputStream =
00148             ByteArrayInputStream::newByteArrayInputStream(
00149                 scratchPageLock.getPage().getReadableData(),
00150                 getOffset());
00151         if (seekPosition == SEEK_STREAM_END) {
00152             pInputStream->seekForward(getOffset());
00153         }
00154         return pInputStream;
00155     } else {
00156         assert(pSegOutputStream);
00157         updatePage();
00158         SharedSegment pSegment = pSegOutputStream->getSegment();
00159         SegStreamPosition endPos;
00160         if (seekPosition == SEEK_STREAM_END) {
00161             pSegOutputStream->getSegPos(endPos);
00162         }
00163         SegmentAccessor segmentAccessor(pSegment,pCacheAccessor);
00164         SharedSegInputStream pInputStream =
00165             SegInputStream::newSegInputStream(segmentAccessor);
00166         if (seekPosition == SEEK_STREAM_END) {
00167             pInputStream->seekSegPos(endPos);
00168         }
00169         return pInputStream;
00170     }
00171 }
00172 
00173 void SpillOutputStream::updatePage()
00174 {
00175     if (!cbBuffer) {
00176         return;
00177     }
00178     assert(cbBuffer > getBytesAvailable());
00179     uint cbConsumed = cbBuffer - getBytesAvailable();
00180     pSegOutputStream->consumeWritePointer(cbConsumed);
00181     cbBuffer -= cbConsumed;
00182     pSegOutputStream->updatePage();
00183 }
00184 
00185 SharedSegment SpillOutputStream::getSegment()
00186 {
00187     if (pSegOutputStream) {
00188         return pSegOutputStream->getSegment();
00189     } else {
00190         return SharedSegment();
00191     }
00192 }
00193 
00194 SharedSegOutputStream SpillOutputStream::getSegOutputStream()
00195 {
00196     return pSegOutputStream;
00197 }
00198 
00199 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00200 
00201