ReshapeExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/tuple/StandardTypeDescriptor.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ReshapeExecStream.h"
00028 
00029 #include <hash_set>
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $");
00032 
00033 void ReshapeExecStream::prepare(ReshapeExecStreamParams const &params)
00034 {
00035     ConduitExecStream::prepare(params);
00036 
00037     TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc();
00038     TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00039     dynamicParameters.assign(
00040         params.dynamicParameters.begin(),
00041         params.dynamicParameters.end());
00042 
00043     compOp = params.compOp;
00044     if (compOp != COMP_NOOP) {
00045         initCompareData(params, inputDesc, inputAccessor);
00046     }
00047 
00048     // Setup the output projection that projects from the input tuple
00049     outputProjAccessor.bind(inputAccessor, params.outputProj);
00050     inputOutputDesc.projectFrom(inputDesc, params.outputProj);
00051 
00052     // Setup the output descriptor and data
00053     outputDesc = pOutAccessor->getTupleDesc();
00054     outputData.compute(outputDesc);
00055 
00056     // Determine how many of the dynamic parameters need to be written into
00057     // the output tuple
00058     uint numOutputDynParams = 0;
00059     for (uint i = 0; i < dynamicParameters.size(); i++) {
00060         if (dynamicParameters[i].outputParam) {
00061             numOutputDynParams++;
00062         }
00063     }
00064 
00065     // Setup the output descriptor and data that excludes dynamic parameters
00066     // in the output tuple
00067     assert(inputOutputDesc.size() == outputDesc.size() - numOutputDynParams);
00068     TupleDescriptor partialOutputDesc;
00069     if (numOutputDynParams == 0) {
00070         partialOutputDesc = outputDesc;
00071     } else if (inputOutputDesc.size() > 0) {
00072         partialOutputDesc.resize(inputOutputDesc.size());
00073         std::copy(
00074             outputDesc.begin(),
00075             outputDesc.end() - numOutputDynParams,
00076             partialOutputDesc.begin());
00077     }
00078 
00079     // determine if simple casting is required
00080     castRequired = (inputOutputDesc != partialOutputDesc);
00081     if (castRequired) {
00082         TupleProjection proj;
00083         if (compOp == COMP_NE) {
00084             proj = params.inputCompareProj;
00085         }
00086         assert(checkCastTypes(proj, inputOutputDesc, partialOutputDesc));
00087         inputOutputData.compute(inputOutputDesc);
00088     }
00089 }
00090 
00091 void ReshapeExecStream::initCompareData(
00092     ReshapeExecStreamParams const &params,
00093     TupleDescriptor const &inputDesc,
00094     TupleAccessor const &inputAccessor)
00095 {
00096     // Setup the comparison tuple descriptor
00097     assert(params.inputCompareProj.size() > 0);
00098     TupleProjection inputCompareProj = params.inputCompareProj;
00099     compTupleDesc.projectFrom(inputDesc, inputCompareProj);
00100     // Adjust the descriptor to allow NULLs in case we're filtering out NULLs
00101     for (uint i = 0; i < compTupleDesc.size(); i++) {
00102         compTupleDesc[i].isNullable = true;
00103     }
00104 
00105     // Setup the projection of the columns for comparison
00106     inputCompareProjAccessor.bind(inputAccessor, inputCompareProj);
00107     inputCompareData.compute(compTupleDesc);
00108 
00109     // Setup a descriptor that excludes the dynamic parameters that will
00110     // be used in comparisons, if there are dynamic parameters.  The dynamic
00111     // parameters appear at the end of the descriptor.
00112     TupleDescriptor partialCompTupleDesc;
00113     numCompDynParams = 0;
00114     for (uint i = 0; i < dynamicParameters.size(); i++) {
00115         if (!isMAXU(dynamicParameters[i].compareOffset)) {
00116             numCompDynParams++;
00117         }
00118     }
00119     if (numCompDynParams > 0) {
00120         partialCompTupleDesc.resize(compTupleDesc.size() - numCompDynParams);
00121         std::copy(
00122             compTupleDesc.begin(),
00123             compTupleDesc.end() - numCompDynParams,
00124             partialCompTupleDesc.begin());
00125     }
00126 
00127     paramCompareData.compute(compTupleDesc);
00128     if (numCompDynParams == 0) {
00129         copyCompareTuple(
00130             compTupleDesc,
00131             paramCompareData,
00132             params.pCompTupleBuffer.get());
00133     } else if (partialCompTupleDesc.size() > 0) {
00134         TupleData partialCompareData;
00135         partialCompareData.compute(partialCompTupleDesc);
00136         copyCompareTuple(
00137             partialCompTupleDesc,
00138             partialCompareData,
00139             params.pCompTupleBuffer.get());
00140 
00141         // Copy the partial tuple data to the tuple data that will
00142         // be used in the actual comparisons
00143         std::copy(
00144             partialCompareData.begin(),
00145             partialCompareData.end(),
00146             paramCompareData.begin());
00147     }
00148 
00149     // Setup a tuple projection to project the last key for use in
00150     // non-equality comparisons
00151     lastKey.push_back(paramCompareData.size() - 1);
00152     lastKeyDesc.projectFrom(compTupleDesc, lastKey);
00153 }
00154 
00155 void ReshapeExecStream::copyCompareTuple(
00156     TupleDescriptor const &tupleDesc,
00157     TupleData &tupleData,
00158     PBuffer tupleBuffer)
00159 {
00160     TupleAccessor tupleAccessor;
00161     tupleAccessor.compute(tupleDesc);
00162     tupleAccessor.setCurrentTupleBuf(tupleBuffer);
00163     uint nBytes = tupleAccessor.getCurrentByteCount();
00164     compTupleBuffer.reset(new FixedBuffer[nBytes]);
00165     memcpy(compTupleBuffer.get(), tupleBuffer, nBytes);
00166     tupleAccessor.setCurrentTupleBuf(compTupleBuffer.get());
00167     tupleAccessor.unmarshal(tupleData);
00168 }
00169 
00170 bool ReshapeExecStream::checkCastTypes(
00171     const TupleProjection &compareProj,
00172     const TupleDescriptor &inputTupleDesc,
00173     const TupleDescriptor &outputTupleDesc)
00174 {
00175     for (uint i = 0; i < inputTupleDesc.size(); i++) {
00176         if (!(inputTupleDesc[i] == outputTupleDesc[i])) {
00177             // only allow not nullable -> nullable, unless nulls are being
00178             // filtered out from that column
00179             if (inputTupleDesc[i].isNullable &&
00180                 !outputTupleDesc[i].isNullable)
00181             {
00182                 assert(nullFilter(compareProj, i));
00183             } else {
00184                 assert(
00185                     (inputTupleDesc[i].isNullable ==
00186                         outputTupleDesc[i].isNullable)
00187                     || (!inputTupleDesc[i].isNullable
00188                         && outputTupleDesc[i].isNullable));
00189             }
00190             StoredTypeDescriptor::Ordinal inputType =
00191                 inputTupleDesc[i].pTypeDescriptor->getOrdinal();
00192             StoredTypeDescriptor::Ordinal outputType =
00193                 outputTupleDesc[i].pTypeDescriptor->getOrdinal();
00194 
00195             // can't convert between unicode and non-unicode;
00196             // normalize types to non-unicode to make other checks
00197             // easier, but verify that either both or neither are unicode
00198             bool inputUnicode = false;
00199             if (inputType == STANDARD_TYPE_UNICODE_CHAR) {
00200                 inputType = STANDARD_TYPE_CHAR;
00201                 inputUnicode = true;
00202             }
00203             if (inputType == STANDARD_TYPE_UNICODE_VARCHAR) {
00204                 inputType = STANDARD_TYPE_VARCHAR;
00205                 inputUnicode = true;
00206             }
00207 
00208             bool outputUnicode = false;
00209             if (outputType == STANDARD_TYPE_UNICODE_CHAR) {
00210                 outputType = STANDARD_TYPE_CHAR;
00211                 outputUnicode = true;
00212             }
00213             if (outputType == STANDARD_TYPE_UNICODE_VARCHAR) {
00214                 outputType = STANDARD_TYPE_VARCHAR;
00215                 outputUnicode = true;
00216             }
00217 
00218             if (inputUnicode || outputUnicode) {
00219                 assert(inputUnicode && outputUnicode);
00220             }
00221 
00222             if (inputType != outputType) {
00223                 // if types are different, must be casting from char to
00224                 // varchar
00225                 assert(
00226                     (inputType == STANDARD_TYPE_CHAR)
00227                     && (outputType == STANDARD_TYPE_VARCHAR));
00228             }
00229             if (inputTupleDesc[i].cbStorage != outputTupleDesc[i].cbStorage) {
00230                 // if lengths are different, must be casting from char or
00231                 // varchar to varchar
00232                 assert(
00233                     ((inputType == STANDARD_TYPE_VARCHAR)
00234                         || (inputType == STANDARD_TYPE_CHAR))
00235                     && (outputType == STANDARD_TYPE_VARCHAR));
00236             }
00237         }
00238     }
00239     return true;
00240 }
00241 
00242 bool ReshapeExecStream::nullFilter(
00243     const TupleProjection &compareProj, uint colno)
00244 {
00245     for (uint i = 0; i < compareProj.size(); i++) {
00246         if (compareProj[i] == colno) {
00247             if (!paramCompareData[i].pData) {
00248                 return true;
00249             } else {
00250                 break;
00251             }
00252         }
00253     }
00254     return false;
00255 }
00256 
00257 void ReshapeExecStream::open(bool restart)
00258 {
00259     ConduitExecStream::open(restart);
00260     producePending = false;
00261     paramsRead = false;
00262 }
00263 
00264 ExecStreamResult ReshapeExecStream::execute(
00265     ExecStreamQuantum const &quantum)
00266 {
00267     if (!paramsRead) {
00268         readDynamicParams();
00269         paramsRead = true;
00270     }
00271 
00272     ExecStreamResult rc = precheckConduitBuffers();
00273     if (rc != EXECRC_YIELD) {
00274         return rc;
00275     }
00276 
00277     if (producePending) {
00278         if (!pOutAccessor->produceTuple(outputData)) {
00279             return EXECRC_BUF_OVERFLOW;
00280         }
00281         pInAccessor->consumeTuple();
00282         producePending = false;
00283     }
00284 
00285     for (uint i = 0; i < quantum.nTuplesMax; i++) {
00286         if (!pInAccessor->demandData()) {
00287             return EXECRC_BUF_UNDERFLOW;
00288         }
00289 
00290         pInAccessor->accessConsumptionTuple();
00291 
00292         // filter the data, if filtering criteria provided
00293         if (compOp != COMP_NOOP) {
00294             bool pass = compareInput();
00295             if (!pass) {
00296                 pInAccessor->consumeTuple();
00297                 continue;
00298             }
00299         }
00300 
00301         if (castRequired) {
00302             castOutput();
00303         } else {
00304             outputProjAccessor.unmarshal(outputData);
00305         }
00306         producePending = true;
00307         if (!pOutAccessor->produceTuple(outputData)) {
00308             return EXECRC_BUF_OVERFLOW;
00309         }
00310         producePending = false;
00311         pInAccessor->consumeTuple();
00312     }
00313 
00314     return EXECRC_QUANTUM_EXPIRED;
00315 }
00316 
00317 void ReshapeExecStream::readDynamicParams()
00318 {
00319     uint currCompIdx = paramCompareData.size() - numCompDynParams;
00320     uint currOutputIdx = inputOutputDesc.size();
00321     for (uint i = 0; i < dynamicParameters.size(); i++) {
00322         if (!isMAXU(dynamicParameters[i].compareOffset)) {
00323             TupleDatum const &param =
00324                 pDynamicParamManager->getParam(
00325                     dynamicParameters[i].dynamicParamId).getDatum();
00326             paramCompareData[currCompIdx++] = param;
00327         }
00328         if (dynamicParameters[i].outputParam) {
00329             TupleDatum const &param =
00330                 pDynamicParamManager->getParam(
00331                     dynamicParameters[i].dynamicParamId).getDatum();
00332             outputData[currOutputIdx++] = param;
00333         }
00334     }
00335 }
00336 
00337 bool ReshapeExecStream::compareInput()
00338 {
00339     inputCompareProjAccessor.unmarshal(inputCompareData);
00340     int rc;
00341 
00342     // if the comparison is non-equality, first compare the first n-1 keys
00343     // for equality; if those keys are equal, then do the non-equality
00344     // comparison on just the last key
00345     if (compOp == COMP_EQ) {
00346         rc = compTupleDesc.compareTuples(inputCompareData, paramCompareData);
00347     } else {
00348         rc =
00349             compTupleDesc.compareTuplesKey(
00350                 inputCompareData, paramCompareData,
00351                 paramCompareData.size() - 1);
00352         if (rc != 0) {
00353             return false;
00354         }
00355         // ignore NULLs if doing a comparison against a non-NULL value
00356         if (!paramCompareData[paramCompareData.size() - 1].pData) {
00357             rc =
00358                 lastKeyDesc.compareTuples(
00359                     inputCompareData, lastKey, paramCompareData, lastKey);
00360         } else {
00361             bool containsNullKey;
00362             rc =
00363                 lastKeyDesc.compareTuples(
00364                     inputCompareData, lastKey, paramCompareData, lastKey,
00365                     &containsNullKey);
00366             if (containsNullKey) {
00367                 return false;
00368             }
00369         }
00370     }
00371 
00372     bool pass;
00373     switch (compOp) {
00374     case COMP_EQ:
00375         pass = (rc == 0);
00376         break;
00377     case COMP_NE:
00378         pass = (rc != 0);
00379         break;
00380     case COMP_LT:
00381         pass = (rc < 0);
00382         break;
00383     case COMP_LE:
00384         pass = (rc <= 0);
00385         break;
00386     case COMP_GT:
00387         pass = (rc > 0);
00388         break;
00389     case COMP_GE:
00390         pass = (rc >= 0);
00391         break;
00392     default:
00393         pass = false;
00394         permAssert(false);
00395     }
00396     return pass;
00397 }
00398 
00399 void ReshapeExecStream::castOutput()
00400 {
00401     outputProjAccessor.unmarshal(inputOutputData);
00402     for (uint i = 0; i < inputOutputData.size(); i++) {
00403         // truncate value if it exceeds the destination size
00404         uint len = std::min(
00405             inputOutputData[i].cbData, outputDesc[i].cbStorage);
00406         outputData[i].cbData = len;
00407         if (inputOutputData[i].pData) {
00408             outputData[i].pData = inputOutputData[i].pData;
00409         } else {
00410             outputData[i].pData = NULL;
00411         }
00412     }
00413 }
00414 
00415 void ReshapeExecStream::closeImpl()
00416 {
00417     ConduitExecStream::closeImpl();
00418 }
00419 
00420 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $");
00421 
00422 // End ReshapeExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1