00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h"
00025
00026 #include <math.h>
00027
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $");
00029
00030 void LbmUnionExecStream::prepare(LbmUnionExecStreamParams const ¶ms)
00031 {
00032 ConfluenceExecStream::prepare(params);
00033 maxRid = params.maxRid;
00034
00035
00036 ridLimitParamId = params.ridLimitParamId;
00037 assert(opaqueToInt(ridLimitParamId) > 0);
00038
00039
00040 startRidParamId = params.startRidParamId;
00041 segmentLimitParamId = params.segmentLimitParamId;
00042
00043
00044 ridLimitDatum.pData = (PConstBuffer) &ridLimit;
00045 ridLimitDatum.cbData = sizeof(ridLimit);
00046
00047 assert(inAccessors[0]->getTupleDesc() == pOutAccessor->getTupleDesc());
00048
00049
00050 inputTuple.compute(inAccessors[0]->getTupleDesc());
00051
00052
00053 scratchAccessor = params.scratchAccessor;
00054 workspacePageLock.accessSegment(scratchAccessor);
00055 writerPageLock.accessSegment(scratchAccessor);
00056 pageSize = scratchAccessor.pSegment->getUsablePageSize();
00057 }
00058
00059 void LbmUnionExecStream::getResourceRequirements(
00060 ExecStreamResourceQuantity &minQuantity,
00061 ExecStreamResourceQuantity &optQuantity,
00062 ExecStreamResourceSettingType &optType)
00063 {
00064 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00065
00066
00067
00068
00069 minQuantity.nCachePages += 2;
00070 optQuantity.nCachePages += 2 + computeOptWorkspacePages(maxRid) + 1;
00071 optType = EXEC_RESOURCE_ESTIMATE;
00072 }
00073
00074 void LbmUnionExecStream::setResourceAllocation(
00075 ExecStreamResourceQuantity &quantity)
00076 {
00077 ConfluenceExecStream::setResourceAllocation(quantity);
00078
00079
00080 nWorkspacePages = quantity.nCachePages - 1;
00081 ridLimit = computeRidLimit(nWorkspacePages);
00082 }
00083
00084 void LbmUnionExecStream::open(bool restart)
00085 {
00086 ConfluenceExecStream::open(restart);
00087
00088 if (!restart) {
00089 uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage;
00090 uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00091 writerPageLock.allocatePage();
00092 PBuffer writerBuf = writerPageLock.getPage().getWritableData();
00093 segmentWriter.init(
00094 writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false);
00095
00096 reverseArea = writerBuf + writerBufSize;
00097 reverseAreaSize =
00098 scratchAccessor.pSegment->getUsablePageSize() - writerBufSize;
00099
00100
00101 boost::shared_array<PBuffer> ppBuffers(new PBuffer[nWorkspacePages]);
00102 assert(ppBuffers != NULL);
00103 for (uint i = 0; i < nWorkspacePages; i++) {
00104 workspacePageLock.allocatePage();
00105 ppBuffers[i] = workspacePageLock.getPage().getWritableData();
00106 workspacePageLock.unlock();
00107 }
00108 ByteBuffer *pBuffer = new ByteBuffer();
00109 pBuffer->init(ppBuffers, nWorkspacePages, pageSize);
00110 SharedByteBuffer pWorkspaceBuffer(pBuffer);
00111 uint maxSegmentSize = LbmEntry::getMaxBitmapSize(bitmapColSize);
00112 workspace.init(pWorkspaceBuffer, maxSegmentSize);
00113
00114
00115 pDynamicParamManager->createParam(
00116 ridLimitParamId, pOutAccessor->getTupleDesc()[0]);
00117 pDynamicParamManager->writeParam(ridLimitParamId, ridLimitDatum);
00118 } else {
00119 workspace.reset();
00120 segmentWriter.reset();
00121 }
00122
00123 writePending = false;
00124 producePending = false;
00125 isDone = false;
00126 segmentReader.init(inAccessors[0], inputTuple);
00127 }
00128
00129 ExecStreamResult LbmUnionExecStream::execute(
00130 ExecStreamQuantum const &quantum)
00131 {
00132 if (isDone) {
00133 pOutAccessor->markEOS();
00134 return EXECRC_EOS;
00135 }
00136
00137 if (isConsumerSridSet()) {
00138
00139 requestedSrid = (LcsRid) *reinterpret_cast<RecordNum const *>(
00140 pDynamicParamManager->getParam(startRidParamId).getDatum().pData);
00141 workspace.advanceToSrid(requestedSrid);
00142 }
00143 if (isSegmentLimitSet()) {
00144 segmentsRemaining = *reinterpret_cast<uint const *>(
00145 pDynamicParamManager->getParam(segmentLimitParamId)
00146 .getDatum().pData);
00147 }
00148
00149 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00150 while (! producePending) {
00151
00152 if (isSegmentLimitSet() && segmentsRemaining == 0) {
00153 return EXECRC_QUANTUM_EXPIRED;
00154 }
00155
00156 ExecStreamResult status = readSegment();
00157 if (status == EXECRC_EOS) {
00158
00159 isDone = workspace.isEmpty() && segmentWriter.isEmpty();
00160 if (! isDone) {
00161 transferLast();
00162 producePending = true;
00163 break;
00164 }
00165 return EXECRC_EOS;
00166 }
00167 if (status != EXECRC_YIELD) {
00168 return status;
00169 }
00170 if (! writeSegment()) {
00171 producePending = (! segmentWriter.isEmpty());
00172 }
00173 }
00174
00175 if (! produceTuple()) {
00176 return EXECRC_BUF_OVERFLOW;
00177 }
00178 producePending = false;
00179 }
00180 return EXECRC_QUANTUM_EXPIRED;
00181 }
00182
00183 void LbmUnionExecStream::closeImpl()
00184 {
00185 ConfluenceExecStream::closeImpl();
00186
00187 if (scratchAccessor.pSegment) {
00188 scratchAccessor.pSegment->deallocatePageRange(
00189 NULL_PAGE_ID, NULL_PAGE_ID);
00190 }
00191 }
00192
00193 uint LbmUnionExecStream::computeOptWorkspacePages(LcsRid maxRid)
00194 {
00195
00196 return 2;
00197 }
00198
00199 uint LbmUnionExecStream::computeRidLimit(uint nWorkspacePages)
00200 {
00201
00202
00203
00204
00205 uint bytes = (uint) ((nWorkspacePages - 0.25) * pageSize);
00206 return bytes * LbmSegment::LbmOneByteSize;
00207 }
00208
00209 bool LbmUnionExecStream::isConsumerSridSet()
00210 {
00211 return (opaqueToInt(startRidParamId) > 0);
00212 }
00213
00214 bool LbmUnionExecStream::isSegmentLimitSet()
00215 {
00216 return (opaqueToInt(segmentLimitParamId) > 0);
00217 }
00218
00219 ExecStreamResult LbmUnionExecStream::readSegment()
00220 {
00221 if (writePending) {
00222 return EXECRC_YIELD;
00223 }
00224 ExecStreamResult status = segmentReader.readSegmentAndAdvance(
00225 inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len);
00226 if (status == EXECRC_YIELD) {
00227 writePending = true;
00228 }
00229 return status;
00230 }
00231
00232 bool LbmUnionExecStream::writeSegment()
00233 {
00234 assert(writePending = true);
00235
00236
00237 LcsRid currentSrid = segmentReader.getSrid();
00238 workspace.setProductionLimit(currentSrid);
00239 if (!transfer()) {
00240 return false;
00241 }
00242 if (workspace.isEmpty()) {
00243 workspace.advanceToSrid(currentSrid);
00244 }
00245
00246
00247 bool success = workspace.addSegment(inputSegment);
00248 assert(success);
00249 writePending = false;
00250 return true;
00251 }
00252
00253 void LbmUnionExecStream::transferLast()
00254 {
00255 workspace.removeLimit();
00256 transfer();
00257 }
00258
00259 bool LbmUnionExecStream::transfer()
00260 {
00261 while (workspace.canProduce()) {
00262 if (isSegmentLimitSet() && segmentsRemaining == 0) {
00263 return false;
00264 }
00265
00266 LbmByteSegment seg = workspace.getSegment();
00267 assert(seg.len < reverseAreaSize);
00268 PBuffer reverseStart = reverseArea + seg.len - 1;
00269 for (int i = 0; i < seg.len; i++) {
00270 reverseStart[-i] = seg.byteSeg[i];
00271 }
00272 LcsRid startRid = seg.getSrid();
00273 if (! segmentWriter.addSegment(startRid, reverseArea, seg.len)) {
00274 return false;
00275 }
00276 workspace.advancePastSegment();
00277
00278 if (isSegmentLimitSet()) {
00279 segmentsRemaining--;
00280 }
00281 }
00282 return true;
00283 }
00284
00285 bool LbmUnionExecStream::produceTuple()
00286 {
00287 assert(producePending);
00288 assert(! segmentWriter.isEmpty());
00289
00290 outputTuple = segmentWriter.produceSegmentTuple();
00291 if (pOutAccessor->produceTuple(outputTuple)) {
00292 segmentWriter.reset();
00293 producePending = false;
00294 return true;
00295 }
00296 return false;
00297 }
00298
00299 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $");
00300
00301