00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/exec/ExecStreamGraphImpl.h"
00025 #include "fennel/lucidera/bitmap/LbmMinusExecStream.h"
00026
00027 FENNEL_BEGIN_CPPFILE("$Id:");
00028
00029 void LbmMinusExecStream::prepare(LbmMinusExecStreamParams const ¶ms)
00030 {
00031 LbmBitOpExecStream::prepare(params);
00032
00033 if (nFields) {
00034
00035 prefixedBitmapTuple.compute(inAccessors[0]->getTupleDesc());
00036
00037
00038
00039
00040 TupleDescriptor prevTupleDesc;
00041 TupleDescriptor const &inputDesc = inAccessors[0]->getTupleDesc();
00042 for (int i = 0; i < nFields; i ++) {
00043 prevTupleDesc.push_back(inputDesc[i]);
00044 }
00045 prevTuple.computeAndAllocate(prevTupleDesc);
00046 }
00047 subtrahendBitmap.resize(0);
00048 }
00049
00050 void LbmMinusExecStream::open(bool restart)
00051 {
00052 LbmBitOpExecStream::open(restart);
00053 subtrahendsDone = false;
00054 needToRead = true;
00055 minSubtrahendRid = LcsRid(0);
00056 maxSubtrahendRid = LcsRid(0);
00057 baseRid = LcsRid(0);
00058 advancePending = false;
00059
00060 rowLimit = 0;
00061 inputType = UNKNOWN_INPUT;
00062 copyPrefixPending = false;
00063 prevTupleValid = false;
00064 minuendReader.init(inAccessors[0], bitmapSegTuples[0]);
00065
00066 if (nFields > 0) {
00067 subtrahendBitmap.resize(SUBTRAHEND_BITMAP_SIZE);
00068 }
00069 restartSubtrahends();
00070 }
00071
00072 ExecStreamResult LbmMinusExecStream::execute(ExecStreamQuantum const &quantum)
00073 {
00074 ExecStreamResult rc;
00075
00076
00077 if (inputType == UNKNOWN_INPUT) {
00078 rc = advanceSubtrahends(LcsRid(0));
00079 if (rc != EXECRC_YIELD) {
00080 return rc;
00081 }
00082 int dummy;
00083 rc = findMinInput(dummy);
00084 if (rc == EXECRC_EOS) {
00085 inputType = EMPTY_INPUT;
00086 if (nFields == 0) {
00087 subtrahendsDone = true;
00088 }
00089 } else {
00090 inputType = NONEMPTY_INPUT;
00091 }
00092 }
00093
00094 if (producePending) {
00095 rc = producePendingOutput(0);
00096 if (rc != EXECRC_YIELD) {
00097 return rc;
00098 }
00099 }
00100
00101 bool skipMinus = false;
00102 if (copyPrefixPending) {
00103 copyPrefix();
00104 copyPrefixPending = false;
00105 needToRead = false;
00106
00107
00108 skipMinus = checkNeedForRestart();
00109 }
00110
00111 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00112
00113
00114 if (needToRead) {
00115 rc = readMinuendInputAndFlush(baseRid, baseByteSeg, baseLen);
00116 if (rc != EXECRC_YIELD) {
00117 return rc;
00118 }
00119
00120
00121 skipMinus = checkNeedForRestart();
00122 }
00123
00124
00125
00126
00127 if ((nFields == 0 && !subtrahendsDone) || !skipMinus) {
00128 if (advancePending) {
00129 rc =
00130 advanceSingleSubtrahend(
00131 advanceSubtrahendInputNo,
00132 advanceSubtrahendRid);
00133 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00134 return rc;
00135 }
00136 advancePending = false;
00137 } else {
00138 rc = advanceSubtrahends(baseRid);
00139 if (rc != EXECRC_YIELD) {
00140 return rc;
00141 }
00142 }
00143
00144 rc = minusSegments(baseRid, baseByteSeg, baseLen);
00145 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00146 return rc;
00147 }
00148 }
00149
00150
00151
00152 needToRead = true;
00153 startRid = baseRid + baseLen * LbmSegment::LbmOneByteSize;
00154 addRid = baseRid;
00155 addByteSeg = pByteSegBuf;
00156 addLen = baseLen;
00157 if (!addSegments()) {
00158 return EXECRC_BUF_OVERFLOW;
00159 }
00160
00161
00162 }
00163
00164 return EXECRC_QUANTUM_EXPIRED;
00165 }
00166
00167 ExecStreamResult LbmMinusExecStream::readMinuendInputAndFlush(
00168 LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00169 {
00170 ExecStreamResult rc;
00171 bool unordered = false;
00172
00173
00174
00175 if (nFields == 0) {
00176 rc = readInput(0, currRid, currByteSeg, currLen);
00177 } else {
00178 rc = readMinuendInput(currRid, currByteSeg, currLen);
00179 if (currRid < startRid) {
00180 unordered = true;
00181 }
00182 }
00183 if (rc != EXECRC_YIELD) {
00184 return rc;
00185 }
00186
00187
00188 memcpy(pByteSegBuf, baseByteSeg - baseLen + 1, baseLen);
00189 needToRead = false;
00190
00191
00192
00193 startRid = baseRid;
00194 writeStartRidParamValue();
00195 iInput = 1;
00196
00197
00198 if (nFields == 0) {
00199 return rc;
00200 }
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218 if (prevTupleValid) {
00219 if (minuendReader.getTupleChange()) {
00220 minuendReader.resetChangeListener();
00221 int keyComp = comparePrefixes();
00222 if (keyComp != 0 || unordered) {
00223 needSubtrahendRestart = true;
00224 if (!flush()) {
00225 copyPrefixPending = true;
00226 return EXECRC_BUF_OVERFLOW;
00227 }
00228 copyPrefix();
00229 }
00230 }
00231 } else {
00232 prevTupleValid = true;
00233 copyPrefix();
00234 minuendReader.resetChangeListener();
00235 }
00236 return rc;
00237 }
00238
00239 ExecStreamResult LbmMinusExecStream::readMinuendInput(
00240 LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00241 {
00242 LbmByteNumber byteNumber;
00243 ExecStreamResult rc = minuendReader.readSegmentAndAdvance(
00244 byteNumber, currByteSeg, currLen);
00245 currRid = byteNumberToRid(byteNumber);
00246 if (rc == EXECRC_EOS) {
00247
00248 if (! flush()) {
00249 return EXECRC_BUF_OVERFLOW;
00250 }
00251 pOutAccessor->markEOS();
00252 return EXECRC_EOS;
00253 } else if (rc != EXECRC_YIELD) {
00254 return rc;
00255 }
00256
00257
00258
00259 assert(currLen <= bitmapBufSize);
00260
00261 return EXECRC_YIELD;
00262 }
00263
00264 int LbmMinusExecStream::comparePrefixes()
00265 {
00266 int ret =
00267 (inAccessors[0]->getTupleDesc()).compareTuplesKey(
00268 prevTuple,
00269 bitmapSegTuples[0],
00270 nFields);
00271 return ret;
00272 }
00273
00274 void LbmMinusExecStream::restartSubtrahends()
00275 {
00276 minSubtrahendRid = LcsRid(0);
00277 advancePending = false;
00278 for (uint i = 1; i < nInputs; i++) {
00279 pGraph->getStreamInput(getStreamId(), i)->open(true);
00280 segmentReaders[i].init(
00281 inAccessors[i],
00282 bitmapSegTuples[i],
00283 (!subtrahendsDone && nFields > 0),
00284 &subtrahendBitmap);
00285 }
00286 iInput = 1;
00287 needSubtrahendRestart = false;
00288 }
00289
00290 void LbmMinusExecStream::copyPrefix()
00291 {
00292
00293
00294
00295
00296 prevTuple.resetBuffer();
00297
00298 for (int i = 0; i < nFields; i ++) {
00299 prevTuple[i].memCopyFrom(bitmapSegTuples[0][i]);
00300 }
00301 }
00302
00303 ExecStreamResult LbmMinusExecStream::advanceSingleSubtrahend(
00304 int inputNo,
00305 LcsRid rid)
00306 {
00307 ExecStreamResult rc = segmentReaders[inputNo].advanceToRid(rid);
00308 return rc;
00309 }
00310
00311 ExecStreamResult LbmMinusExecStream::advanceSubtrahends(LcsRid baseRid)
00312 {
00313
00314
00315 if (minSubtrahendRid > baseRid) {
00316 return EXECRC_YIELD;
00317 }
00318
00319
00320 for (; iInput < nInputs; iInput++) {
00321 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(baseRid);
00322 if (rc == EXECRC_EOS) {
00323 continue;
00324 }
00325 if (rc != EXECRC_YIELD) {
00326 return rc;
00327 }
00328 }
00329
00330 return EXECRC_YIELD;
00331 }
00332
00333 bool LbmMinusExecStream::checkNeedForRestart()
00334 {
00335
00336
00337
00338 if (nFields == 0) {
00339 return false;
00340 } else if (inputType == EMPTY_INPUT) {
00341
00342 return true;
00343 } else if (needSubtrahendRestart) {
00344 bool skipMinus = canSkipMinus();
00345
00346
00347 if (!skipMinus && minSubtrahendRid > startRid) {
00348 restartSubtrahends();
00349 }
00350 return skipMinus;
00351 } else {
00352 return canSkipMinus();
00353 }
00354 }
00355
00356 bool LbmMinusExecStream::canSkipMinus()
00357 {
00358 LcsRid rid = baseRid;
00359 LcsRid endRid = baseRid + baseLen * LbmSegment::LbmOneByteSize - 1;
00360
00361
00362
00363 if (subtrahendsDone) {
00364
00365
00366 if (rid > maxSubtrahendRid) {
00367 return true;
00368 }
00369 } else {
00370
00371
00372
00373 for (uint i = 1; i < nInputs; i++) {
00374 if (endRid > segmentReaders[i].getMaxRidSet()) {
00375 return false;
00376 }
00377 }
00378 }
00379
00380 PBuffer seg = baseByteSeg;
00381 for (uint i = 0; i < baseLen; i++) {
00382 uint8_t byte = *((uint8_t *) seg);
00383 for (uint j = 0; j < LbmSegment::LbmOneByteSize; j++) {
00384 if (byte & 1) {
00385
00386 if (subtrahendBitmap.test(
00387 opaqueToInt(rid % SUBTRAHEND_BITMAP_SIZE)))
00388 {
00389 return false;
00390 }
00391 }
00392 byte = byte >> 1;
00393 rid++;
00394 }
00395 seg--;
00396 }
00397 return true;
00398 }
00399
00400 ExecStreamResult LbmMinusExecStream::minusSegments(
00401 LcsRid baseRid, PBuffer baseByteSeg, uint baseLen)
00402 {
00403 while (true) {
00404
00405
00406 int minInput;
00407 ExecStreamResult rc = findMinInput(minInput);
00408 if (rc == EXECRC_EOS) {
00409 return rc;
00410 }
00411
00412 LcsRid currRid;
00413 PBuffer currByteSeg;
00414 uint currLen;
00415 segmentReaders[minInput].readCurrentByteSegment(
00416 currRid, currByteSeg, currLen);
00417
00418
00419
00420 uint offset =
00421 opaqueToInt(currRid - baseRid) / LbmSegment::LbmOneByteSize;
00422 if (offset >= baseLen) {
00423 break;
00424 }
00425
00426
00427
00428 currLen = std::min(currLen, baseLen - offset);
00429
00430
00431
00432 PBuffer out = pByteSegBuf + baseLen - 1 - offset;
00433 uint len = currLen;
00434 while (len--) {
00435 *out-- &= ~(*currByteSeg--);
00436 }
00437
00438
00439
00440
00441 rc = segmentReaders[minInput].advanceToRid(
00442 currRid + currLen * LbmSegment::LbmOneByteSize);
00443 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00444 advancePending = true;
00445 advanceSubtrahendRid =
00446 currRid + currLen * LbmSegment::LbmOneByteSize;
00447 advanceSubtrahendInputNo = minInput;
00448 return rc;
00449 }
00450 }
00451
00452 return EXECRC_YIELD;
00453 }
00454
00455 ExecStreamResult LbmMinusExecStream::findMinInput(int &minInput)
00456 {
00457 minInput = -1;
00458
00459 for (uint i = 1; i < nInputs; i++) {
00460 if (inAccessors[i]->getState() == EXECBUF_EOS) {
00461 continue;
00462 }
00463
00464 LcsRid currRid;
00465 PBuffer currByteSeg;
00466 uint currLen;
00467 segmentReaders[i].readCurrentByteSegment(
00468 currRid, currByteSeg, currLen);
00469
00470 if (minInput == -1 || currRid < minSubtrahendRid) {
00471 minInput = i;
00472 minSubtrahendRid = currRid;
00473 }
00474 }
00475
00476 if (minInput == -1) {
00477
00478
00479
00480
00481 subtrahendsDone = true;
00482 for (uint i = 1; i < nInputs; i++) {
00483 LcsRid rid = segmentReaders[i].getMaxRidSet();
00484 if (rid > maxSubtrahendRid) {
00485 maxSubtrahendRid = rid;
00486 }
00487 }
00488 minSubtrahendRid = maxSubtrahendRid + 1;
00489 return EXECRC_EOS;
00490 } else {
00491 return EXECRC_YIELD;
00492 }
00493 }
00494
00495 bool LbmMinusExecStream::produceTuple(TupleData bitmapTuple)
00496 {
00497
00498
00499 if (nFields) {
00500 for (uint i = 0; i < nFields; i++) {
00501 prefixedBitmapTuple[i].copyFrom(prevTuple[i]);
00502 }
00503 assert (prefixedBitmapTuple.size() == nFields + bitmapTuple.size());
00504 for (uint i = 0; i < 3; i++) {
00505 prefixedBitmapTuple[nFields + i].copyFrom(bitmapTuple[i]);
00506 }
00507 return pOutAccessor->produceTuple(prefixedBitmapTuple);
00508 }
00509 return pOutAccessor->produceTuple(bitmapTuple);
00510 }
00511
00512 void LbmMinusExecStream::closeImpl()
00513 {
00514 subtrahendBitmap.resize(0);
00515 LbmBitOpExecStream::closeImpl();
00516 }
00517
00518 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.cpp#16 $");
00519
00520