00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/tuple/StandardTypeDescriptor.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ReshapeExecStream.h"
00028
00029 #include <hash_set>
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $");
00032
00033 void ReshapeExecStream::prepare(ReshapeExecStreamParams const ¶ms)
00034 {
00035 ConduitExecStream::prepare(params);
00036
00037 TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc();
00038 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00039 dynamicParameters.assign(
00040 params.dynamicParameters.begin(),
00041 params.dynamicParameters.end());
00042
00043 compOp = params.compOp;
00044 if (compOp != COMP_NOOP) {
00045 initCompareData(params, inputDesc, inputAccessor);
00046 }
00047
00048
00049 outputProjAccessor.bind(inputAccessor, params.outputProj);
00050 inputOutputDesc.projectFrom(inputDesc, params.outputProj);
00051
00052
00053 outputDesc = pOutAccessor->getTupleDesc();
00054 outputData.compute(outputDesc);
00055
00056
00057
00058 uint numOutputDynParams = 0;
00059 for (uint i = 0; i < dynamicParameters.size(); i++) {
00060 if (dynamicParameters[i].outputParam) {
00061 numOutputDynParams++;
00062 }
00063 }
00064
00065
00066
00067 assert(inputOutputDesc.size() == outputDesc.size() - numOutputDynParams);
00068 TupleDescriptor partialOutputDesc;
00069 if (numOutputDynParams == 0) {
00070 partialOutputDesc = outputDesc;
00071 } else if (inputOutputDesc.size() > 0) {
00072 partialOutputDesc.resize(inputOutputDesc.size());
00073 std::copy(
00074 outputDesc.begin(),
00075 outputDesc.end() - numOutputDynParams,
00076 partialOutputDesc.begin());
00077 }
00078
00079
00080 castRequired = (inputOutputDesc != partialOutputDesc);
00081 if (castRequired) {
00082 TupleProjection proj;
00083 if (compOp == COMP_NE) {
00084 proj = params.inputCompareProj;
00085 }
00086 assert(checkCastTypes(proj, inputOutputDesc, partialOutputDesc));
00087 inputOutputData.compute(inputOutputDesc);
00088 }
00089 }
00090
00091 void ReshapeExecStream::initCompareData(
00092 ReshapeExecStreamParams const ¶ms,
00093 TupleDescriptor const &inputDesc,
00094 TupleAccessor const &inputAccessor)
00095 {
00096
00097 assert(params.inputCompareProj.size() > 0);
00098 TupleProjection inputCompareProj = params.inputCompareProj;
00099 compTupleDesc.projectFrom(inputDesc, inputCompareProj);
00100
00101 for (uint i = 0; i < compTupleDesc.size(); i++) {
00102 compTupleDesc[i].isNullable = true;
00103 }
00104
00105
00106 inputCompareProjAccessor.bind(inputAccessor, inputCompareProj);
00107 inputCompareData.compute(compTupleDesc);
00108
00109
00110
00111
00112 TupleDescriptor partialCompTupleDesc;
00113 numCompDynParams = 0;
00114 for (uint i = 0; i < dynamicParameters.size(); i++) {
00115 if (!isMAXU(dynamicParameters[i].compareOffset)) {
00116 numCompDynParams++;
00117 }
00118 }
00119 if (numCompDynParams > 0) {
00120 partialCompTupleDesc.resize(compTupleDesc.size() - numCompDynParams);
00121 std::copy(
00122 compTupleDesc.begin(),
00123 compTupleDesc.end() - numCompDynParams,
00124 partialCompTupleDesc.begin());
00125 }
00126
00127 paramCompareData.compute(compTupleDesc);
00128 if (numCompDynParams == 0) {
00129 copyCompareTuple(
00130 compTupleDesc,
00131 paramCompareData,
00132 params.pCompTupleBuffer.get());
00133 } else if (partialCompTupleDesc.size() > 0) {
00134 TupleData partialCompareData;
00135 partialCompareData.compute(partialCompTupleDesc);
00136 copyCompareTuple(
00137 partialCompTupleDesc,
00138 partialCompareData,
00139 params.pCompTupleBuffer.get());
00140
00141
00142
00143 std::copy(
00144 partialCompareData.begin(),
00145 partialCompareData.end(),
00146 paramCompareData.begin());
00147 }
00148
00149
00150
00151 lastKey.push_back(paramCompareData.size() - 1);
00152 lastKeyDesc.projectFrom(compTupleDesc, lastKey);
00153 }
00154
00155 void ReshapeExecStream::copyCompareTuple(
00156 TupleDescriptor const &tupleDesc,
00157 TupleData &tupleData,
00158 PBuffer tupleBuffer)
00159 {
00160 TupleAccessor tupleAccessor;
00161 tupleAccessor.compute(tupleDesc);
00162 tupleAccessor.setCurrentTupleBuf(tupleBuffer);
00163 uint nBytes = tupleAccessor.getCurrentByteCount();
00164 compTupleBuffer.reset(new FixedBuffer[nBytes]);
00165 memcpy(compTupleBuffer.get(), tupleBuffer, nBytes);
00166 tupleAccessor.setCurrentTupleBuf(compTupleBuffer.get());
00167 tupleAccessor.unmarshal(tupleData);
00168 }
00169
00170 bool ReshapeExecStream::checkCastTypes(
00171 const TupleProjection &compareProj,
00172 const TupleDescriptor &inputTupleDesc,
00173 const TupleDescriptor &outputTupleDesc)
00174 {
00175 for (uint i = 0; i < inputTupleDesc.size(); i++) {
00176 if (!(inputTupleDesc[i] == outputTupleDesc[i])) {
00177
00178
00179 if (inputTupleDesc[i].isNullable &&
00180 !outputTupleDesc[i].isNullable)
00181 {
00182 assert(nullFilter(compareProj, i));
00183 } else {
00184 assert(
00185 (inputTupleDesc[i].isNullable ==
00186 outputTupleDesc[i].isNullable)
00187 || (!inputTupleDesc[i].isNullable
00188 && outputTupleDesc[i].isNullable));
00189 }
00190 StoredTypeDescriptor::Ordinal inputType =
00191 inputTupleDesc[i].pTypeDescriptor->getOrdinal();
00192 StoredTypeDescriptor::Ordinal outputType =
00193 outputTupleDesc[i].pTypeDescriptor->getOrdinal();
00194
00195
00196
00197
00198 bool inputUnicode = false;
00199 if (inputType == STANDARD_TYPE_UNICODE_CHAR) {
00200 inputType = STANDARD_TYPE_CHAR;
00201 inputUnicode = true;
00202 }
00203 if (inputType == STANDARD_TYPE_UNICODE_VARCHAR) {
00204 inputType = STANDARD_TYPE_VARCHAR;
00205 inputUnicode = true;
00206 }
00207
00208 bool outputUnicode = false;
00209 if (outputType == STANDARD_TYPE_UNICODE_CHAR) {
00210 outputType = STANDARD_TYPE_CHAR;
00211 outputUnicode = true;
00212 }
00213 if (outputType == STANDARD_TYPE_UNICODE_VARCHAR) {
00214 outputType = STANDARD_TYPE_VARCHAR;
00215 outputUnicode = true;
00216 }
00217
00218 if (inputUnicode || outputUnicode) {
00219 assert(inputUnicode && outputUnicode);
00220 }
00221
00222 if (inputType != outputType) {
00223
00224
00225 assert(
00226 (inputType == STANDARD_TYPE_CHAR)
00227 && (outputType == STANDARD_TYPE_VARCHAR));
00228 }
00229 if (inputTupleDesc[i].cbStorage != outputTupleDesc[i].cbStorage) {
00230
00231
00232 assert(
00233 ((inputType == STANDARD_TYPE_VARCHAR)
00234 || (inputType == STANDARD_TYPE_CHAR))
00235 && (outputType == STANDARD_TYPE_VARCHAR));
00236 }
00237 }
00238 }
00239 return true;
00240 }
00241
00242 bool ReshapeExecStream::nullFilter(
00243 const TupleProjection &compareProj, uint colno)
00244 {
00245 for (uint i = 0; i < compareProj.size(); i++) {
00246 if (compareProj[i] == colno) {
00247 if (!paramCompareData[i].pData) {
00248 return true;
00249 } else {
00250 break;
00251 }
00252 }
00253 }
00254 return false;
00255 }
00256
00257 void ReshapeExecStream::open(bool restart)
00258 {
00259 ConduitExecStream::open(restart);
00260 producePending = false;
00261 paramsRead = false;
00262 }
00263
00264 ExecStreamResult ReshapeExecStream::execute(
00265 ExecStreamQuantum const &quantum)
00266 {
00267 if (!paramsRead) {
00268 readDynamicParams();
00269 paramsRead = true;
00270 }
00271
00272 ExecStreamResult rc = precheckConduitBuffers();
00273 if (rc != EXECRC_YIELD) {
00274 return rc;
00275 }
00276
00277 if (producePending) {
00278 if (!pOutAccessor->produceTuple(outputData)) {
00279 return EXECRC_BUF_OVERFLOW;
00280 }
00281 pInAccessor->consumeTuple();
00282 producePending = false;
00283 }
00284
00285 for (uint i = 0; i < quantum.nTuplesMax; i++) {
00286 if (!pInAccessor->demandData()) {
00287 return EXECRC_BUF_UNDERFLOW;
00288 }
00289
00290 pInAccessor->accessConsumptionTuple();
00291
00292
00293 if (compOp != COMP_NOOP) {
00294 bool pass = compareInput();
00295 if (!pass) {
00296 pInAccessor->consumeTuple();
00297 continue;
00298 }
00299 }
00300
00301 if (castRequired) {
00302 castOutput();
00303 } else {
00304 outputProjAccessor.unmarshal(outputData);
00305 }
00306 producePending = true;
00307 if (!pOutAccessor->produceTuple(outputData)) {
00308 return EXECRC_BUF_OVERFLOW;
00309 }
00310 producePending = false;
00311 pInAccessor->consumeTuple();
00312 }
00313
00314 return EXECRC_QUANTUM_EXPIRED;
00315 }
00316
00317 void ReshapeExecStream::readDynamicParams()
00318 {
00319 uint currCompIdx = paramCompareData.size() - numCompDynParams;
00320 uint currOutputIdx = inputOutputDesc.size();
00321 for (uint i = 0; i < dynamicParameters.size(); i++) {
00322 if (!isMAXU(dynamicParameters[i].compareOffset)) {
00323 TupleDatum const ¶m =
00324 pDynamicParamManager->getParam(
00325 dynamicParameters[i].dynamicParamId).getDatum();
00326 paramCompareData[currCompIdx++] = param;
00327 }
00328 if (dynamicParameters[i].outputParam) {
00329 TupleDatum const ¶m =
00330 pDynamicParamManager->getParam(
00331 dynamicParameters[i].dynamicParamId).getDatum();
00332 outputData[currOutputIdx++] = param;
00333 }
00334 }
00335 }
00336
00337 bool ReshapeExecStream::compareInput()
00338 {
00339 inputCompareProjAccessor.unmarshal(inputCompareData);
00340 int rc;
00341
00342
00343
00344
00345 if (compOp == COMP_EQ) {
00346 rc = compTupleDesc.compareTuples(inputCompareData, paramCompareData);
00347 } else {
00348 rc =
00349 compTupleDesc.compareTuplesKey(
00350 inputCompareData, paramCompareData,
00351 paramCompareData.size() - 1);
00352 if (rc != 0) {
00353 return false;
00354 }
00355
00356 if (!paramCompareData[paramCompareData.size() - 1].pData) {
00357 rc =
00358 lastKeyDesc.compareTuples(
00359 inputCompareData, lastKey, paramCompareData, lastKey);
00360 } else {
00361 bool containsNullKey;
00362 rc =
00363 lastKeyDesc.compareTuples(
00364 inputCompareData, lastKey, paramCompareData, lastKey,
00365 &containsNullKey);
00366 if (containsNullKey) {
00367 return false;
00368 }
00369 }
00370 }
00371
00372 bool pass;
00373 switch (compOp) {
00374 case COMP_EQ:
00375 pass = (rc == 0);
00376 break;
00377 case COMP_NE:
00378 pass = (rc != 0);
00379 break;
00380 case COMP_LT:
00381 pass = (rc < 0);
00382 break;
00383 case COMP_LE:
00384 pass = (rc <= 0);
00385 break;
00386 case COMP_GT:
00387 pass = (rc > 0);
00388 break;
00389 case COMP_GE:
00390 pass = (rc >= 0);
00391 break;
00392 default:
00393 pass = false;
00394 permAssert(false);
00395 }
00396 return pass;
00397 }
00398
00399 void ReshapeExecStream::castOutput()
00400 {
00401 outputProjAccessor.unmarshal(inputOutputData);
00402 for (uint i = 0; i < inputOutputData.size(); i++) {
00403
00404 uint len = std::min(
00405 inputOutputData[i].cbData, outputDesc[i].cbStorage);
00406 outputData[i].cbData = len;
00407 if (inputOutputData[i].pData) {
00408 outputData[i].pData = inputOutputData[i].pData;
00409 } else {
00410 outputData[i].pData = NULL;
00411 }
00412 }
00413 }
00414
00415 void ReshapeExecStream::closeImpl()
00416 {
00417 ConduitExecStream::closeImpl();
00418 }
00419
00420 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $");
00421
00422