00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/ftrs/BTreeSearchExecStream.h"
00026 #include "fennel/btree/BTreeReader.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSearchExecStream.cpp#15 $");
00031
00032 void BTreeSearchExecStream::prepare(BTreeSearchExecStreamParams const ¶ms)
00033 {
00034 BTreeReadExecStream::prepare(params);
00035 ConduitExecStream::prepare(params);
00036
00037 leastUpper = true;
00038 outerJoin = params.outerJoin;
00039 searchKeyParams = params.searchKeyParams;
00040
00041
00042
00043 TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc();
00044
00045 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00046
00047 if (params.inputDirectiveProj.size()) {
00048 assert(params.inputDirectiveProj.size() == 2);
00049
00050
00051
00052
00053 assert(params.inputKeyProj.size() > 0);
00054 assert((params.inputKeyProj.size() % 2) == 0);
00055 directiveAccessor.bind(inputAccessor, params.inputDirectiveProj);
00056 TupleDescriptor inputDirectiveDesc;
00057 inputDirectiveDesc.projectFrom(inputDesc, params.inputDirectiveProj);
00058
00059
00060 StandardTypeDescriptorFactory stdTypeFactory;
00061 TupleAttributeDescriptor expectedDirectiveDesc(
00062 stdTypeFactory.newDataType(STANDARD_TYPE_CHAR));
00063 expectedDirectiveDesc.cbStorage = 1;
00064 assert(
00065 inputDirectiveDesc[LOWER_BOUND_DIRECTIVE] == expectedDirectiveDesc);
00066 assert(
00067 inputDirectiveDesc[UPPER_BOUND_DIRECTIVE] == expectedDirectiveDesc);
00068
00069 directiveData.compute(inputDirectiveDesc);
00070 }
00071
00072 if (params.inputKeyProj.size()) {
00073 TupleProjection inputKeyProj = params.inputKeyProj;
00074 if (params.inputDirectiveProj.size()) {
00075
00076
00077 TupleProjection upperBoundProj;
00078 int n = inputKeyProj.size() / 2;
00079
00080 upperBoundProj.resize(n);
00081
00082 std::copy(
00083 inputKeyProj.begin() + n,
00084 inputKeyProj.end(),
00085 upperBoundProj.begin());
00086
00087 inputKeyProj.resize(n);
00088
00089 upperBoundAccessor.bind(inputAccessor, upperBoundProj);
00090 upperBoundDesc.projectFrom(inputDesc, upperBoundProj);
00091 upperBoundData.compute(upperBoundDesc);
00092
00093 assert(
00094 searchKeyParams.size() == 0 ||
00095 (searchKeyParams.size() >= (n-1)*2+1 &&
00096 searchKeyParams.size() <= n*2));
00097 } else {
00098 assert(searchKeyParams.size() == 0);
00099 }
00100 inputKeyAccessor.bind(inputAccessor,inputKeyProj);
00101 inputKeyDesc.projectFrom(inputDesc,inputKeyProj);
00102 } else {
00103 inputKeyDesc = inputDesc;
00104 assert(searchKeyParams.size() == 0);
00105 }
00106 inputKeyData.compute(inputKeyDesc);
00107
00108 if (upperBoundDesc.size()) {
00109
00110
00111 assert(upperBoundDesc == inputKeyDesc);
00112 }
00113
00114 preFilterNulls = false;
00115 if ((outerJoin && inputKeyDesc.containsNullable()) ||
00116 searchKeyParams.size() > 0)
00117 {
00118
00119
00120
00121 preFilterNulls = true;
00122
00123
00124
00125
00126 if (searchKeyParams.size() == 0) {
00127 for (uint i = 0; i < inputKeyData.size(); i++) {
00128 searchKeyProj.push_back(i);
00129 upperBoundKeyProj.push_back(i);
00130 }
00131 }
00132 }
00133
00134 inputJoinAccessor.bind(inputAccessor,params.inputJoinProj);
00135
00136 TupleDescriptor joinDescriptor;
00137 joinDescriptor.projectFrom(inputDesc,params.inputJoinProj);
00138
00139 TupleProjection readerKeyProj = treeDescriptor.keyProjection;
00140 readerKeyProj.resize(inputKeyDesc.size());
00141 readerKeyData.compute(inputKeyDesc);
00142
00143 nJoinAttributes = params.outputTupleDesc.size() - params.outputProj.size();
00144 }
00145
00146 void BTreeSearchExecStream::open(bool restart)
00147 {
00148
00149
00150 if (!restart && opaqueToInt(rootPageIdParamId) > 0) {
00151 treeDescriptor.rootPageId =
00152 *reinterpret_cast<PageId const *>(
00153 pDynamicParamManager->getParam(rootPageIdParamId).getDatum().pData);
00154 }
00155 BTreeReadExecStream::open(restart);
00156 ConduitExecStream::open(restart);
00157 dynamicKeysRead = false;
00158
00159 if (restart) {
00160 return;
00161 }
00162
00163
00164 TupleProjection readerKeyProj = treeDescriptor.keyProjection;
00165 readerKeyProj.resize(inputKeyDesc.size());
00166 readerKeyAccessor.bind(
00167 pReader->getTupleAccessorForRead(),
00168 readerKeyProj);
00169 }
00170
00171 ExecStreamResult BTreeSearchExecStream::execute(
00172 ExecStreamQuantum const &quantum)
00173 {
00174 ExecStreamResult rc = precheckConduitBuffers();
00175 if (rc != EXECRC_YIELD) {
00176 return rc;
00177 }
00178
00179 uint nTuples = 0;
00180 assert(quantum.nTuplesMax > 0);
00181
00182
00183 for (;;) {
00184 if (!innerSearchLoop()) {
00185 return EXECRC_BUF_UNDERFLOW;
00186 }
00187
00188
00189 rc = innerFetchLoop(quantum, nTuples);
00190 if (rc == EXECRC_YIELD) {
00191 pInAccessor->consumeTuple();
00192 } else {
00193 return rc;
00194 }
00195 }
00196 }
00197
00198 bool BTreeSearchExecStream::innerSearchLoop()
00199 {
00200 while (!pReader->isPositioned()) {
00201 if (!pInAccessor->demandData()) {
00202 return false;
00203 }
00204
00205 readSearchKey();
00206 readDirectives();
00207 pSearchKey = &inputKeyData;
00208 readUpperBoundKey();
00209 if (!searchForKey()) {
00210 pInAccessor->consumeTuple();
00211 }
00212 }
00213 return true;
00214 }
00215
00216 void BTreeSearchExecStream::readSearchKey()
00217 {
00218
00219
00220
00221 TupleAccessor &inputAccessor =
00222 pInAccessor->accessConsumptionTuple();
00223
00224 if (searchKeyParams.size() == 0) {
00225 if (inputKeyAccessor.size()) {
00226
00227 inputKeyAccessor.unmarshal(inputKeyData);
00228 } else {
00229
00230 inputAccessor.unmarshal(inputKeyData);
00231 }
00232 } else {
00233
00234
00235 assert(!dynamicKeysRead);
00236
00237
00238
00239
00240
00241 uint nParams = searchKeyParams.size();
00242 searchKeyProj.clear();
00243 for (uint i = 0; i < nParams / 2; i++) {
00244 inputKeyData[searchKeyParams[i].keyOffset] =
00245 pDynamicParamManager->getParam(
00246 searchKeyParams[i].dynamicParamId).getDatum();
00247 searchKeyProj.push_back(i);
00248 }
00249
00250
00251 if ((nParams%2) && searchKeyParams[nParams/2].keyOffset == nParams/2) {
00252 inputKeyData[nParams / 2] =
00253 pDynamicParamManager->getParam(
00254 searchKeyParams[nParams / 2].dynamicParamId).getDatum();
00255
00256
00257
00258 searchKeyProj.push_back(nParams / 2);
00259 }
00260
00261 dynamicKeysRead = true;
00262 }
00263 }
00264
00265 void BTreeSearchExecStream::readDirectives()
00266 {
00267 if (!directiveAccessor.size()) {
00268
00269 lowerBoundDirective = SEARCH_CLOSED_LOWER;
00270 upperBoundDirective = SEARCH_CLOSED_UPPER;
00271 return;
00272 }
00273
00274 directiveAccessor.unmarshal(directiveData);
00275
00276
00277 assert(directiveData[LOWER_BOUND_DIRECTIVE].pData);
00278 assert(directiveData[UPPER_BOUND_DIRECTIVE].pData);
00279
00280 lowerBoundDirective =
00281 SearchEndpoint(*(directiveData[LOWER_BOUND_DIRECTIVE].pData));
00282 upperBoundDirective =
00283 SearchEndpoint(*(directiveData[UPPER_BOUND_DIRECTIVE].pData));
00284 }
00285
// Positions pReader according to lowerBoundDirective and *pSearchKey, then
// decides whether the position found counts as a match.
//
// Returns false only when there is no match and outerJoin is off (the search
// is ended first). Otherwise tupleData has been filled and true is returned.
00286 bool BTreeSearchExecStream::searchForKey()
00287 {
00288 switch (lowerBoundDirective) {
00289 case SEARCH_UNBOUNDED_LOWER:
00290 if (pSearchKey->size() <= 1) {
00291 pReader->searchFirst();
00292 break;
00293 }
00294 // No break here: when the search key has more than one column, control
00295 // falls through and positions with searchForKey(..., DUP_SEEK_BEGIN, ...)
00296 // exactly as the SEARCH_CLOSED_LOWER case does.
00297 case SEARCH_CLOSED_LOWER:
00298 pReader->searchForKey(*pSearchKey, DUP_SEEK_BEGIN, leastUpper);
00299 break;
00300 case SEARCH_OPEN_LOWER:
00301 pReader->searchForKey(*pSearchKey, DUP_SEEK_END, leastUpper);
00302 break;
00303 default:
00304 permFail(
00305 "unexpected lower bound directive: "
00306 << (char) lowerBoundDirective);
00307 }
00308
00309 bool match = true;
00310 if (preFilterNulls && pSearchKey->containsNull(searchKeyProj)) {
00311 // With null pre-filtering on, a null in any search key column listed in
00312 // searchKeyProj means no match. Note the reader has already been
00313 // positioned by the switch above at this point.
00314 match = false;
00315 } else {
00316 if (pReader->isSingular()) {
00317 // The reader reports a singular position: treated as no match.
00318 match = false;
00319 } else {
00320 if (preFilterNulls &&
00321 upperBoundData.containsNull(upperBoundKeyProj))
00322 {
00323 match = false;
00324 } else {
00325 match = testInterval();
00326 }
00327 }
00328 }
00329
00330 if (!match) {
00331 if (!outerJoin) {
00332 pReader->endSearch();
00333 return false;
00334 }
00335 // Outer join, no match: null the data pointer of every column from nJoinAttributes on.
00336 for (uint i = nJoinAttributes; i < tupleData.size(); ++i) {
00337 tupleData[i].pData = NULL;
00338 }
00339 } else {
00340 projAccessor.unmarshal(
00341 tupleData.begin() + nJoinAttributes);
00342 }
00343
00344 // If a join projection is bound, unmarshal it into tupleData as well.
00345 if (inputJoinAccessor.size()) {
00346 inputJoinAccessor.unmarshal(tupleData);
00347 }
00348
00349 return true;
00350 }
00351
// Loads the upper-bound key into upperBoundData.
//
// Without dynamic parameters the key is unmarshalled through
// upperBoundAccessor when an upper-bound descriptor exists, otherwise it is
// copied from *pSearchKey. With dynamic parameters it is filled from the
// parameter values and upperBoundKeyProj is rebuilt.
00352 void BTreeSearchExecStream::readUpperBoundKey()
00353 {
00354 if (searchKeyParams.size() == 0) {
00355 if (upperBoundDesc.size()) {
00356 upperBoundAccessor.unmarshal(upperBoundData);
00357 } else {
00358 upperBoundData = *pSearchKey;
00359 }
00360 } else {
00361 // Dynamic parameters are present: the upper bound is taken from them
00362 // rather than from the input tuple.
00363
00364
00365 uint nParams = searchKeyParams.size();
00366
00367 // The projection is cleared and rebuilt on every call so that it lists
00368 // only the upper-bound columns assigned below.
00369
00370
00371 upperBoundKeyProj.clear();
00372 if (!(nParams % 2)
00373 || searchKeyParams[nParams / 2].keyOffset == nParams / 2 + 1)
00374 {
00375 upperBoundData[0] =
00376 pDynamicParamManager->getParam(
00377 searchKeyParams[nParams / 2].dynamicParamId).getDatum();
00378 upperBoundKeyProj.push_back(0);
00379 }
00380 uint keySize = upperBoundData.size();
00381 for (uint i = nParams / 2 + 1; i < nParams; i++) {
00382 upperBoundData[searchKeyParams[i].keyOffset - keySize] =
00383 pDynamicParamManager->getParam(
00384 searchKeyParams[i].dynamicParamId).getDatum();
      // NOTE(review): the datum slot above is derived from keyOffset, while the
      // projection entry below is derived from the parameter index i; both are
      // unsigned subtractions. Confirm the two always agree and cannot wrap.
00385 upperBoundKeyProj.push_back(i - keySize);
00386 }
00387 }
00388 }
00389
00390 bool BTreeSearchExecStream::testInterval()
00391 {
00392 if (upperBoundDirective == SEARCH_UNBOUNDED_UPPER) {
00393
00394
00395
00396 if (pSearchKey->size() > 1) {
00397 readerKeyAccessor.unmarshal(readerKeyData);
00398 int c =
00399 inputKeyDesc.compareTuplesKey(
00400 readerKeyData,
00401 *pSearchKey,
00402 pSearchKey->size() - 1);
00403 if (c != 0) {
00404 return false;
00405 }
00406 }
00407 return true;
00408 } else {
00409 readerKeyAccessor.unmarshal(readerKeyData);
00410 int c = inputKeyDesc.compareTuples(upperBoundData, readerKeyData);
00411 if (upperBoundDirective == SEARCH_CLOSED_UPPER) {
00412
00413
00414
00415
00416 if (!leastUpper && lowerBoundDirective == SEARCH_CLOSED_LOWER &&
00417 pSearchKey->size() > 1 && c > 0)
00418 {
00419 return checkNextKey();
00420 }
00421 if (c >= 0) {
00422 return true;
00423 }
00424 } else {
00425 if (c > 0) {
00426 return true;
00427 }
00428 }
00429 }
00430 return false;
00431 }
00432
00433 bool BTreeSearchExecStream::checkNextKey()
00434 {
00435
00436 if (!pReader->searchNext()) {
00437 return false;
00438 }
00439 readerKeyAccessor.unmarshal(readerKeyData);
00440 int c = inputKeyDesc.compareTuples(upperBoundData, readerKeyData);
00441
00442 assert(c <= 0);
00443 return (c == 0);
00444 }
00445
00446 ExecStreamResult BTreeSearchExecStream::innerFetchLoop(
00447 ExecStreamQuantum const &quantum,
00448 uint &nTuples)
00449 {
00450 for (;;) {
00451 if (nTuples >= quantum.nTuplesMax) {
00452 return EXECRC_QUANTUM_EXPIRED;
00453 }
00454 if (reachedTupleLimit(nTuples)) {
00455 return EXECRC_BUF_OVERFLOW;
00456 }
00457 if (pOutAccessor->produceTuple(tupleData)) {
00458 ++nTuples;
00459 } else {
00460 return EXECRC_BUF_OVERFLOW;
00461 }
00462 if (pReader->searchNext()) {
00463 if (testInterval()) {
00464 projAccessor.unmarshal(
00465 tupleData.begin() + nJoinAttributes);
00466
00467 continue;
00468 }
00469 }
00470 pReader->endSearch();
00471
00472 return EXECRC_YIELD;
00473 }
00474 }
00475
// Closes the stream by delegating to ConduitExecStream::closeImpl() and then
// BTreeReadExecStream::closeImpl(). This is the reverse of the order in which
// prepare() and open() call the same two classes.
00476 void BTreeSearchExecStream::closeImpl()
00477 {
00478 ConduitExecStream::closeImpl();
00479 BTreeReadExecStream::closeImpl();
00480 }
00481
// Reports whether producing more tuples should stop; innerFetchLoop() checks
// it before each tuple is produced. This implementation ignores nTuples and
// always returns false.
// NOTE(review): presumably a hook meant to be overridden elsewhere -- confirm
// against the class declaration.
00482 bool BTreeSearchExecStream::reachedTupleLimit(uint nTuples)
00483 {
00484 return false;
00485 }
00486
00487 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSearchExecStream.cpp#15 $");
00488
00489