00001 /* 00002 // $Id: //open/dev/fennel/lucidera/bitmap/LbmIntersectExecStream.cpp#10 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2006-2009 LucidEra, Inc. 00005 // Copyright (C) 2006-2009 The Eigenbase Project 00006 // 00007 // This program is free software; you can redistribute it and/or modify it 00008 // under the terms of the GNU General Public License as published by the Free 00009 // Software Foundation; either version 2 of the License, or (at your option) 00010 // any later version approved by The Eigenbase Project. 00011 // 00012 // This program is distributed in the hope that it will be useful, 00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 // GNU General Public License for more details. 00016 // 00017 // You should have received a copy of the GNU General Public License 00018 // along with this program; if not, write to the Free Software 00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00020 */ 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/exec/ExecStreamBufAccessor.h" 00024 #include "fennel/lucidera/bitmap/LbmIntersectExecStream.h" 00025 00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmIntersectExecStream.cpp#10 $"); 00027 00028 void LbmIntersectExecStream::prepare(LbmIntersectExecStreamParams const ¶ms) 00029 { 00030 LbmBitOpExecStream::prepare(params); 00031 } 00032 00033 void LbmIntersectExecStream::open(bool restart) 00034 { 00035 LbmBitOpExecStream::open(restart); 00036 iInput = 0; 00037 nMatches = 0; 00038 minLen = 0; 00039 } 00040 00041 ExecStreamResult LbmIntersectExecStream::execute( 00042 ExecStreamQuantum const &quantum) 00043 { 00044 if (producePending) { 00045 ExecStreamResult rc = producePendingOutput(iInput); 00046 if (rc != EXECRC_YIELD) { 00047 return rc; 00048 } 00049 nMatches = 0; 00050 } 00051 00052 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00053 while (nMatches < nInputs) { 00054 // get the first segment from the input with at least a starting 00055 // rid of startRid 00056 LcsRid currRid; 00057 PBuffer currByteSeg; 00058 uint currLen; 00059 00060 ExecStreamResult rc = readInput( 00061 iInput, currRid, currByteSeg, currLen); 00062 if (rc != EXECRC_YIELD) { 00063 return rc; 00064 } 00065 00066 // if the starting rid of this current segment that has just 00067 // been read matches the desired starting rid, indicate that 00068 // we have a match; otherwise, reset the starting rid to the 00069 // new, larger rid value and start all over again, looking 00070 // for matches from all other input streams 00071 assert(currRid >= startRid); 00072 if (currRid > startRid) { 00073 startRid = currRid; 00074 writeStartRidParamValue(); 00075 nMatches = 1; 00076 minLen = currLen; 00077 } else { 00078 // the shortest segment indicates where the overlapping ends 00079 if (nMatches == 0 || currLen < minLen) { 00080 minLen = currLen; 00081 } 00082 nMatches++; 00083 } 00084 00085 // now try reading segments from the other streams 00086 iInput = ++iInput % nInputs; 00087 } 00088 00089 // intersect the overlapping segments 00090 if (!intersectSegments(minLen)) { 00091 return EXECRC_BUF_OVERFLOW; 00092 } 00093 00094 nMatches = 0; 00095 } 00096 00097 return EXECRC_QUANTUM_EXPIRED; 00098 } 00099 00100 bool LbmIntersectExecStream::intersectSegments(uint len) 00101 { 00102 LcsRid currRid; 00103 PBuffer currByteSeg; 00104 uint currLen; 00105 00106 // initialize temporary buffer with all 1's 00107 for (uint i = 0; i < len; i++) { 00108 pByteSegBuf[i] = 0xff; 00109 } 00110 00111 // retrieve each current segment and perform the AND operation 00112 for (uint i = 0; i < nInputs; i++) { 00113 segmentReaders[i].readCurrentByteSegment( 00114 currRid, currByteSeg, currLen); 00115 if (i == 0) { 00116 addRid = currRid; 00117 } else { 00118 permAssert(addRid == currRid); 00119 } 00120 // byte segments are stored in reverse order, so currByteSeg points 00121 // to the end of the buffer whereas pByteSegBuf points to the 00122 // beginning but needs to be filled in backwards 00123 for (int j = 0; j < len; j++) { 00124 pByteSegBuf[len - j - 1] &= currByteSeg[-j]; 00125 } 00126 } 00127 00128 // the next set of segments to read will start past the end of 00129 // the overlapping segments just read 00130 startRid = addRid + len * LbmSegment::LbmOneByteSize; 00131 writeStartRidParamValue(); 00132 00133 // add the AND'd segment to the segment under construction; 00134 // if the segment is full and the output buffer fills up writing 00135 // out the segment, return 00136 assert(len > 0); 00137 addLen = len; 00138 addByteSeg = pByteSegBuf; 00139 if (!addSegments()) { 00140 return false; 00141 } 00142 00143 return true; 00144 } 00145 00146 void LbmIntersectExecStream::closeImpl() 00147 { 00148 LbmBitOpExecStream::closeImpl(); 00149 } 00150 00151 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmIntersectExecStream.cpp#10 $"); 00152 00153 // End LbmIntersectExecStream.cpp