LhxJoinExecStream.cpp

/*
// $Id: //open/dev/fennel/hashexe/LhxJoinExecStream.cpp#4 $
// Fennel is a library of data storage and processing components.
// Copyright (C) 2006-2009 The Eigenbase Project
// Copyright (C) 2009-2009 SQLstream, Inc.
// Copyright (C) 2006-2009 LucidEra, Inc.
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version approved by The Eigenbase Project.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "fennel/common/CommonPreamble.h"
#include "fennel/hashexe/LhxJoinExecStream.h"
#include "fennel/segment/Segment.h"
#include "fennel/exec/ExecStreamBufAccessor.h"
#include "fennel/tuple/StandardTypeDescriptor.h"

using namespace std;

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxJoinExecStream.cpp#4 $");

void LhxJoinExecStream::prepare(
    LhxJoinExecStreamParams const &params)
{
    assert (params.leftKeyProj.size() == params.rightKeyProj.size());

    ConfluenceExecStream::prepare(params);

    setJoinType(params);
    setHashInfo(params);

    uint numInputs = inAccessors.size();

    inputTuple.reset(new TupleData[2]);
    inputTupleSize.reset(new uint[2]);

    for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
        inputTuple[inputIndex].compute(
            inAccessors[inputIndex]->getTupleDesc());
        inputTupleSize[inputIndex] = inputTuple[inputIndex].size();
    }

    /*
     * Force partitioning level. Only set in tests.
     */
    forcePartitionLevel = params.forcePartitionLevel;
    enableSubPartStat = params.enableSubPartStat;

    /*
     * NOTE: currently anti joins that need to remove duplicates cannot
     * switch join sides (the join would then effectively become LeftAnti)
     * because the hash table is used to remove duplicated non-matched tuples.
     * The "Anti" side has to be the build side. It is difficult, though not
     * impossible, to remove duplicates of non-matched tuples from the probe
     * side.
     *
     * One approach to solve this is to insert the non-matched probe tuple into
     * the hash table, mark it as matched, and return this tuple. Subsequent
     * identical probe tuples will see the tuple as a "match" and will not be
     * returned (hence satisfying the anti join semantics).
     * This scheme also works when the hash table overflows. Tuples in the
     * hash table, including the probe tuples inserted, will be partitioned
     * as child partitions of the build input.
     * The remaining probe input, together with all the matched tuples from the
     * hash table, will be partitioned to disk as the children of the probe
     * input. Note that matched tuples are stored in both inputs.
     *
     * This partitioning scheme makes sure that the join result is correct
     * using the above described LeftAnti join algorithm or the already
     * implemented RightAnti join algorithm, regardless of input assignment for
     * the next partition level.
     *
     * RightAnti join without duplicate removal can use swing.
     * LeftAnti join without duplicate removal can use swing.
     *
     * RightAnti join with duplicate removal cannot use swing.
     * LeftAnti join with duplicate removal is not supported (see
     * setJoinType()).
     */
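    /*
     * Detect anti joins from the return-side flags below: a join that returns
     * only non-matching tuples from one input and nothing from the other is
     * an anti join on that side.
     */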
    bool leftAntiJoin =
        (returnProbeOuter() && !returnProbeInner() && !returnBuild());

    bool rightAntiJoin =
        (returnBuildOuter() && !returnBuildInner() && !returnProbe());

    bool antiJoin = leftAntiJoin || rightAntiJoin;

    enableSwing = params.enableSwing && (!(antiJoin && setopDistinct));

    /*
     * Calculate the number of blocks required to perform the join completely
     * in memory, based on the optimizer's estimates.
     */
    hashTable.calculateSize(
        hashInfo,
        DefaultBuildInputIndex,
        numBlocksHashTable);
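    /*
     * If no stats were available, numBlocksHashTable is left at MAXU;
     * getResourceRequirements() then makes an unbounded resource request.
     */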

    TupleDescriptor outputDesc;

    if (params.outputProj.size() != 0) {
        outputDesc.projectFrom(params.outputTupleDesc, params.outputProj);
    } else {
        outputDesc = params.outputTupleDesc;
    }

    outputTuple.compute(outputDesc);

    assert (outputTuple.size() == (inputTupleSize[0] + inputTupleSize[1]) ||
        outputTuple.size() == inputTupleSize[0] ||
        outputTuple.size() == inputTupleSize[1]);
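    /*
     * The output is either the concatenation of both inputs (inner and outer
     * joins) or the fields of just one input (semi and anti joins), as the
     * assertion above verifies.
     */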

    pOutAccessor->setTupleShape(outputDesc);

    /*
     * Set aside one cache block per child partition writer for I/O
     */
    numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs;
}

void LhxJoinExecStream::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity,
    ExecStreamResourceSettingType &optType)
{
    ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);

    uint minPages = LhxHashTable::LhxHashTableMinPages + numMiscCacheBlocks;
    minQuantity.nCachePages += minPages;
    // if no stats were available, make an unbounded resource request
    if (isMAXU(numBlocksHashTable)) {
        optType = EXEC_RESOURCE_UNBOUNDED;
    } else {
        // make sure the opt is bigger than the min; otherwise, the
        // resource governor won't try to give it extra
        optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable);
        optType = EXEC_RESOURCE_ESTIMATE;
    }
}

void LhxJoinExecStream::setResourceAllocation(
    ExecStreamResourceQuantity &quantity)
{
    ConfluenceExecStream::setResourceAllocation(quantity);
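    // Give the hash table whatever was granted beyond the blocks set aside
    // for the child partition writers.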
    hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks;
}

void LhxJoinExecStream::open(bool restart)
{
    ConfluenceExecStream::open(restart);

    if (restart) {
        hashTable.releaseResources();
    }

    uint partitionLevel = 0;

    /*
     * Create the root plan.
     *
     * The execute state machine operates at the plan level.
     */
    probePart = SharedLhxPartition(new LhxPartition(this));
    buildPart = SharedLhxPartition(new LhxPartition(this));

    (probePart->segStream).reset();
    probePart->inputIndex = DefaultProbeInputIndex;

    (buildPart->segStream).reset();
    buildPart->inputIndex = DefaultBuildInputIndex;

    vector<SharedLhxPartition> partitionList;
    partitionList.push_back(probePart);
    partitionList.push_back(buildPart);

    vector<shared_array<uint> > subPartStats;
    subPartStats.push_back(shared_array<uint>());
    subPartStats.push_back(shared_array<uint>());

    shared_ptr<dynamic_bitset<> > joinFilterInit =
        shared_ptr<dynamic_bitset<> >();

    VectorOfUint filteredRows;
    filteredRows.push_back(0);
    filteredRows.push_back(0);

    /*
     * No input join filter for root plan.
     */
    rootPlan = SharedLhxPlan(new LhxPlan());
    rootPlan->init(
        WeakLhxPlan(),
        partitionLevel,
        partitionList,
        subPartStats,
        joinFilterInit,
        filteredRows,
        enableSubPartStat,
        enableSwing);

    /*
     * Initialize recursive partitioning context.
     */
    partInfo.init(&hashInfo);

    curPlan = rootPlan.get();
    isTopPlan = true;

    hashTable.init(
        curPlan->getPartitionLevel(),
        hashInfo,
        curPlan->getBuildInput());
    hashTableReader.init(&hashTable, hashInfo, curPlan->getBuildInput());

    bool status = hashTable.allocateResources();
    assert (status);

    buildReader.open(curPlan->getBuildPartition(), hashInfo);

    joinState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
    nextState.clear();
}

ExecStreamResult LhxJoinExecStream::execute(ExecStreamQuantum const &quantum)
{
    while (true) {
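        /*
         * State machine summary: Build (or ForcePartitionBuild in tests)
         * reads the build input into the hash table; on overflow, Partition
         * and CreateChildPlan spill both inputs into child partitions and
         * move to the first child plan.  Probe matches the probe input
         * against the hash table, emitting rows through ProduceBuild and
         * ProducePending; nextState acts as a small stack of continuation
         * states.  GetNextPlan advances to the next leaf plan, and Done
         * marks end of stream.
         */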
        switch (joinState) {
        case ForcePartitionBuild:
            {
                TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];

                /*
                 * Build
                 */
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            /*
                             * break out of this loop, and start probing.
                             */
                            buildReader.close();
                            probeReader.open(
                                curPlan->getProbePartition(),
                                hashInfo);
                            joinState = Probe;
                            numTuplesProduced = 0;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                /*
                                 * Top level: request more data from producer.
                                 */
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                /*
                                 * Recursive level: no more data in partition.
                                 * Come back to the top of the same state to
                                 * detect EOS.
                                 */
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(buildTuple);
                    }

                    /*
                     * Add tuple to hash table.
                     *
                     * NOTE: This is a testing state. Always partition up to
                     * forcePartitionLevel.
                     */
                    if (curPlan->getPartitionLevel() < forcePartitionLevel ||
                        !hashTable.addTuple(buildTuple)) {
                        /*
                         * If hash table is full, partition input data.
                         *
                         * First, partition the right (build) input.
                         */
                        partInfo.open(
                            &hashTableReader, &buildReader, buildTuple,
                            curPlan->getProbePartition(),
                            curPlan->getBuildInput());
                        joinState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Build:
            {
                TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];

                /*
                 * Build
                 */
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            /*
                             * break out of this loop, and start probing.
                             */
                            buildReader.close();
                            probeReader.open(
                                curPlan->getProbePartition(),
                                hashInfo);
                            joinState = Probe;
                            numTuplesProduced = 0;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                /*
                                 * Top level: request more data from producer.
                                 */
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                /*
                                 * Recursive level: no more data in partition.
                                 * Come back to the top of the same state to
                                 * detect EOS.
                                 */
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(buildTuple);
                    }

                    /*
                     * Add tuple to hash table.
                     */
                    if (!hashTable.addTuple(buildTuple)) {
                        /*
                         * If hash table is full, partition input data.
                         *
                         * First, partition the right (build) input.
                         */
                        partInfo.open(
                            &hashTableReader, &buildReader, buildTuple,
                            curPlan->getProbePartition(),
                            curPlan->getBuildInput());
                        joinState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Partition:
            {
                for (;;) {
                    if (curPlan->generatePartitions(hashInfo, partInfo)
                        == PartitionUnderflow) {
                        /*
                         * Request more data from producer.
                         */
                        return EXECRC_BUF_UNDERFLOW;
                    } else {
                        /*
                         * Finished building the partitions for both
                         * inputs.
                         */
                        break;
                    }
                }
                partInfo.close();
                joinState = CreateChildPlan;
                break;
            }
        case CreateChildPlan:
            {
                /*
                 * Link the newly created partitions into the plan tree.
                 */
                curPlan->createChildren(
                    partInfo,
                    enableSubPartStat,
                    enableSwing);

                FENNEL_TRACE(TRACE_FINE, curPlan->toString());

                /*
                 * Now recurse down the plan tree to get the first leaf plan.
                 */
                curPlan = curPlan->getFirstChild().get();
                isTopPlan = false;

                hashTable.releaseResources();

                hashTable.init(
                    curPlan->getPartitionLevel(),
                    hashInfo,
                    curPlan->getBuildInput());
                hashTableReader.init(
                    &hashTable,
                    hashInfo,
                    curPlan->getBuildInput());

                bool status = hashTable.allocateResources();
                assert (status);
                buildReader.open(curPlan->getBuildPartition(), hashInfo);

                joinState =
                    (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                nextState.clear();
                break;
            }
        case GetNextPlan:
            {
                hashTable.releaseResources();

                checkAbort();

                curPlan = curPlan->getNextLeaf();

                if (curPlan) {
                    hashTable.init(
                        curPlan->getPartitionLevel(),
                        hashInfo,
                        curPlan->getBuildInput());
                    hashTableReader.init(
                        &hashTable,
                        hashInfo,
                        curPlan->getBuildInput());

                    bool status = hashTable.allocateResources();
                    assert (status);
                    buildReader.open(curPlan->getBuildPartition(), hashInfo);
                    joinState =
                        (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                    nextState.clear();
                } else {
                    joinState = Done;
                }
                break;
            }
        case Probe:
            {
                TupleData &probeTuple = inputTuple[curPlan->getProbeInput()];
                uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()];
                TupleProjection &probeKeyProj =
                    hashInfo.keyProj[curPlan->getProbeInput()];
                uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()];
                bool removeDuplicateProbe =
                    hashInfo.removeDuplicate[curPlan->getProbeInput()];
                TupleProjection &filterNullProbeKeyProj =
                    hashInfo.filterNullKeyProj[curPlan->getProbeInput()];
                bool filterNullProbe = regularJoin;

                uint probeFieldOffset =
                    returnBuild(curPlan) ?
                    buildTupleSize * curPlan->getProbeInput() : 0;
                uint buildFieldOffset =
                    returnProbe(curPlan) ?
                    probeTupleSize * curPlan->getBuildInput() : 0;
                uint probeFieldLength =
                    returnProbe(curPlan) ? probeTupleSize : 0;
                uint buildFieldLength =
                    returnBuild(curPlan) ? buildTupleSize : 0;
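                /*
                 * The output tuple always lays out input 0's fields before
                 * input 1's, regardless of which side currently acts as probe
                 * or build; the offsets above account for a possible side
                 * swap (swing).
                 */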

                /*
                 * Probe
                 */
                for (;;) {
                    if (!probeReader.isTupleConsumptionPending()) {
                        if (probeReader.getState() == EXECBUF_EOS) {
                            probeReader.close();
                            if (returnBuildOuter(curPlan)) {
                                /*
                                 * Join types that return non-matching
                                 * tuples from the build input:
                                 *    RightOuter, FullOuter, RightAnti.
                                 *
                                 * Set the output tuple to have NULL values on
                                 * the left (probe side), and return all the
                                 * non-matching tuples in the hash table on the
                                 * right.
                                 */
                                hashTableReader.bindUnMatched();

                                /*
                                 * Fill in the probe side, if required, with
                                 * NULLs.
                                 */
                                for (uint i = 0; i < probeFieldLength; i ++) {
                                    outputTuple[i + probeFieldOffset].pData =
                                        NULL;
                                }
                                joinState = ProduceBuild;
                                nextState.push_back(GetNextPlan);
                            } else {
                                /*
                                 * Probing for this plan is done.
                                 */
                                joinState = GetNextPlan;
                            }
                            break;
                        }
                        if (!probeReader.demandData()) {
                            if (isTopPlan) {
                                /*
                                 * Top level: request more data from producer.
                                 */
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                /*
                                 * Recursive level: no more data in partition.
                                 * Come back to the top of the same state to
                                 * detect EOS.
                                 */
                                break;
                            }
                        }
                        probeReader.unmarshalTuple(probeTuple);
                    }

                    PBuffer keyBuf = NULL;

                    /*
                     * Try to locate a matching key in the hash table.
                     * If this tuple contains nulls in its key columns, it
                     * will not join, so the hash table lookup is not needed.
                     */
                    if (!filterNullProbe ||
                        !probeTuple.containsNull(filterNullProbeKeyProj)) {
                        keyBuf =
                            hashTable.findKey(
                                probeTuple,
                                probeKeyProj,
                                removeDuplicateProbe);
                    }

                    if (keyBuf) {
                        if (returnBuildInner(curPlan)) {
                            /*
                             * Join types that return matching tuples from both
                             * inputs: InnerJoin, LeftOuter, RightOuter,
                             * FullOuter.
                             *
                             * Join types that return matching tuples from the
                             * build input: RightSemi (when matched for the
                             * first time).
                             *
                             * Set the output tuple to include only the probe
                             * input and get all the matching tuples from the
                             * build side. For RightSemi, the probeFieldLength
                             * to be included in the output tuple is 0.
                             */
                            for (uint i = 0; i < probeFieldLength; i ++) {
                                outputTuple[i + probeFieldOffset].copyFrom(
                                    probeTuple[i]);
                            }

                            hashTableReader.bindKey(keyBuf);
                            joinState = ProduceBuild;
                            nextState.push_back(Probe);
                            break;
                        } else if (returnProbeInner(curPlan) &&
                            !returnProbeOuter() && !returnBuild(curPlan)) {
                            /*
                             * Join types that return (distinct) matching
                             * tuples from the probe input: LeftSemiJoin.
                             *
                             * Produce one output tuple per matched tuple from
                             * the left side.
                             *
                             * Set the output tuple to include only
                             * the probe input.
                             */
                            for (uint i = 0; i < probeFieldLength; i ++) {
                                outputTuple[i + probeFieldOffset].copyFrom(
                                    probeTuple[i]);
                            }
                            joinState = ProducePending;
                            nextState.push_back(Probe);
                            break;
                        } else {
                            /*
                             * RightAnti and RightSemi (when not matched for
                             * the first time) fall through here.
                             * Go back to match other probing rows.
                             * Non-matched (for RightAnti) tuples will be
                             * returned from the hash table later.
                             */
                            probeReader.consumeTuple();
                        }
                    } else {
                        /*
                         * No match. Need to return the probe tuple if this is
                         * a left outer join.
                         */
                        if (returnProbeOuter(curPlan)) {
                            /*
                             * Join types that return non-matching
                             * tuples from the probe input: LeftOuter,
                             * FullOuter.
                             *
                             * Set the output tuple to include only the left
                             * input, and set NULL values on the right.
                             */
                            for (uint i = 0; i < probeFieldLength; i ++) {
                                outputTuple[i + probeFieldOffset].copyFrom(
                                    probeTuple[i]);
                            }

                            for (uint i = 0; i < buildFieldLength; i ++) {
                                outputTuple[i + buildFieldOffset].pData = NULL;
                            }
                            joinState = ProducePending;
                            nextState.push_back(Probe);
                            break;
                        } else {
                            probeReader.consumeTuple();
                        }
                    }
                }
                break;
            }
        case ProduceBuild:
            {
                TupleData &buildTuple = inputTuple[curPlan->getBuildInput()];
                uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()];
                uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()];
                uint buildFieldOffset =
                    returnProbe(curPlan) ?
                    probeTupleSize * curPlan->getBuildInput() : 0;
                uint buildFieldLength =
                    returnBuild(curPlan) ? buildTupleSize : 0;

                /*
                 * Producing the results.
                 * Handle output overflow and quantum expiration in
                 * ProducePending state.
                 */
                if (hashTableReader.getNext(buildTuple)) {
                    for (uint i = 0; i < buildFieldLength; i ++) {
                        outputTuple[i + buildFieldOffset].copyFrom(
                            buildTuple[i]);
                    }

                    joinState = ProducePending;
                    /*
                     * Come back to this state after producing the output tuple
                     * successfully.
                     */
                    nextState.push_back(ProduceBuild);
                } else {
                    joinState = nextState.back();
                    nextState.pop_back();
                    if (joinState == Probe) {
                        probeReader.consumeTuple();
                    }
                }
                break;
            }
        case ProducePending:
            {
                if (pOutAccessor->produceTuple(outputTuple)) {
                    numTuplesProduced++;
                    joinState = nextState.back();
                    nextState.pop_back();
                    if (joinState == Probe) {
                        probeReader.consumeTuple();
                    }
                } else {
                    numTuplesProduced = 0;
                    return EXECRC_BUF_OVERFLOW;
                }

                /*
                 * Successfully produced an output row. Now check if quantum
                 * has expired.
                 */
                if (numTuplesProduced >= quantum.nTuplesMax) {
                    /*
                     * Reset count.
                     */
                    numTuplesProduced = 0;
                    return EXECRC_QUANTUM_EXPIRED;
                }
                break;
            }
        case Done:
            {
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }
        }
    }

    /*
     * The state machine should never come here.
     */
    assert (false);
}

void LhxJoinExecStream::closeImpl()
{
    hashTable.releaseResources();
    if (rootPlan) {
        rootPlan->close();
        rootPlan.reset();
    }
    ConfluenceExecStream::closeImpl();
}

void LhxJoinExecStream::setJoinType(
    LhxJoinExecStreamParams const &params)
{
    /*
     * Join types currently supported:
     *
     * Inner, Left Outer, Right Outer, Full Outer
     * Right Anti (return non-matching rows from the build side)
     * Left Semi (return matching rows from the probe side)
     *
     * These join types are encoded by four boolean parameters (leftInner,
     * leftOuter, rightInner, rightOuter). Each specifies whether to return
     * matching or non-matching tuples from a join input.
     *
     * LeftInner LeftOuter RightInner RightOuter   Join Type
     *     F         T          F          F      (Left Anti)
     *     F         F          F          T       Right Anti
     *     T         F          F          F       Left Semi
     *     F         F          T          F      (Right Semi)
     *     T         F          T          F       Inner Join
     *     T         F          T          T       Right Outer
     *     T         T          T          F       Left Outer
     *     T         T          T          T       Full Outer
     *
     * Note: join types in () are not visible in the optimizer plan.
     */

    joinType.reset(new dynamic_bitset<>(4));

    joinType->set(0, params.leftInner);
    joinType->set(1, params.leftOuter);
    joinType->set(2, params.rightInner);
    joinType->set(3, params.rightOuter);
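
    /*
     * For example, a plain inner join arrives with leftInner and rightInner
     * set, so only bits 0 and 2 are set here.
     */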

    /*
     * By construction, each combination of these flag values maps to at most
     * one of the join types above.
     * Now make sure at least one flag is set.
     * Otherwise, the optimizer has passed in a join type not supported by this
     * join implementation.
     */
    assert (joinType->count() != 0);

    regularJoin   = !params.setopDistinct && !params.setopAll;
    setopDistinct =  params.setopDistinct && !params.setopAll;
    setopAll      = !params.setopDistinct &&  params.setopAll;

    assert (!setopAll && (regularJoin || setopDistinct));

    /*
     * Anti joins with duplicate removal need to use the hash table to remove
     * duplicated non-matching tuples. Hence the anti side needs to be the
     * build input (the original right side).
     */
    bool leftAnti =
        (returnProbeOuter() && !returnProbeInner() && !returnBuild());

    assert (!(leftAnti && setopDistinct));
}

void LhxJoinExecStream::setHashInfo(
    LhxJoinExecStreamParams const &params)
{
    uint numInputs = inAccessors.size();
    for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
        hashInfo.streamBufAccessor.push_back(inAccessors[inputIndex]);
        hashInfo.inputDesc.push_back(
            inAccessors[inputIndex]->getTupleDesc());
        /*
         * Set (distinct) matching operations eliminate duplicates.
         */
        hashInfo.removeDuplicate.push_back(setopDistinct);
        hashInfo.numRows.push_back(params.numRows);
        hashInfo.cndKeys.push_back(params.cndKeys);
    }

    bool leftSemi =
        (returnProbeInner() && !returnProbeOuter() && !returnBuild());

    bool rightSemi =
        (returnBuildInner() && !returnBuildOuter() && !returnProbe());

    /*
     * removeDuplicate is no longer a feature for set op only. For semi joins,
     * e.g. IN predicates, the lookup table needs to eliminate duplicates as
     * well. The way this is achieved is different for LEFTSEMI and RIGHTSEMI.
     * For LEFTSEMI, if the join output row for a matched tuple from the left
     * does not include any matched tuples from the right (the build side),
     * then at most one output tuple is returned per left tuple. (See comment
     * in execute() state Probe.)
     * For RIGHTSEMI, however, all the matched tuples from the build side need
     * to be returned, and only once. This is done by checking the "matched"
     * flag per left tuple. If the same key has not been matched before, then
     * return all matching tuples from the RHS. Otherwise, discard (and return
     * nothing from the RHS) and go to the next tuple on the left. In this
     * case, the call to findKey() in execute():Probe needs to pass true for
     * parameter removeDuplicateProbe.
     */
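    /*
     * LeftSemi therefore turns on duplicate removal for the build input (the
     * hash table side), while RightSemi turns it on for the probe input,
     * where findKey() consults it through removeDuplicateProbe.
     */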
    if (leftSemi) {
        hashInfo.removeDuplicate[DefaultBuildInputIndex] = true;
    }

    if (rightSemi) {
        hashInfo.removeDuplicate[DefaultProbeInputIndex] = true;
    }

    /*
     * Nulls do not join, unless in a set operation.
     * Filter null values if non-matching tuples are not needed.
     */
    hashInfo.filterNull.push_back(regularJoin && !returnProbeOuter());
    hashInfo.filterNull.push_back(regularJoin && !returnBuildOuter());

    hashInfo.keyProj.push_back(params.leftKeyProj);
    hashInfo.keyProj.push_back(params.rightKeyProj);

    TupleProjection filterNullLeftKeyProj;
    TupleProjection filterNullRightKeyProj;

    // only filter nulls on join sides from which non-joining tuples will not
    // need to be returned
    filterNullLeftKeyProj.projectFrom(
        params.leftKeyProj, params.filterNullKeyProj);

    filterNullRightKeyProj.projectFrom(
        params.rightKeyProj, params.filterNullKeyProj);

    hashInfo.filterNullKeyProj.push_back(filterNullLeftKeyProj);
    hashInfo.filterNullKeyProj.push_back(filterNullRightKeyProj);

    hashInfo.useJoinFilter.push_back(
        params.enableJoinFilter && !returnProbeOuter());
    hashInfo.useJoinFilter.push_back(
        params.enableJoinFilter && !returnBuildOuter());

    hashInfo.memSegmentAccessor = params.scratchAccessor;
    hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
    hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment;

    for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
        TupleProjection &keyProj = hashInfo.keyProj[inputIndex];
        TupleDescriptor &inputDesc = hashInfo.inputDesc[inputIndex];

        vector<LhxHashTrim> isKeyVarChar;
        TupleProjection dataProj;

        /*
         * Hashing is special for varchar types (the trailing blanks are
         * insignificant).
         */
        for (int j = 0; j < keyProj.size(); j ++) {
            StoredTypeDescriptor::Ordinal ordinal =
                inputDesc[keyProj[j]].pTypeDescriptor->getOrdinal();
            if (ordinal == STANDARD_TYPE_VARCHAR) {
                isKeyVarChar.push_back(HASH_TRIM_VARCHAR);
            } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) {
                isKeyVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR);
            } else {
                isKeyVarChar.push_back(HASH_TRIM_NONE);
            }
        }

        hashInfo.isKeyColVarChar.push_back(isKeyVarChar);

        /*
         * Need to construct a covering set of keys; for example,
         * keyProj (3,4,2,3) should have a covering set of (3,4,2).
         */
        for (int i = 0; i < inputDesc.size(); i ++) {
            /*
             * A simple linear scan to check whether column i is a key column.
             */
            bool colIsKey = false;
            for (int j = 0; j < keyProj.size(); j ++) {
                if (i == keyProj[j]) {
                    colIsKey = true;
                    break;
                }
            }
            if (!colIsKey) {
                dataProj.push_back(i);
            }
        }
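        /*
         * dataProj now holds the non-key columns; e.g. with five input
         * columns and keyProj (3,4,2,3), dataProj ends up as (0,1).
         */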
        hashInfo.dataProj.push_back(dataProj);
    }
}

FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxJoinExecStream.cpp#4 $");

// End LhxJoinExecStream.cpp
