LcsRowScanExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 LucidEra, Inc.
00005 // Copyright (C) 2005-2009 The Eigenbase Project
00006 //
00007 // This program is free software; you can redistribute it and/or modify it
00008 // under the terms of the GNU General Public License as published by the Free
00009 // Software Foundation; either version 2 of the License, or (at your option)
00010 // any later version approved by The Eigenbase Project.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this program; if not, write to the Free Software
00019 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020 */
00021 
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/tuple/StandardTypeDescriptor.h"
00024 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/common/SearchEndpoint.h"
00027 #include <math.h>
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $");
00030 
00031 int32_t LcsRowScanExecStreamParams::defaultSystemSamplingClumps = 10;
00032 
00033 LcsRowScanExecStream::LcsRowScanExecStream()
00034 :
00035     LcsRowScanBaseExecStream(),
00036     ridRunIter(&ridRuns)
00037 {
00038     ridRuns.resize(4000);
00039 }
00040 
00041 void LcsRowScanExecStream::prepareResidualFilters(
00042     LcsRowScanExecStreamParams const &params)
00043 {
00044     nFilters = params.residualFilterCols.size();
00045 
00046     /*
00047      * compute the outputTupleData position of filter columns
00048      */
00049     VectorOfUint valueCols;
00050     uint j, k = 0;
00051     for (uint i = 0;  i < nFilters; i++) {
00052         for (j = 0; j < params.outputProj.size(); j++) {
00053             if (params.outputProj[j] == params.residualFilterCols[i]) {
00054                 valueCols.push_back(j);
00055                 break;
00056             }
00057         }
00058 
00059         if (j >= params.outputProj.size()) {
00060             valueCols.push_back(params.outputProj.size() +  k);
00061             k++;
00062         }
00063     }
00064 
00065     /*
00066      * compute the cluster id and cluster position
00067      */
00068     uint valueClus;
00069     uint clusterPos;
00070     uint clusterStart = 0;
00071     uint realClusterStart = 0;
00072 
00073     filters.reset(new PLcsResidualColumnFilters[nFilters]);
00074 
00075     for (uint i = 0; i < nClusters; i++) {
00076         uint clusterEnd = clusterStart +
00077             params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00078 
00079         for (uint j = 0; j < nFilters; j++) {
00080             if (params.residualFilterCols[j] >= clusterStart &&
00081                 params.residualFilterCols[j] <= clusterEnd)
00082             {
00083                 valueClus = i;
00084 
00085                 /*
00086                  * find the position within the cluster
00087                  */
00088                 for (uint k = 0; k < projMap.size(); k++) {
00089                     if (projMap[k] == valueCols[j]) {
00090                         clusterPos = k - realClusterStart -
00091                             nonClusterCols.size();
00092 
00093                         LcsResidualColumnFilters &filter =
00094                             pClusters[valueClus]->
00095                             clusterCols[clusterPos].
00096                             getFilters();
00097 
00098                         filters[j] = &filter;
00099 
00100                         filter.hasResidualFilters = true;
00101 
00102                         filter.readerKeyProj.push_back(valueCols[j]);
00103                         filter.inputKeyDesc.projectFrom(
00104                             projDescriptor,
00105                             filter.readerKeyProj);
00106                         filter.attrAccessor.compute(
00107                             filter.inputKeyDesc[0]);
00108 
00109                         filter.lowerBoundProj.push_back(1);
00110                         filter.upperBoundProj.push_back(3);
00111                         filter.readerKeyData.computeAndAllocate(
00112                             filter.inputKeyDesc);
00113 
00114                         break;
00115                     }
00116                 }
00117                 // Continue with the same cluster for more filters
00118             }
00119         }
00120         // Look for filters in the next cluster; modify cluster boundaries
00121         clusterStart = clusterEnd + 1;
00122         realClusterStart += pClusters[i]->nColsToRead;
00123     }
00124 }
00125 
00126 void LcsRowScanExecStream::prepare(LcsRowScanExecStreamParams const &params)
00127 {
00128     LcsRowScanBaseExecStream::prepare(params);
00129 
00130     isFullScan = params.isFullScan;
00131     hasExtraFilter = params.hasExtraFilter;
00132 
00133     // Set up rid bitmap input stream
00134     ridTupleData.compute(inAccessors[0]->getTupleDesc());
00135 
00136     // validate input stream parameters
00137     TupleDescriptor inputDesc = inAccessors[0]->getTupleDesc();
00138     assert(inputDesc.size() == 3);
00139     StandardTypeDescriptorFactory stdTypeFactory;
00140     TupleAttributeDescriptor expectedRidDesc(
00141         stdTypeFactory.newDataType(STANDARD_TYPE_RECORDNUM));
00142     assert(inputDesc[0] == expectedRidDesc);
00143 
00144     assert(hasExtraFilter == (inAccessors.size() > 1));
00145 
00146     if (hasExtraFilter) {
00147         prepareResidualFilters(params);
00148     } else {
00149         nFilters = 0;
00150     }
00151 
00152     /*
00153      * projDescriptor now also includes filter columns
00154      */
00155     for (uint i = 0; i < params.outputProj.size(); i++) {
00156         outputProj.push_back(i);
00157     }
00158 
00159     pOutAccessor->setTupleShape(pOutAccessor->getTupleDesc());
00160     outputTupleData.computeAndAllocate(projDescriptor);
00161 
00162     /*
00163      * build the real output accessor
00164      * it will be used to unmarshal data into the
00165      * real output row: projOutputTuple.
00166      */
00167     projOutputTupleData.compute(pOutAccessor->getTupleDesc());
00168 
00169     attrAccessors.resize(projDescriptor.size());
00170     for (uint i = 0; i < projDescriptor.size(); ++i) {
00171         attrAccessors[i].compute(projDescriptor[i]);
00172     }
00173 
00174     /* configure sampling */
00175     samplingMode = params.samplingMode;
00176 
00177     if (samplingMode != SAMPLING_OFF) {
00178         samplingRate = params.samplingRate;
00179         rowCount = params.samplingRowCount;
00180 
00181         if (samplingMode == SAMPLING_BERNOULLI) {
00182             isSamplingRepeatable = params.samplingIsRepeatable;
00183             repeatableSeed = params.samplingRepeatableSeed;
00184             samplingClumps = -1;
00185 
00186             samplingRng.reset(new BernoulliRng(samplingRate));
00187         } else {
00188             assert(isFullScan);
00189 
00190             samplingClumps = params.samplingClumps;
00191             assert(samplingClumps > 0);
00192 
00193             isSamplingRepeatable = false;
00194         }
00195     }
00196 }
00197 
00198 void LcsRowScanExecStream::open(bool restart)
00199 {
00200     LcsRowScanBaseExecStream::open(restart);
00201     producePending = false;
00202     tupleFound = false;
00203     nRidsRead = 0;
00204     ridRunsBuilt = false;
00205     currRidRun.startRid = LcsRid(MAXU);
00206     currRidRun.nRids = 0;
00207     ridRuns.clear();
00208     ridRunIter.reset();
00209 
00210     if (isFullScan) {
00211         inputRid = LcsRid(0);
00212         readDeletedRid = true;
00213         deletedRidEos = false;
00214     }
00215     nextRid = LcsRid(0);
00216     ridReader.init(inAccessors[0], ridTupleData);
00217 
00218     /*
00219      * Read from the 1st input, but only if we're not doing a restart.
00220      * Restarts can reuse the structures set up on the initial open
00221      * because the current assumption is that the residual filter
00222      * values don't change in between restarts.  If on restart, if a filter
00223      * wasn't completely initialized, then reinitialize it.
00224      */
00225     if (!restart) {
00226         iFilterToInitialize = 0;
00227     } else if (iFilterToInitialize < nFilters) {
00228         if (!filters[iFilterToInitialize]->filterDataInitialized) {
00229             filters[iFilterToInitialize]->filterData.clear();
00230         }
00231     }
00232 
00233     if (samplingMode == SAMPLING_BERNOULLI) {
00234         if (isSamplingRepeatable) {
00235             samplingRng->reseed(repeatableSeed);
00236         } else if (!restart) {
00237             samplingRng->reseed(static_cast<uint32_t>(time(0)));
00238         }
00239     } else if (samplingMode == SAMPLING_SYSTEM) {
00240         clumpSize = 0;
00241         clumpDistance = 0;
00242         clumpPos = 0;
00243         numClumpsBuilt = 0;
00244 
00245         initializeSystemSampling();
00246     }
00247 }
00248 
00249 void LcsRowScanExecStream::getResourceRequirements(
00250     ExecStreamResourceQuantity &minQuantity,
00251     ExecStreamResourceQuantity &optQuantity)
00252 {
00253     LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity);
00254 }
00255 
00256 bool LcsRowScanExecStream::initializeFiltersIfNeeded()
00257 {
00258     /*
00259      * initialize the filters local data
00260      */
00261     for (; iFilterToInitialize < nFilters; iFilterToInitialize++) {
00262         SharedExecStreamBufAccessor &pInAccessor =
00263             inAccessors[iFilterToInitialize + 1];
00264         TupleAccessor &inputAccessor =
00265             pInAccessor->getConsumptionTupleAccessor();
00266 
00267         if (pInAccessor->getState() != EXECBUF_EOS) {
00268             PLcsResidualColumnFilters filter = filters[iFilterToInitialize];
00269 
00270             while (pInAccessor->demandData()) {
00271                 SharedLcsResidualFilter filterData(new LcsResidualFilter);
00272 
00273                 pInAccessor->accessConsumptionTuple();
00274 
00275                 /*
00276                  * Build lower and upper bound data
00277                  */
00278                 filterData->boundData.compute(pInAccessor->getTupleDesc());
00279                 filterData->boundBuf.reset(
00280                     new FixedBuffer[inputAccessor.getCurrentByteCount()]);
00281 
00282                 memcpy(
00283                     filterData->boundBuf.get(),
00284                     pInAccessor->getConsumptionStart(),
00285                     inputAccessor.getCurrentByteCount());
00286 
00287                 /*
00288                  * inputAccessor is used to unmarshal into boundData.
00289                  * in order to do this, its current buffer is set to
00290                  * boundBuf and restored.
00291                  */
00292                 PConstBuffer tmpBuf;
00293                 tmpBuf = inputAccessor.getCurrentTupleBuf();
00294                 inputAccessor.setCurrentTupleBuf(filterData->boundBuf.get());
00295                 inputAccessor.unmarshal(filterData->boundData);
00296                 inputAccessor.setCurrentTupleBuf(tmpBuf);
00297 
00298                 /*
00299                  * record directives.
00300                  */
00301                 filterData->lowerBoundDirective =
00302                     SearchEndpoint(*filterData->boundData[0].pData);
00303                 filterData->upperBoundDirective =
00304                     SearchEndpoint(*filterData->boundData[2].pData);
00305 
00306                 filter->filterData.push_back(filterData);
00307 
00308                 pInAccessor->consumeTuple();
00309             }
00310 
00311             if (pInAccessor->getState() != EXECBUF_EOS) {
00312                 return false;
00313             }
00314         }
00315         filters[iFilterToInitialize]->filterDataInitialized = true;
00316     }
00317     return true;
00318 }
00319 
00320 
00321 void LcsRowScanExecStream::initializeSystemSampling()
00322 {
00323     clumpPos = 0;
00324     clumpSkipPos = 0;
00325 
00326     FENNEL_TRACE(TRACE_FINE, "rowCount = " << rowCount);
00327     FENNEL_TRACE(
00328         TRACE_FINE, "samplingRate = " << static_cast<double>(samplingRate));
00329 
00330     if (rowCount <= 0) {
00331         // Handle empty table or non-sense input.
00332         clumpSize = 1;
00333         clumpDistance = 0;
00334         numClumps = 0;
00335         return;
00336     }
00337 
00338     // Manipulate this value in a separate member field so we don't
00339     // mistakenly modify our stored copy of the parameter.
00340     numClumps = samplingClumps;
00341 
00342     // Compute clump size and distance
00343     int64_t sampleSize =
00344         static_cast<uint64_t>(
00345             round(
00346                 static_cast<double>(rowCount) *
00347                 static_cast<double>(samplingRate)));
00348     if (sampleSize < numClumps) {
00349         // Read at least as many rows as there are clumps, even if sample rate
00350         // is very small.
00351         sampleSize = numClumps;
00352     }
00353 
00354     if (sampleSize > rowCount) {
00355         // samplingRate should be < 1.0, but handle the case where it isn't,
00356         // or where there are fewer rows than clumps.
00357         sampleSize = rowCount;
00358         numClumps = 1;
00359     }
00360 
00361     FENNEL_TRACE(TRACE_FINE, "sampleSize = " << sampleSize);
00362 
00363     clumpSize =
00364         static_cast<uint64_t>(
00365             round(
00366                 static_cast<double>(sampleSize) /
00367                 static_cast<double>(numClumps)));
00368     assert(sampleSize >= clumpSize);
00369     assert(clumpSize >= 1);
00370 
00371     FENNEL_TRACE(TRACE_FINE, "clumpSize = " << clumpSize);
00372 
00373     if (numClumps > 1) {
00374         // Arrange for the last clump to end at the end of the table.
00375         clumpDistance =
00376             static_cast<uint64_t>(
00377                 round(
00378                     static_cast<double>(rowCount - sampleSize) /
00379                     static_cast<double>(numClumps - 1)));
00380 
00381         // Rounding can cause us to push the final clump past the end of the
00382         // table.  Avoid this when possible.
00383         uint64_t rowsRequired =
00384             (clumpSize + clumpDistance) * (numClumps - 1) + clumpSize;
00385         if (rowsRequired > rowCount && clumpDistance > 0) {
00386             clumpDistance--;
00387         }
00388     } else {
00389         // The entire sample will come from the beginning of the table.
00390         clumpDistance = (rowCount - sampleSize);
00391     }
00392 
00393     FENNEL_TRACE(TRACE_FINE, "clumpDistance = " << clumpDistance);
00394 }
00395 
00396 
00397 ExecStreamResult LcsRowScanExecStream::execute(ExecStreamQuantum const &quantum)
00398 {
00399     if (!initializeFiltersIfNeeded()) {
00400         return EXECRC_BUF_UNDERFLOW;
00401     }
00402 
00403     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00404         uint iClu;
00405         bool passedFilter;
00406 
00407         while (!producePending) {
00408             // No need to fill the rid run buffer each time through the loop
00409             if (!ridRunsBuilt && ridRuns.nFreeSpace() > 100) {
00410                 ExecStreamResult rc = fillRidRunBuffer();
00411                 if (rc != EXECRC_YIELD) {
00412                     return rc;
00413                 }
00414             }
00415 
00416             // Determine the rid that needs to be fetched based on the
00417             // contents of the rid run buffer.
00418             LcsRid rid =
00419                 LcsClusterReader::getFetchRids(ridRunIter, nextRid, true);
00420             if (rid == LcsRid(MAXU)) {
00421                 assert(ridRunIter.done());
00422                 pOutAccessor->markEOS();
00423                 return EXECRC_EOS;
00424             }
00425 
00426             uint prevClusterEnd = 0;
00427             // reset datum pointers, in case previous tuple had nulls
00428             outputTupleData.resetBuffer();
00429 
00430             // Read the non-cluster columns first
00431             for (uint j = 0; j < nonClusterCols.size(); j++) {
00432                 if (nonClusterCols[j] == LCS_RID_COLUMN_ID) {
00433                     memcpy(
00434                         const_cast<PBuffer>(outputTupleData[projMap[j]].pData),
00435                         (PBuffer) &rid, sizeof(LcsRid));
00436                     prevClusterEnd++;
00437                 } else {
00438                     permAssert(false);
00439                 }
00440             }
00441 
00442             // Then go through each cluster, forming rows and checking ranges
00443             for (iClu = 0, passedFilter = true; iClu <  nClusters; iClu++) {
00444                 SharedLcsClusterReader &pScan = pClusters[iClu];
00445 
00446                 // Resync the cluster reader to the current rid position
00447                 pScan->catchUp(ridRunIter.getCurrPos(), nextRid);
00448 
00449                 // if we have not read a batch yet or we've reached the
00450                 // end of a batch, position to the rid we want to read
00451 
00452                 if (!pScan->isPositioned() || rid >= pScan->getRangeEndRid()) {
00453                     bool rc = pScan->position(rid);
00454 
00455                     // rid not found, so just consume the rid and
00456                     // continue
00457                     if (rc == false)
00458                         break;
00459 
00460                     assert(rid >= pScan->getRangeStartRid()
00461                            && rid < pScan->getRangeEndRid());
00462 
00463                     // Tell all column scans that the batch has changed.
00464                     syncColumns(pScan);
00465                 } else {
00466                     // Should not have moved into previous batch.
00467                     assert(rid > pScan->getRangeStartRid());
00468 
00469                     // move to correct position within scan; we know we
00470                     // will not fall off end of batch, so use non-checking
00471                     // function (for speed)
00472                     pScan->advanceWithinBatch(
00473                         opaqueToInt(rid - pScan->getCurrentRid()));
00474                 }
00475 
00476                 passedFilter =
00477                     readColVals(
00478                         pScan,
00479                         outputTupleData,
00480                         prevClusterEnd);
00481                 if (!passedFilter) {
00482                     break;
00483                 }
00484                 prevClusterEnd += pScan->nColsToRead;
00485             }
00486 
00487             if (!passedFilter) {
00488                 continue;
00489             }
00490             if (iClu == nClusters) {
00491                 tupleFound = true;
00492             }
00493             producePending = true;
00494         }
00495 
00496         // produce tuple
00497         projOutputTupleData.projectFrom(outputTupleData, outputProj);
00498         if (tupleFound) {
00499             if (!pOutAccessor->produceTuple(projOutputTupleData)) {
00500                 return EXECRC_BUF_OVERFLOW;
00501             }
00502         }
00503         producePending = false;
00504 
00505         if (isFullScan) {
00506             // if tuple not found, reached end of table
00507             if (!tupleFound) {
00508                 pOutAccessor->markEOS();
00509                 return EXECRC_EOS;
00510             }
00511         }
00512 
00513         tupleFound = false;
00514         nRidsRead++;
00515     }
00516 
00517     return EXECRC_QUANTUM_EXPIRED;
00518 }
00519 
00520 ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer()
00521 {
00522     ExecStreamResult rc;
00523     RecordNum nRows;
00524 
00525     do {
00526         if (!isFullScan) {
00527             rc = ridReader.readRidAndAdvance(inputRid);
00528             if (rc == EXECRC_EOS) {
00529                 ridRunsBuilt = true;
00530                 break;
00531             }
00532             if (rc != EXECRC_YIELD) {
00533                 return rc;
00534             }
00535             nRows = 1;
00536 
00537         } else {
00538             if (!deletedRidEos && readDeletedRid) {
00539                 rc = ridReader.readRidAndAdvance(deletedRid);
00540                 if (rc == EXECRC_EOS) {
00541                     deletedRidEos = true;
00542                     if (samplingMode == SAMPLING_OFF) {
00543                         ridRunsBuilt = true;
00544                     } else if (samplingMode == SAMPLING_SYSTEM &&
00545                         numClumps == 0)
00546                     {
00547                         ridRunsBuilt = true;
00548                         break;
00549                     }
00550                 } else if (rc != EXECRC_YIELD) {
00551                     return rc;
00552                 } else {
00553                     readDeletedRid = false;
00554                 }
00555             }
00556             // skip over deleted rids
00557             if (!deletedRidEos && inputRid == deletedRid) {
00558                 inputRid++;
00559                 readDeletedRid = true;
00560                 continue;
00561             } else {
00562                 if (deletedRidEos) {
00563                     nRows = MAXU;
00564                 } else {
00565                     nRows = opaqueToInt(deletedRid - inputRid);
00566                 }
00567             }
00568         }
00569 
00570         if (samplingMode != SAMPLING_OFF) {
00571             if (samplingMode == SAMPLING_SYSTEM) {
00572                 if (clumpSkipPos > 0) {
00573                     // We need to skip clumpSkipPos RIDs, taking into
00574                     // account deleted RIDs.  If all deleted RIDs have been
00575                     // processed (a), we can just skip forward to the next
00576                     // clump.  If we know the next deleted RID, skip to the
00577                     // next clump if we can (b), else skip to the deleted
00578                     // RID (c).  Processing will return here to handle the
00579                     // remaining clumpSkipPos rows when we reach the next
00580                     // live RID.  If we don't know the next deleted RID
00581                     // (d), skip the current live RID, let the deleted RID
00582                     // processing occur above and then processing will
00583                     // return here to deal with the remaining clumpSkipPos
00584                     // rows.
00585                     if (deletedRidEos) {
00586                         // (a)
00587                         inputRid += clumpSkipPos;
00588                         clumpSkipPos = 0;
00589                     } else if (!readDeletedRid) {
00590                         if (deletedRid > inputRid + clumpSkipPos) {
00591                             // (b)
00592                             inputRid += clumpSkipPos;
00593                             clumpSkipPos = 0;
00594                             nRows = opaqueToInt(deletedRid - inputRid);
00595                         } else {
00596                             // (c)
00597                             clumpSkipPos -= opaqueToInt(deletedRid - inputRid);
00598                             inputRid = deletedRid;
00599                             continue;
00600                         }
00601                     } else {
00602                         // (d)
00603                         clumpSkipPos--;
00604                         inputRid++;
00605                         continue;
00606                     }
00607                 }
00608 
00609                 if (nRows >= clumpSize - clumpPos) {
00610                     // Scale back the size of the rid run based on the
00611                     // clump size
00612                     nRows = clumpSize - clumpPos;
00613                     clumpPos = 0;
00614                     clumpSkipPos = clumpDistance;
00615                     if (++numClumpsBuilt == numClumps) {
00616                         ridRunsBuilt = true;
00617                     }
00618                 } else {
00619                     // We only have enough rids for a partial clump
00620                     clumpPos += nRows;
00621                 }
00622             } else {
00623                 // Bernoulli sampling
00624                 if (opaqueToInt(inputRid) >= opaqueToInt(rowCount)) {
00625                     ridRunsBuilt = true;
00626                     break;
00627                 }
00628                 if (!samplingRng->nextValue()) {
00629                     inputRid++;
00630                     continue;
00631                 }
00632                 nRows = 1;
00633             }
00634         }
00635 
00636         if (currRidRun.startRid == LcsRid(MAXU)) {
00637             currRidRun.startRid = inputRid;
00638             currRidRun.nRids = nRows;
00639         } else if (currRidRun.startRid + currRidRun.nRids == inputRid) {
00640             // If the next set of rids is contiguous with the previous,
00641             // continue adding on to the current run
00642             if (nRows == RecordNum(MAXU)) {
00643                 currRidRun.nRids = MAXU;
00644             } else {
00645                 currRidRun.nRids += nRows;
00646             }
00647         } else {
00648             // Otherwise, end the current one
00649             ridRuns.push_back(currRidRun);
00650 
00651             // And start a new one
00652             currRidRun.startRid = inputRid;
00653             currRidRun.nRids = nRows;
00654         }
00655 
00656         if (isFullScan) {
00657             inputRid += nRows;
00658         }
00659     } while (ridRuns.spaceAvailable() && !ridRunsBuilt);
00660 
00661     // Write out the last run
00662     if (ridRunsBuilt && currRidRun.startRid != LcsRid(MAXU)) {
00663         ridRuns.push_back(currRidRun);
00664     }
00665 
00666     if (ridRunsBuilt) {
00667         ridRuns.setReadOnly();
00668     }
00669     return EXECRC_YIELD;
00670 }
00671 
00672 void LcsRowScanExecStream::closeImpl()
00673 {
00674     LcsRowScanBaseExecStream::closeImpl();
00675 
00676     for (uint i = 0; i < nFilters; i++) {
00677         filters[i]->filterData.clear();
00678     }
00679 }
00680 
00681 void LcsRowScanExecStream::buildOutputProj(
00682     TupleProjection &outputProj,
00683     LcsRowScanBaseExecStreamParams const &params)
00684 {
00685     LcsRowScanExecStreamParams const &rowScanParams =
00686         dynamic_cast<const LcsRowScanExecStreamParams&>(params);
00687 
00688     /*
00689      * Build a projection that contains filter columns
00690      */
00691     for (uint i = 0; i < rowScanParams.outputProj.size(); i++) {
00692         outputProj.push_back(rowScanParams.outputProj[i]);
00693     }
00694     for (uint i = 0; i < rowScanParams.residualFilterCols.size(); i++) {
00695         uint j;
00696         for (j = 0; j < rowScanParams.outputProj.size(); j++) {
00697             if (rowScanParams.outputProj[j] ==
00698                 rowScanParams.residualFilterCols[i])
00699             {
00700                 break;
00701             }
00702         }
00703 
00704         if (j >= rowScanParams.outputProj.size()) {
00705             outputProj.push_back(rowScanParams.residualFilterCols[i]);
00706         }
00707     }
00708 }
00709 
00710 
00711 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $");
00712 
00713 // End LcsRowScanExecStream.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1