00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/lucidera/bitmap/LbmBitOpExecStream.h"
00025
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $");
00027
00028 void LbmBitOpExecStream::prepare(LbmBitOpExecStreamParams const ¶ms)
00029 {
00030 ConfluenceExecStream::prepare(params);
00031
00032
00033 rowLimitParamId = params.rowLimitParamId;
00034 startRidParamId = params.startRidParamId;
00035
00036
00037 rowLimitDatum.pData = (PConstBuffer) &rowLimit;
00038 rowLimitDatum.cbData = sizeof(rowLimit);
00039 startRidDatum.pData = (PConstBuffer) &startRid;
00040 startRidDatum.cbData = sizeof(startRid);
00041
00042
00043 nInputs = inAccessors.size();
00044 segmentReaders.reset(new LbmSegmentReader[nInputs]);
00045 bitmapSegTuples.reset(new TupleData[nInputs]);
00046 for (uint i = 0; i < nInputs; i++) {
00047 bitmapSegTuples[i].compute(inAccessors[i]->getTupleDesc());
00048 }
00049
00050 nFields = inAccessors[0]->getTupleDesc().size() - 3;
00051 }
00052
00053 void LbmBitOpExecStream::open(bool restart)
00054 {
00055 ConfluenceExecStream::open(restart);
00056
00057 uint nKeys = pOutAccessor->getTupleDesc().size() - 3;
00058
00059 if (!restart) {
00060
00061
00062 if (opaqueToInt(rowLimitParamId) > 0) {
00063 pDynamicParamManager->createParam(
00064 rowLimitParamId, pOutAccessor->getTupleDesc()[nKeys]);
00065 }
00066 if (opaqueToInt(startRidParamId) > 0) {
00067 pDynamicParamManager->createParam(
00068 startRidParamId, pOutAccessor->getTupleDesc()[nKeys]);
00069 }
00070 }
00071
00072
00073
00074
00075 if (!outputBuf) {
00076
00077
00078 uint bitmapColSize = pOutAccessor->getTupleDesc()[nKeys + 1].cbStorage;
00079 uint outputBufSize = LbmEntry::getScratchBufferSize(bitmapColSize);
00080 outputBuf.reset(new FixedBuffer[outputBufSize]);
00081
00082
00083
00084 segmentWriter.init(
00085 outputBuf.get(), outputBufSize,
00086 inAccessors[1]->getTupleDesc(), true);
00087
00088
00089
00090 bitmapBufSize = LbmEntry::getMaxBitmapSize(bitmapColSize);
00091 byteSegBuf.reset(new FixedBuffer[bitmapBufSize]);
00092 pByteSegBuf = byteSegBuf.get();
00093
00094 } else {
00095 segmentWriter.reset();
00096 }
00097
00098 startRid = LcsRid(0);
00099 rowLimit = 1;
00100 producePending = false;
00101 writeStartRidParamValue();
00102 if (opaqueToInt(rowLimitParamId) > 0) {
00103 pDynamicParamManager->writeParam(rowLimitParamId, rowLimitDatum);
00104 }
00105 for (uint i = 0; i < nInputs; i++) {
00106 segmentReaders[i].init(inAccessors[i], bitmapSegTuples[i]);
00107 }
00108 }
00109
00110 ExecStreamResult LbmBitOpExecStream::producePendingOutput(uint iInput)
00111 {
00112 if (!produceTuple(outputTuple)) {
00113 return EXECRC_BUF_OVERFLOW;
00114 }
00115
00116
00117 if (!segmentWriter.isEmpty()) {
00118 segmentWriter.reset();
00119 if (!addSegments()) {
00120 return EXECRC_BUF_OVERFLOW;
00121 }
00122 }
00123 producePending = false;
00124 if (inAccessors[iInput]->getState() == EXECBUF_EOS) {
00125 pOutAccessor->markEOS();
00126 return EXECRC_EOS;
00127 }
00128
00129 return EXECRC_YIELD;
00130 }
00131
00132 ExecStreamResult LbmBitOpExecStream::readInput(
00133 uint iInput, LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00134 {
00135 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(startRid);
00136
00137 if (rc == EXECRC_EOS) {
00138
00139 if (! flush()) {
00140 return EXECRC_BUF_OVERFLOW;
00141 }
00142 pOutAccessor->markEOS();
00143 return EXECRC_EOS;
00144 } else if (rc != EXECRC_YIELD) {
00145 return rc;
00146 }
00147
00148 segmentReaders[iInput].readCurrentByteSegment(
00149 currRid, currByteSeg, currLen);
00150
00151
00152 assert(currLen <= bitmapBufSize);
00153
00154 return EXECRC_YIELD;
00155 }
00156
00157 bool LbmBitOpExecStream::flush()
00158 {
00159 assert (!producePending);
00160
00161 if (!segmentWriter.isEmpty()) {
00162 outputTuple = segmentWriter.produceSegmentTuple();
00163 segmentWriter.reset();
00164 if (!produceTuple(outputTuple)) {
00165 producePending = true;
00166 }
00167 }
00168 return !producePending;
00169 }
00170
00171 bool LbmBitOpExecStream::addSegments()
00172 {
00173 while (addLen > 0) {
00174 if (segmentWriter.addSegment(addRid, addByteSeg, addLen)) {
00175 break;
00176 }
00177
00178 outputTuple = segmentWriter.produceSegmentTuple();
00179 if (!produceTuple(outputTuple)) {
00180 producePending = true;
00181 return false;
00182 }
00183
00184
00185
00186 segmentWriter.reset();
00187 }
00188
00189 return true;
00190 }
00191
00192 bool LbmBitOpExecStream::produceTuple(TupleData bitmapTuple)
00193 {
00194 assert(pOutAccessor->getTupleDesc().size() == bitmapTuple.size());
00195 return pOutAccessor->produceTuple(bitmapTuple);
00196 }
00197
00198 void LbmBitOpExecStream::closeImpl()
00199 {
00200 ConfluenceExecStream::closeImpl();
00201 outputBuf.reset();
00202 byteSegBuf.reset();
00203 }
00204
00205 void LbmBitOpExecStream::writeStartRidParamValue()
00206 {
00207 if (opaqueToInt(startRidParamId) > 0) {
00208 pDynamicParamManager->writeParam(startRidParamId, startRidDatum);
00209 }
00210 }
00211
00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $");
00213
00214