00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h"
00025
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $");
00027
00028 LbmChopperExecStream::LbmChopperExecStream()
00029 {
00030 ridLimitParamId = DynamicParamId(0);
00031 }
00032
00033 void LbmChopperExecStream::prepare(LbmChopperExecStreamParams const ¶ms)
00034 {
00035 ConfluenceExecStream::prepare(params);
00036
00037
00038 ridLimitParamId = params.ridLimitParamId;
00039 assert(opaqueToInt(ridLimitParamId) > 0);
00040
00041
00042 inputTuple.compute(inAccessors[0]->getTupleDesc());
00043
00044
00045 SegmentAccessor scratchAccessor = params.scratchAccessor;
00046 writerPageLock.accessSegment(scratchAccessor);
00047 pageSize = scratchAccessor.pSegment->getUsablePageSize();
00048 }
00049
00050 void LbmChopperExecStream::getResourceRequirements(
00051 ExecStreamResourceQuantity &minQuantity,
00052 ExecStreamResourceQuantity &optQuantity)
00053 {
00054 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00055
00056
00057 minQuantity.nCachePages += 1;
00058 optQuantity.nCachePages += 1;
00059 }
00060
00061 void LbmChopperExecStream::open(bool restart)
00062 {
00063 ConfluenceExecStream::open(restart);
00064
00065 if (!restart) {
00066 uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage;
00067 uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00068 writerPageLock.allocatePage();
00069 PBuffer writerBuf = writerPageLock.getPage().getWritableData();
00070 segmentWriter.init(
00071 writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false);
00072 } else {
00073 segmentWriter.reset();
00074 }
00075
00076 state = LBM_STATE_READ;
00077 writePending = false;
00078 producePending = false;
00079 segmentReader.init(inAccessors[0], inputTuple);
00080 }
00081
00082 ExecStreamResult LbmChopperExecStream::execute(
00083 ExecStreamQuantum const &quantum)
00084 {
00085 ridLimit = *reinterpret_cast<RecordNum const *>(
00086 pDynamicParamManager->getParam(ridLimitParamId).getDatum().pData);
00087
00088 uint nTuples = 0;
00089 ExecStreamResult status;
00090 while (nTuples < quantum.nTuplesMax) {
00091 switch (state) {
00092 case LBM_STATE_READ:
00093 status = readSegment();
00094 if (status == EXECRC_EOS) {
00095
00096 if (! segmentWriter.isEmpty()) {
00097 producePending = true;
00098 state = LBM_STATE_PRODUCE;
00099 continue;
00100 }
00101 state = LBM_STATE_DONE;
00102 continue;
00103 }
00104 if (status != EXECRC_YIELD) {
00105 return status;
00106 }
00107 state = LBM_STATE_WRITE;
00108 continue;
00109 case LBM_STATE_WRITE:
00110 if (! writeSegment()) {
00111 producePending = true;
00112 state = LBM_STATE_PRODUCE;
00113 continue;
00114 }
00115 nTuples++;
00116 state = LBM_STATE_READ;
00117 continue;
00118 case LBM_STATE_PRODUCE:
00119 if (! produceTuple()) {
00120 return EXECRC_BUF_OVERFLOW;
00121 }
00122 state = writePending ? LBM_STATE_WRITE : LBM_STATE_READ;
00123 continue;
00124 case LBM_STATE_DONE:
00125 pOutAccessor->markEOS();
00126 return EXECRC_EOS;
00127 default:
00128 assert(false);
00129 }
00130 }
00131 return EXECRC_QUANTUM_EXPIRED;
00132 }
00133
00134 void LbmChopperExecStream::closeImpl()
00135 {
00136 ConfluenceExecStream::closeImpl();
00137 }
00138
00139 ExecStreamResult LbmChopperExecStream::readSegment()
00140 {
00141 if (writePending) {
00142 return EXECRC_YIELD;
00143 }
00144 ExecStreamResult status = segmentReader.readSegmentAndAdvance(
00145 inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len);
00146 if (status == EXECRC_YIELD) {
00147 writePending = true;
00148 }
00149 return status;
00150 }
00151
00152 bool LbmChopperExecStream::writeSegment()
00153 {
00154 assert(writePending = true);
00155 LcsRid startRid = inputSegment.getSrid();
00156 LcsRid endRid = inputSegment.getEndRid();
00157 assert(opaqueToInt(endRid - startRid) <= ridLimit);
00158
00159
00160
00161
00162 bool firstWrite = segmentWriter.isEmpty();
00163 if (! firstWrite) {
00164 if (startRid < currentEndRid) {
00165 return false;
00166 }
00167 if (opaqueToInt(endRid - currentSrid) > ridLimit) {
00168 return false;
00169 }
00170 }
00171
00172
00173 PBuffer byteSeg = inputSegment.byteSeg - (inputSegment.len - 1);
00174 if (segmentWriter.addSegment(
00175 startRid,
00176 byteSeg,
00177 inputSegment.len))
00178 {
00179 writePending = false;
00180 if (firstWrite) {
00181 currentSrid = startRid;
00182 }
00183 currentEndRid = endRid;
00184 return true;
00185 }
00186 return false;
00187 }
00188
00189 bool LbmChopperExecStream::produceTuple()
00190 {
00191 assert(producePending);
00192
00193 TupleData outputTuple = segmentWriter.produceSegmentTuple();
00194 if (pOutAccessor->produceTuple(outputTuple)) {
00195 segmentWriter.reset();
00196 producePending = false;
00197 return true;
00198 }
00199 return false;
00200 }
00201
00202 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $");
00203
00204