LhxAggExecStream.cpp

/*
// $Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $
// Fennel is a library of data storage and processing components.
// Copyright (C) 2006-2009 The Eigenbase Project
// Copyright (C) 2009-2009 SQLstream, Inc.
// Copyright (C) 2006-2009 LucidEra, Inc.
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 2 of the License, or (at your option)
// any later version approved by The Eigenbase Project.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "fennel/common/CommonPreamble.h"
#include "fennel/hashexe/LhxAggExecStream.h"
#include "fennel/segment/Segment.h"
#include "fennel/exec/ExecStreamBufAccessor.h"
#include "fennel/tuple/StandardTypeDescriptor.h"

using namespace std;

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");

void LhxAggExecStream::prepare(
    LhxAggExecStreamParams const &params)
{
    ConduitExecStream::prepare(params);

    setHashInfo(params);
    setAggComputers(hashInfo, params.aggInvocations);

    /*
     * Force partitioning level. Only set in tests.
     */
    forcePartitionLevel = params.forcePartitionLevel;
    enableSubPartStat = params.enableSubPartStat;

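    /*
     * Aggregation has a single input, so the build input is the last (and
     * only) tuple descriptor that setHashInfo() added to hashInfo.inputDesc.
     */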
    buildInputIndex = hashInfo.inputDesc.size() - 1;

    /*
     * Number of blocks and slots required to perform the aggregation in
     * memory, using estimates from the optimizer.
     */
    hashTable.calculateSize(hashInfo, buildInputIndex, numBlocksHashTable);

    TupleDescriptor outputDesc;

    outputDesc = hashInfo.inputDesc[buildInputIndex];

    if (!params.outputTupleDesc.empty()) {
        assert(outputDesc == params.outputTupleDesc);
    }

    outputTuple.compute(outputDesc);
    pOutAccessor->setTupleShape(outputDesc);

    /*
     * Set aside one cache block per child partition writer for I/O.
     */
    uint numInputs = 1;
    numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs;
}

void LhxAggExecStream::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity,
    ExecStreamResourceSettingType &optType)
{
    ConduitExecStream::getResourceRequirements(minQuantity, optQuantity);

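    // Minimum request: enough pages for a minimally sized hash table per
    // child partition, plus the misc I/O blocks reserved in prepare().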
    uint minPages =
        LhxHashTable::LhxHashTableMinPages * LhxPlan::LhxChildPartCount
        + numMiscCacheBlocks;
    minQuantity.nCachePages += minPages;
    // if valid stats weren't passed in, make an unbounded resource request
    if (isMAXU(numBlocksHashTable)) {
        optType = EXEC_RESOURCE_UNBOUNDED;
    } else {
        // make sure the opt is bigger than the min; otherwise, the
        // resource governor won't try to give it extra
        optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable);
        optType = EXEC_RESOURCE_ESTIMATE;
    }
}

void LhxAggExecStream::setResourceAllocation(
    ExecStreamResourceQuantity &quantity)
{
    ConduitExecStream::setResourceAllocation(quantity);
    hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks;
}

void LhxAggExecStream::open(bool restart)
{
    ConduitExecStream::open(restart);

    if (restart) {
        hashTable.releaseResources();
    }

    uint partitionLevel = 0;
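    /*
     * At the top level the hash table aggregates raw input rows, so it is
     * bound to the full aggComputers; recursive levels (see CreateChildPlan
     * and GetNextPlan in execute()) rebind it to partialAggComputers.
     */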
    hashTable.init(partitionLevel, hashInfo, &aggComputers, buildInputIndex);
    hashTableReader.init(&hashTable, hashInfo, buildInputIndex);

    bool status = hashTable.allocateResources();

    assert(status);

    // REVIEW jvs 25-Aug-2006: Fennel coding convention is pParentPlan,
    // pBuildPart, etc.  Same comment applies everywhere.  Also, consider using
    // boost::ptr_vector<LhxPartition> rather than
    // std::vector<SharedLhxPartition> (unless shared pointers are really
    // required).

    /*
     * Create the root plan.
     *
     * The execute state machine operates at the plan level.
     */
    vector<SharedLhxPartition> partitionList;

    buildPart = SharedLhxPartition(new LhxPartition(this));
    // REVIEW jvs 25-Aug-2006:  Why does buildPart->segStream need to be reset
    // immediately after construction?
    buildPart->segStream.reset();
    buildPart->inputIndex = 0;
    partitionList.push_back(buildPart);

    rootPlan = SharedLhxPlan(new LhxPlan());
    rootPlan->init(
        WeakLhxPlan(),
        partitionLevel,
        partitionList,
        enableSubPartStat);

    /*
     * Initialize the recursive partitioning context.
     */
    partInfo.init(&hashInfo);

    /*
     * Now start at the first (root) plan.
     */
    curPlan = rootPlan.get();
    isTopPlan = true;

    buildReader.open(curPlan->getPartition(buildInputIndex), hashInfo);

    aggState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
}

ExecStreamResult LhxAggExecStream::execute(ExecStreamQuantum const &quantum)
{
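    /*
     * A sketch of the state machine below: Build (or ForcePartitionBuild,
     * its testing-only variant) aggregates input into the hash table; if
     * the table overflows, Partition spills the input into child partitions
     * and CreateChildPlan descends to the first leaf plan to aggregate
     * partial results.  Produce/ProducePending emit output tuples one at a
     * time, GetNextPlan advances to the next leaf plan, and Done marks EOS.
     */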
    while (true) {
        // REVIEW jvs 25-Aug-2006:  Some compilers do better if you
        // put the most commonly used cases first in a switch.  Definitely
        // from a "follow-the-logic" standpoint, a testing-only state
        // like ForcePartitionBuild belongs last.
        switch (aggState) {
            // REVIEW jvs 25-Aug-2006:  I'm not sure that repeating all
            // of this code between the ForcePartitionBuild and Build
            // states is worth it just to remove one test from the
            // inner loop.
        case ForcePartitionBuild:
            {
                /*
                 * Build
                 */
                // REVIEW jvs 25-Aug-2006:  Is it really necessary to compute
                // the tuple every time through here?
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            numTuplesProduced = 0;
                            /*
                             * break out of this loop, and start returning.
                             */
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                /*
                                 * Top level: request more data from stream
                                 * producer.
                                 */
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                /*
                                 * Recursive level: no more data in partition.
                                 * Come back to the top of the same state to
                                 * detect EOS.
                                 */
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    /*
                     * Add tuple to hash table.
                     *
                     * NOTE: This is a testing state. Always partition up to
                     * forcePartitionLevel.
                     */
                    if (curPlan->getPartitionLevel() < forcePartitionLevel ||
                        !hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Build:
            {
                /*
                 * Build
                 */
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            buildReader.close();
                            numTuplesProduced = 0;
                            /*
                             * break out of this loop, and start returning.
                             */
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                /*
                                 * Top level: request more data from stream
                                 * producer.
                                 */
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                /*
                                 * Recursive level: no more data in partition.
                                 * Come back to the top of the same state to
                                 * detect EOS.
                                 */
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    /*
                     * Add tuple to hash table.
                     */
                    if (!hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Partition:
            {
                for (;;) {
                    if (curPlan->generatePartitions(hashInfo, partInfo)
                        == PartitionUnderflow) {
                        /*
                         * Request more data from producer.
                         */
                        return EXECRC_BUF_UNDERFLOW;
                    } else {
                        // REVIEW jvs 25-Aug-2006:  only one input for agg..
                        /*
                         * Finished building the partitions for the input
                         * (aggregation has only one input).
                         */
                        break;
                    }
                }
                partInfo.close();
                aggState = CreateChildPlan;
                break;
            }
        case CreateChildPlan:
            {
                /*
                 * Link the newly created partitions into the plan tree.
                 */
                curPlan->createChildren(partInfo, false, false);

                FENNEL_TRACE(TRACE_FINE, curPlan->toString());

                // REVIEW jvs 25-Aug-2006:  This comment makes it sound
                // like it's walking multiple levels in the plan tree
                // right here, but really it's just walking down to the
                // first leaf it just created (i.e. one step in
                // recursion if curPlan was already non-root).
                /*
                 * Now recurse down the plan tree to get the first leaf plan.
                 */
                curPlan = curPlan->getFirstChild().get();
                isTopPlan = false;

                hashTable.releaseResources();

                /*
                 * At the recursive level, the input tuple shape is
                 * different: inputs are all partial aggregates now.
                 * The hash table needs to aggregate partial results using
                 * the correct agg computer interface.
                 */
                hashTable.init(
                    curPlan->getPartitionLevel(),
                    hashInfo,
                    &partialAggComputers,
                    buildInputIndex);
                hashTableReader.init(
                    &hashTable,
                    hashInfo,
                    buildInputIndex);

                bool status = hashTable.allocateResources();
                assert(status);

                buildReader.open(
                    curPlan->getPartition(buildInputIndex),
                    hashInfo);

                aggState =
                    (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                break;
            }
        case GetNextPlan:
            {
                hashTable.releaseResources();

                checkAbort();

                curPlan = curPlan->getNextLeaf();

                if (curPlan) {
                    /*
                     * At the recursive level, the input tuple shape is
                     * different: inputs are all partial aggregates now.
                     * The hash table needs to aggregate partial results using
                     * the correct agg computer interface.
                     */
                    hashTable.init(
                        curPlan->getPartitionLevel(),
                        hashInfo,
                        &partialAggComputers,
                        buildInputIndex);
                    hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
                    bool status = hashTable.allocateResources();
                    assert(status);

                    buildReader.open(
                        curPlan->getPartition(buildInputIndex),
                        hashInfo);

                    aggState =
                        (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                } else {
                    aggState = Done;
                }
                break;
            }
        case Produce:
            {
                // REVIEW jvs 25-Aug-2006: Is there a reason tuples can't be
                // pumped out in a loop right here?  Popping in and out of
                // the state machine for every tuple is a bit of overhead.
                // It's only a couple of lines of code which would be
                // duplicated.  (An inline method would contradict
                // my earlier comment about numTuplesProduced being
                // a local variable.)
                /*
                 * Producing the results.  Handle output overflow and quantum
                 * expiration in ProducePending.
                 */
                if (hashTableReader.getNext(outputTuple)) {
                    aggState = ProducePending;
                    /*
                     * Come back to this state after producing the output tuple
                     * successfully.
                     */
                    nextState = Produce;
                } else {
                    aggState = GetNextPlan;
                }
                break;
            }
        case ProducePending:
            {
                if (pOutAccessor->produceTuple(outputTuple)) {
                    numTuplesProduced++;
                    aggState = nextState;
                } else {
                    numTuplesProduced = 0;
                    return EXECRC_BUF_OVERFLOW;
                }

                /*
                 * Successfully produced an output row. Now check if quantum
                 * has expired.
                 */
                if (numTuplesProduced >= quantum.nTuplesMax) {
                    /*
                     * Reset count.
                     */
                    numTuplesProduced = 0;
                    return EXECRC_QUANTUM_EXPIRED;
                }
                break;
            }
        case Done:
            {
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }
        }
    }

    /*
     * The state machine should never reach this point.
     */
    assert(false);
}

void LhxAggExecStream::closeImpl()
{
    hashTable.releaseResources();
    if (rootPlan) {
        rootPlan->close();
        rootPlan.reset();
    }
    // REVIEW jvs 25-Aug-2006: Are there other resources that ought to be
    // released here?  Anything in hashTableReader, partInfo, buildPart,
    // buildReader?  Or does that all get cleaned up implicitly?
    ConduitExecStream::closeImpl();
}

void LhxAggExecStream::setAggComputers(
    LhxHashInfo &hashInfo,
    AggInvocationList const &aggInvocations)
{
    /*
     * InputDesc from underlying producer.
     */
    TupleDescriptor inputDesc = pInAccessor->getTupleDesc();

    /*
     * TupleDescriptor used by the hash table, of the format:
     * [ group-by keys, aggregates ]
     */
    TupleDescriptor &hashDesc = hashInfo.inputDesc.back();

    /*
     * Fields corresponding to the aggregates in hashDesc
     */
    TupleProjection &aggsProj = hashInfo.aggsProj;

    AggFunction partialAggFunction;

    uint i = 0;

    assert(aggInvocations.size() == aggsProj.size());

    for (AggInvocationConstIter pInvocation(aggInvocations.begin());
         pInvocation != aggInvocations.end();
         ++pInvocation)
    {
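        /*
         * Pick the function used to merge partial aggregates at recursive
         * levels: partial COUNT results are combined by summing them, while
         * SUM, MIN, MAX, and SINGLE_VALUE each combine with themselves.
         */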
        switch (pInvocation->aggFunction) {
        case AGG_FUNC_COUNT:
            partialAggFunction = AGG_FUNC_SUM;
            break;
        case AGG_FUNC_SUM:
        case AGG_FUNC_MIN:
        case AGG_FUNC_MAX:
        case AGG_FUNC_SINGLE_VALUE:
            partialAggFunction = pInvocation->aggFunction;
            break;
        default:
            permFail("unknown aggregation function: "
                     << pInvocation->aggFunction);
            break;
        }

        /*
         * Add to aggregate computer list.
         */
        TupleAttributeDescriptor const *pInputAttr = NULL;
        if (pInvocation->iInputAttr != -1) {
            pInputAttr = &(inputDesc[pInvocation->iInputAttr]);
        }
        aggComputers.push_back(
            AggComputer::newAggComputer(
                pInvocation->aggFunction, pInputAttr));
        aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr);

        /*
         * Add to partial aggregate computer list.
         */
        TupleAttributeDescriptor const *pInputAttrPartialAgg =
            &(hashDesc[aggsProj[i]]);
        partialAggComputers.push_back(
            AggComputer::newAggComputer(
                partialAggFunction, pInputAttrPartialAgg));
        partialAggComputers.back().setInputAttrIndex(aggsProj[i]);
        i++;
    }
}

void LhxAggExecStream::setHashInfo(
    LhxAggExecStreamParams const &params)
{
    TupleDescriptor inputDesc = pInAccessor->getTupleDesc();

    hashInfo.streamBufAccessor.push_back(pInAccessor);

    hashInfo.cndKeys.push_back(params.cndGroupByKeys);

    hashInfo.numRows.push_back(params.numRows);

    hashInfo.filterNull.push_back(false);

    // empty projection: do not filter nulls
    TupleProjection filterNullKeyProj;
    hashInfo.filterNullKeyProj.push_back(filterNullKeyProj);

    hashInfo.removeDuplicate.push_back(false);
    hashInfo.useJoinFilter.push_back(false);

    hashInfo.memSegmentAccessor = params.scratchAccessor;
    hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
    hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment;

    TupleProjection keyProj;
    vector<LhxHashTrim> isKeyColVarChar;

    for (int i = 0; i < params.groupByKeyCount; i++) {
        keyProj.push_back(i);
        /*
         * Hashing is special for varchar types (the trailing blanks are
         * insignificant).
         */
        StoredTypeDescriptor::Ordinal ordinal =
            inputDesc[i].pTypeDescriptor->getOrdinal();
        if (ordinal == STANDARD_TYPE_VARCHAR) {
            isKeyColVarChar.push_back(HASH_TRIM_VARCHAR);
        } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) {
            isKeyColVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR);
        } else {
            isKeyColVarChar.push_back(HASH_TRIM_NONE);
        }
    }
    hashInfo.keyProj.push_back(keyProj);
    hashInfo.isKeyColVarChar.push_back(isKeyColVarChar);

    /*
     * Empty data projection.
     */
    TupleProjection dataProj;
    hashInfo.dataProj.push_back(dataProj);

    /*
     * Set up keyDesc.
     */
    TupleDescriptor keyDesc;
    keyDesc.projectFrom(inputDesc, keyProj);

    /*
     * Attribute descriptor for COUNT output.
     */
    StandardTypeDescriptorFactory stdTypeFactory;
    TupleAttributeDescriptor countDesc(
        stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));

    // REVIEW jvs 25-Aug-2006: It's possible to get rid of this nullability
    // type transformation (but it requires matching changes at the Farrago
    // level).  The reason is that LhxAggExecStream is only used for GROUP BY.
    // Since empty groups are only possible with full-table agg, they are not
    // an issue with GROUP BY.  So, the output can only be null if the input
    // admits nulls.  However, the validator currently applies the
    // transformation in all cases (e.g. SqlSumAggFunction uses
    // rtiFirstArgTypeForceNullable).  To do it right, it would need to be
    // context-sensitive (and SortedAggExecStream would need to be changed to
    // match, discriminating on whether any group keys were specified).
    // Probably not worth it.

    /*
     * Append the accumulator result portion of keyDesc based on the
     * requested aggregate function invocations, and record in aggsProj
     * which fields hold the aggregates.
     */
    int i = params.groupByKeyCount;
    for (AggInvocationConstIter pInvocation(params.aggInvocations.begin());
         pInvocation != params.aggInvocations.end();
         ++pInvocation)
    {
        switch (pInvocation->aggFunction) {
        case AGG_FUNC_COUNT:
            keyDesc.push_back(countDesc);
            break;
        case AGG_FUNC_SUM:
        case AGG_FUNC_MIN:
        case AGG_FUNC_MAX:
        case AGG_FUNC_SINGLE_VALUE:
            // Key type is same as input type, but nullable
            keyDesc.push_back(inputDesc[pInvocation->iInputAttr]);
            keyDesc.back().isNullable = true;
            break;
        }
        hashInfo.aggsProj.push_back(i++);
    }

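    /*
     * keyDesc now has the shape [ group-by keys, aggregates ]; it becomes
     * the single input descriptor for the hash table, and prepare() reuses
     * it as the output tuple shape.
     */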
    hashInfo.inputDesc.push_back(keyDesc);
}

FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");

// End LhxAggExecStream.cpp
