00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/SpillOutputStream.h"
00026 #include "fennel/segment/SegOutputStream.h"
00027 #include "fennel/segment/SegInputStream.h"
00028 #include "fennel/segment/SegmentFactory.h"
00029 #include "fennel/common/ByteArrayInputStream.h"
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00032
00033
00034
00035 SharedSpillOutputStream SpillOutputStream::newSpillOutputStream(
00036 SharedSegmentFactory pSegmentFactory,
00037 SharedCacheAccessor pCacheAccessor,
00038 std::string spillFileName)
00039 {
00040 return SharedSpillOutputStream(
00041 new SpillOutputStream(pSegmentFactory,pCacheAccessor,spillFileName),
00042 ClosableObjectDestructor());
00043 }
00044
00045 SpillOutputStream::SpillOutputStream(
00046 SharedSegmentFactory pSegmentFactoryInit,
00047 SharedCacheAccessor pCacheAccessorInit,
00048 std::string spillFileNameInit)
00049 : pSegmentFactory(pSegmentFactoryInit),
00050 pCacheAccessor(pCacheAccessorInit),
00051 spillFileName(spillFileNameInit)
00052 {
00053
00054
00055 scratchAccessor = pSegmentFactory->newScratchSegment(
00056 pCacheAccessor->getCache(),
00057 1);
00058 scratchPageLock.accessSegment(scratchAccessor);
00059 scratchPageLock.allocatePage();
00060 cbBuffer = scratchAccessor.pSegment->getUsablePageSize();
00061 setBuffer(
00062 scratchPageLock.getPage().getWritableData(),
00063 cbBuffer);
00064 }
00065
00066 SpillOutputStream::~SpillOutputStream()
00067 {
00068 }
00069
00070 void SpillOutputStream::flushBuffer(uint cbRequested)
00071 {
00072 if (scratchPageLock.isLocked()) {
00073 assert(!pSegOutputStream);
00074
00075 spill();
00076 } else {
00077 assert(pSegOutputStream);
00078 assert(!scratchPageLock.isLocked());
00079
00080 assert(cbBuffer >= getBytesAvailable());
00081 pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00082 }
00083 assert(pSegOutputStream);
00084 if (cbRequested) {
00085 PBuffer pBuffer =
00086 pSegOutputStream->getWritePointer(cbRequested,&cbBuffer);
00087 setBuffer(pBuffer,cbBuffer);
00088 } else {
00089 pSegOutputStream->hardPageBreak();
00090 cbBuffer = 0;
00091 }
00092 }
00093
00094 void SpillOutputStream::closeImpl()
00095 {
00096 if (scratchPageLock.isLocked()) {
00097
00098 scratchPageLock.unlock();
00099 } else {
00100 assert(pSegOutputStream);
00101 assert(!scratchPageLock.isLocked());
00102
00103 pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable());
00104 cbBuffer = 0;
00105 pSegOutputStream.reset();
00106 }
00107 }
00108
00109 void SpillOutputStream::setWriteLatency(WriteLatency writeLatency)
00110 {
00111 ByteOutputStream::setWriteLatency(writeLatency);
00112 if (pSegOutputStream) {
00113 pSegOutputStream->setWriteLatency(writeLatency);
00114 }
00115 }
00116
00117
00118
00119
00120
00121
00122
00123 void SpillOutputStream::spill()
00124 {
00125 DeviceMode devMode = DeviceMode::createNew;
00126
00127
00128 devMode.direct = true;
00129 SharedSegment pLongLogSegment =
00130 pSegmentFactory->newTempDeviceSegment(
00131 scratchPageLock.getCacheAccessor()->getCache(),
00132 devMode,
00133 spillFileName);
00134 SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor);
00135 pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor);
00136 pSegOutputStream->setWriteLatency(writeLatency);
00137 pSegOutputStream->writeBytes(
00138 scratchPageLock.getPage().getReadableData(),
00139 cbBuffer - getBytesAvailable());
00140 scratchPageLock.unlock();
00141 }
00142
00143 SharedByteInputStream SpillOutputStream::getInputStream(
00144 SeekPosition seekPosition)
00145 {
00146 if (scratchPageLock.isLocked()) {
00147 SharedByteInputStream pInputStream =
00148 ByteArrayInputStream::newByteArrayInputStream(
00149 scratchPageLock.getPage().getReadableData(),
00150 getOffset());
00151 if (seekPosition == SEEK_STREAM_END) {
00152 pInputStream->seekForward(getOffset());
00153 }
00154 return pInputStream;
00155 } else {
00156 assert(pSegOutputStream);
00157 updatePage();
00158 SharedSegment pSegment = pSegOutputStream->getSegment();
00159 SegStreamPosition endPos;
00160 if (seekPosition == SEEK_STREAM_END) {
00161 pSegOutputStream->getSegPos(endPos);
00162 }
00163 SegmentAccessor segmentAccessor(pSegment,pCacheAccessor);
00164 SharedSegInputStream pInputStream =
00165 SegInputStream::newSegInputStream(segmentAccessor);
00166 if (seekPosition == SEEK_STREAM_END) {
00167 pInputStream->seekSegPos(endPos);
00168 }
00169 return pInputStream;
00170 }
00171 }
00172
00173 void SpillOutputStream::updatePage()
00174 {
00175 if (!cbBuffer) {
00176 return;
00177 }
00178 assert(cbBuffer > getBytesAvailable());
00179 uint cbConsumed = cbBuffer - getBytesAvailable();
00180 pSegOutputStream->consumeWritePointer(cbConsumed);
00181 cbBuffer -= cbConsumed;
00182 pSegOutputStream->updatePage();
00183 }
00184
00185 SharedSegment SpillOutputStream::getSegment()
00186 {
00187 if (pSegOutputStream) {
00188 return pSegOutputStream->getSegment();
00189 } else {
00190 return SharedSegment();
00191 }
00192 }
00193
00194 SharedSegOutputStream SpillOutputStream::getSegOutputStream()
00195 {
00196 return pSegOutputStream;
00197 }
00198
00199 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $");
00200
00201