LbmMinusExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.cpp#16 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 LucidEra, Inc.
00005 // Copyright (C) 2006-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/exec/ExecStreamBufAccessor.h"
00024 #include "fennel/exec/ExecStreamGraphImpl.h"
00025 #include "fennel/lucidera/bitmap/LbmMinusExecStream.h"
00026 
00027 FENNEL_BEGIN_CPPFILE("$Id:");
00028 
00029 void LbmMinusExecStream::prepare(LbmMinusExecStreamParams const &params)
00030 {
00031     LbmBitOpExecStream::prepare(params);
00032 
00033     if (nFields) {
00034         // compute an output tuple based on the minuend
00035         prefixedBitmapTuple.compute(inAccessors[0]->getTupleDesc());
00036 
00037         // prevTuple contains only the prefix fields, it has it's own
00038         // storage to track the previous tuple when the current tuple
00039         // pointers have moved forward
00040         TupleDescriptor prevTupleDesc;
00041         TupleDescriptor const &inputDesc = inAccessors[0]->getTupleDesc();
00042         for (int i = 0; i < nFields; i ++) {
00043             prevTupleDesc.push_back(inputDesc[i]);
00044         }
00045         prevTuple.computeAndAllocate(prevTupleDesc);
00046     }
00047     subtrahendBitmap.resize(0);
00048 }
00049 
00050 void LbmMinusExecStream::open(bool restart)
00051 {
00052     LbmBitOpExecStream::open(restart);
00053     subtrahendsDone = false;
00054     needToRead = true;
00055     minSubtrahendRid = LcsRid(0);
00056     maxSubtrahendRid = LcsRid(0);
00057     baseRid = LcsRid(0);
00058     advancePending = false;
00059     // since the subtrahends need to read till EOS, don't set a rowLimit
00060     rowLimit = 0;
00061     inputType = UNKNOWN_INPUT;
00062     copyPrefixPending = false;
00063     prevTupleValid = false;
00064     minuendReader.init(inAccessors[0], bitmapSegTuples[0]);
00065 
00066     if (nFields > 0) {
00067         subtrahendBitmap.resize(SUBTRAHEND_BITMAP_SIZE);
00068     }
00069     restartSubtrahends();
00070 }
00071 
00072 ExecStreamResult LbmMinusExecStream::execute(ExecStreamQuantum const &quantum)
00073 {
00074     ExecStreamResult rc;
00075 
00076     // On the first execution, check whether any subtrahend has data
00077     if (inputType == UNKNOWN_INPUT) {
00078         rc = advanceSubtrahends(LcsRid(0));
00079         if (rc != EXECRC_YIELD) {
00080             return rc;
00081         }
00082         int dummy;
00083         rc = findMinInput(dummy);
00084         if (rc == EXECRC_EOS) {
00085             inputType = EMPTY_INPUT;
00086             if (nFields == 0) {
00087                 subtrahendsDone = true;
00088             }
00089         } else {
00090             inputType = NONEMPTY_INPUT;
00091         }
00092     }
00093 
00094     if (producePending) {
00095         rc = producePendingOutput(0);
00096         if (rc != EXECRC_YIELD) {
00097             return rc;
00098         }
00099     }
00100 
00101     bool skipMinus = false;
00102     if (copyPrefixPending) {
00103         copyPrefix();
00104         copyPrefixPending = false;
00105         needToRead = false;
00106         // Since we bypassed the restart check when this current minuend
00107         // was read, we need to do the check here
00108         skipMinus = checkNeedForRestart();
00109     }
00110 
00111     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00112         // read a segment from the minuend if we've finished processing the
00113         // previous segment
00114         if (needToRead) {
00115             rc = readMinuendInputAndFlush(baseRid, baseByteSeg, baseLen);
00116             if (rc != EXECRC_YIELD) {
00117                 return rc;
00118             }
00119 
00120             // See if we need to restart the subtrahends
00121             skipMinus = checkNeedForRestart();
00122         }
00123 
00124         // Minus the subtrahends if they haven't all reached EOS in the
00125         // case where there are no keys.  In the case where there are keys,
00126         // the bitmap determines whether we can skip the minus.
00127         if ((nFields == 0 && !subtrahendsDone) || !skipMinus) {
00128             if (advancePending) {
00129                 rc =
00130                     advanceSingleSubtrahend(
00131                         advanceSubtrahendInputNo,
00132                         advanceSubtrahendRid);
00133                 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00134                     return rc;
00135                 }
00136                 advancePending = false;
00137             } else {
00138                 rc = advanceSubtrahends(baseRid);
00139                 if (rc != EXECRC_YIELD) {
00140                     return rc;
00141                 }
00142             }
00143 
00144             rc = minusSegments(baseRid, baseByteSeg, baseLen);
00145             if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00146                 return rc;
00147             }
00148         }
00149 
00150         // bump up the startrid past the segment just read and
00151         // write out the minuend segment
00152         needToRead = true;
00153         startRid = baseRid + baseLen * LbmSegment::LbmOneByteSize;
00154         addRid = baseRid;
00155         addByteSeg = pByteSegBuf;
00156         addLen = baseLen;
00157         if (!addSegments()) {
00158             return EXECRC_BUF_OVERFLOW;
00159         }
00160 
00161         // loop back to read the next minuend segment
00162     }
00163 
00164     return EXECRC_QUANTUM_EXPIRED;
00165 }
00166 
00167 ExecStreamResult LbmMinusExecStream::readMinuendInputAndFlush(
00168     LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00169 {
00170     ExecStreamResult rc;
00171     bool unordered = false;
00172 
00173     // If there are no keys, read as the minuend as an ordered input.
00174     // Otherwise, read the minuend as a random sequence of segments.
00175     if (nFields == 0) {
00176         rc = readInput(0, currRid, currByteSeg, currLen);
00177     } else {
00178         rc = readMinuendInput(currRid, currByteSeg, currLen);
00179         if (currRid < startRid) {
00180             unordered = true;
00181         }
00182     }
00183     if (rc != EXECRC_YIELD) {
00184         return rc;
00185     }
00186 
00187     // Store the segment just read
00188     memcpy(pByteSegBuf, baseByteSeg - baseLen + 1, baseLen);
00189     needToRead = false;
00190     // reset the startrid to the rid just read in and write the
00191     // dynamic parameter so the subtrahends can skip forward to that
00192     // rid
00193     startRid = baseRid;
00194     writeStartRidParamValue();
00195     iInput = 1;
00196 
00197     // If there are no keys (the usual case), we never need to restart inputs
00198     if (nFields == 0) {
00199         return rc;
00200     }
00201 
00202     // If there are keys, then data is expected to come from an index. RIDs
00203     // may be ordered for each key, but are not ordered for the entire stream.
00204     // In fact, when minus keys are only a subset of an index's keys, then
00205     // RIDs may restart at any time.
00206     // (Ex: RIDs in index [K1, K2] are ordered for each pair [k1, k2].
00207     // However, a minus based on [K1] will be completely out of order.)
00208     //
00209     // Due to the lack of ordering, we may need to restart subtrahends
00210     // whenever the minuend is out of order so all of the subtrahend data
00211     // can be minused from the next minuend input.  That is handled outside
00212     // of this method because restarts don't always need to be done
00213     // immediately after a new key is read.
00214     //
00215     // We also flush the segment writer's current tuple. If it cannot be
00216     // written, then we can't copy the next prefix yet, because the old
00217     // values will be used to construct the pending output tuple.
00218     if (prevTupleValid) {
00219         if (minuendReader.getTupleChange()) {
00220             minuendReader.resetChangeListener();
00221             int keyComp = comparePrefixes();
00222             if (keyComp != 0 || unordered) {
00223                 needSubtrahendRestart = true;
00224                 if (!flush()) {
00225                     copyPrefixPending = true;
00226                     return EXECRC_BUF_OVERFLOW;
00227                 }
00228                 copyPrefix();
00229             }
00230         }
00231     } else {
00232         prevTupleValid = true;
00233         copyPrefix();
00234         minuendReader.resetChangeListener();
00235     }
00236     return rc;
00237 }
00238 
00239 ExecStreamResult LbmMinusExecStream::readMinuendInput(
00240     LcsRid &currRid, PBuffer &currByteSeg, uint &currLen)
00241 {
00242     LbmByteNumber byteNumber;
00243     ExecStreamResult rc = minuendReader.readSegmentAndAdvance(
00244         byteNumber, currByteSeg, currLen);
00245     currRid = byteNumberToRid(byteNumber);
00246     if (rc == EXECRC_EOS) {
00247         // write out the last pending segment
00248         if (! flush()) {
00249             return EXECRC_BUF_OVERFLOW;
00250         }
00251         pOutAccessor->markEOS();
00252         return EXECRC_EOS;
00253     } else if (rc != EXECRC_YIELD) {
00254         return rc;
00255     }
00256 
00257     // segment read should never be larger than space available
00258     // for segments
00259     assert(currLen <= bitmapBufSize);
00260 
00261     return EXECRC_YIELD;
00262 }
00263 
00264 int LbmMinusExecStream::comparePrefixes()
00265 {
00266     int ret =
00267         (inAccessors[0]->getTupleDesc()).compareTuplesKey(
00268             prevTuple,
00269             bitmapSegTuples[0],
00270             nFields);
00271     return ret;
00272 }
00273 
00274 void LbmMinusExecStream::restartSubtrahends()
00275 {
00276     minSubtrahendRid = LcsRid(0);
00277     advancePending = false;
00278     for (uint i = 1; i < nInputs; i++) {
00279         pGraph->getStreamInput(getStreamId(), i)->open(true);
00280         segmentReaders[i].init(
00281             inAccessors[i],
00282             bitmapSegTuples[i],
00283             (!subtrahendsDone && nFields > 0),
00284             &subtrahendBitmap);
00285     }
00286     iInput = 1;
00287     needSubtrahendRestart = false;
00288 }
00289 
00290 void LbmMinusExecStream::copyPrefix()
00291 {
00292     /*
00293       Need to make sure pointers are allocated before memcpy.
00294       resetBuffer restores the pointers to the associated buffer.
00295     */
00296     prevTuple.resetBuffer();
00297 
00298     for (int i = 0; i < nFields; i ++) {
00299         prevTuple[i].memCopyFrom(bitmapSegTuples[0][i]);
00300     }
00301 }
00302 
00303 ExecStreamResult LbmMinusExecStream::advanceSingleSubtrahend(
00304     int inputNo,
00305     LcsRid rid)
00306 {
00307     ExecStreamResult rc = segmentReaders[inputNo].advanceToRid(rid);
00308     return rc;
00309 }
00310 
00311 ExecStreamResult LbmMinusExecStream::advanceSubtrahends(LcsRid baseRid)
00312 {
00313     // no need to advance subtrahends if they're all positioned past the
00314     // minuend
00315     if (minSubtrahendRid > baseRid) {
00316         return EXECRC_YIELD;
00317     }
00318 
00319     // advance the subtrahends, resuming at the one where we last left off
00320     for (; iInput < nInputs; iInput++) {
00321         ExecStreamResult rc = segmentReaders[iInput].advanceToRid(baseRid);
00322         if (rc == EXECRC_EOS) {
00323             continue;
00324         }
00325         if (rc != EXECRC_YIELD) {
00326             return rc;
00327         }
00328     }
00329 
00330     return EXECRC_YIELD;
00331 }
00332 
00333 bool LbmMinusExecStream::checkNeedForRestart()
00334 {
00335     // Return value indicates whether the minus can be skipped, not whether
00336     // a restart was done.
00337 
00338     if (nFields == 0) {
00339         return false;
00340     } else if (inputType == EMPTY_INPUT) {
00341         // If the input is empty, we can always bypass the minus
00342         return true;
00343     } else if (needSubtrahendRestart) {
00344         bool skipMinus = canSkipMinus();
00345         // If there are potentially overlapping rids and the subtrahend is
00346         // positioned past the current minuend, we need to restart
00347         if (!skipMinus && minSubtrahendRid > startRid) {
00348             restartSubtrahends();
00349         }
00350         return skipMinus;
00351     } else {
00352         return canSkipMinus();
00353     }
00354 }
00355 
00356 bool LbmMinusExecStream::canSkipMinus()
00357 {
00358     LcsRid rid = baseRid;
00359     LcsRid endRid = baseRid + baseLen * LbmSegment::LbmOneByteSize - 1;
00360 
00361     // Determine whether the rids in the current minuend segment are
00362     // "covered" by the bitmap in its current state
00363     if (subtrahendsDone) {
00364         // If the first rid we're interested in extends past the max rid read
00365         // from all subtrahends, then there are no rids to subtract off.
00366         if (rid > maxSubtrahendRid) {
00367             return true;
00368         }
00369     } else {
00370         // If the last rid we're interested in extends past the max rid read
00371         // from any subtrahend, then we can't use the bitmap to determine if
00372         // we can skip the minus.
00373         for (uint i = 1; i < nInputs; i++) {
00374             if (endRid > segmentReaders[i].getMaxRidSet()) {
00375                 return false;
00376             }
00377         }
00378     }
00379 
00380     PBuffer seg = baseByteSeg;
00381     for (uint i = 0; i < baseLen; i++) {
00382         uint8_t byte = *((uint8_t *) seg);
00383         for (uint j = 0; j < LbmSegment::LbmOneByteSize; j++) {
00384             if (byte & 1) {
00385                 // once we find a match, no need to look any further
00386                 if (subtrahendBitmap.test(
00387                     opaqueToInt(rid % SUBTRAHEND_BITMAP_SIZE)))
00388                 {
00389                     return false;
00390                 }
00391             }
00392             byte = byte >> 1;
00393             rid++;
00394         }
00395         seg--;
00396     }
00397     return true;
00398 }
00399 
00400 ExecStreamResult LbmMinusExecStream::minusSegments(
00401     LcsRid baseRid, PBuffer baseByteSeg, uint baseLen)
00402 {
00403     while (true) {
00404         // find the subtrahend with the minimum startrid and read its current
00405         // segment
00406         int minInput;
00407         ExecStreamResult rc = findMinInput(minInput);
00408         if (rc == EXECRC_EOS) {
00409             return rc;
00410         }
00411 
00412         LcsRid currRid;
00413         PBuffer currByteSeg;
00414         uint currLen;
00415         segmentReaders[minInput].readCurrentByteSegment(
00416             currRid, currByteSeg, currLen);
00417 
00418         // if the subtrahends are not within the range of the minuend's
00419         // current rid range, ignore the current segment and get a new one
00420         uint offset =
00421             opaqueToInt(currRid - baseRid) / LbmSegment::LbmOneByteSize;
00422         if (offset >= baseLen) {
00423             break;
00424         }
00425 
00426         // only read from the subtrahends the amount that will match the
00427         // minuend's segment
00428         currLen = std::min(currLen, baseLen - offset);
00429 
00430         // minus from the minuend -- note that segments are stored
00431         // backwards
00432         PBuffer out = pByteSegBuf + baseLen - 1 - offset;
00433         uint len = currLen;
00434         while (len--) {
00435             *out-- &= ~(*currByteSeg--);
00436         }
00437 
00438         // advance the subtrahend by the amount read in; note that we don't
00439         // return if this subtrahend has reached EOS, as there may still be
00440         // other subtrahends that aren't in the EOS state
00441         rc = segmentReaders[minInput].advanceToRid(
00442                 currRid + currLen * LbmSegment::LbmOneByteSize);
00443         if (rc != EXECRC_YIELD && rc != EXECRC_EOS) {
00444             advancePending = true;
00445             advanceSubtrahendRid =
00446                 currRid + currLen * LbmSegment::LbmOneByteSize;
00447             advanceSubtrahendInputNo = minInput;
00448             return rc;
00449         }
00450     }
00451 
00452     return EXECRC_YIELD;
00453 }
00454 
00455 ExecStreamResult LbmMinusExecStream::findMinInput(int &minInput)
00456 {
00457     minInput = -1;
00458 
00459     for (uint i = 1; i < nInputs; i++) {
00460         if (inAccessors[i]->getState() == EXECBUF_EOS) {
00461             continue;
00462         }
00463 
00464         LcsRid currRid;
00465         PBuffer currByteSeg;
00466         uint currLen;
00467         segmentReaders[i].readCurrentByteSegment(
00468             currRid, currByteSeg, currLen);
00469 
00470         if (minInput == -1 || currRid < minSubtrahendRid) {
00471             minInput = i;
00472             minSubtrahendRid = currRid;
00473         }
00474     }
00475 
00476     if (minInput == -1) {
00477         // Note that once we've made one pass over the subtrahends, by setting
00478         // subtrahendsDone, we'll avoid resetting the bits on subsequent passes.
00479         // Position minSubtrahendRid past the max subtrahend rid since the
00480         // subtrahends are no longer positioned at that minimum rid.
00481         subtrahendsDone = true;
00482         for (uint i = 1; i < nInputs; i++) {
00483             LcsRid rid = segmentReaders[i].getMaxRidSet();
00484             if (rid > maxSubtrahendRid) {
00485                 maxSubtrahendRid = rid;
00486             }
00487         }
00488         minSubtrahendRid = maxSubtrahendRid + 1;
00489         return EXECRC_EOS;
00490     } else {
00491         return EXECRC_YIELD;
00492     }
00493 }
00494 
00495 bool LbmMinusExecStream::produceTuple(TupleData bitmapTuple)
00496 {
00497     // If the minuend contained prefix fields, they are prepended to
00498     // the output: ([optional prefix fields], bitmap)
00499     if (nFields) {
00500         for (uint i = 0; i < nFields; i++) {
00501             prefixedBitmapTuple[i].copyFrom(prevTuple[i]);
00502         }
00503         assert (prefixedBitmapTuple.size() == nFields + bitmapTuple.size());
00504         for (uint i = 0; i < 3; i++) {
00505             prefixedBitmapTuple[nFields + i].copyFrom(bitmapTuple[i]);
00506         }
00507         return pOutAccessor->produceTuple(prefixedBitmapTuple);
00508     }
00509     return pOutAccessor->produceTuple(bitmapTuple);
00510 }
00511 
00512 void LbmMinusExecStream::closeImpl()
00513 {
00514     subtrahendBitmap.resize(0);
00515     LbmBitOpExecStream::closeImpl();
00516 }
00517 
00518 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.cpp#16 $");
00519 
00520 // End LbmMinusExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1