00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/hashexe/LhxAggExecStream.h"
00025 #include "fennel/segment/Segment.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 
00029 using namespace std;
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");
00032 
00033 void LhxAggExecStream::prepare(
00034     LhxAggExecStreamParams const ¶ms)
00035 {
00036     ConduitExecStream::prepare(params);
00037 
00038     setHashInfo(params);
00039     setAggComputers(hashInfo, params.aggInvocations);
00040 
00041     
00042 
00043 
00044     forcePartitionLevel = params.forcePartitionLevel;
00045     enableSubPartStat = params.enableSubPartStat;
00046 
00047     buildInputIndex = hashInfo.inputDesc.size() - 1;
00048 
00049     
00050 
00051 
00052 
00053     hashTable.calculateSize(hashInfo, buildInputIndex, numBlocksHashTable);
00054 
00055     TupleDescriptor outputDesc;
00056 
00057     outputDesc = hashInfo.inputDesc[buildInputIndex];
00058 
00059     if (!params.outputTupleDesc.empty()) {
00060         assert (outputDesc == params.outputTupleDesc);
00061     }
00062 
00063     outputTuple.compute(outputDesc);
00064     pOutAccessor->setTupleShape(outputDesc);
00065 
00066     
00067 
00068 
00069     uint numInputs = 1;
00070     numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs;
00071 }
00072 
00073 void LhxAggExecStream::getResourceRequirements(
00074     ExecStreamResourceQuantity &minQuantity,
00075     ExecStreamResourceQuantity &optQuantity,
00076     ExecStreamResourceSettingType &optType)
00077 {
00078     ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
00079 
00080     uint minPages =
00081         LhxHashTable::LhxHashTableMinPages * LhxPlan::LhxChildPartCount
00082         + numMiscCacheBlocks;
00083     minQuantity.nCachePages += minPages;
00084     
00085     if (isMAXU(numBlocksHashTable)) {
00086         optType = EXEC_RESOURCE_UNBOUNDED;
00087     } else {
00088         
00089         
00090         optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable);
00091         optType = EXEC_RESOURCE_ESTIMATE;
00092     }
00093 }
00094 
00095 void LhxAggExecStream::setResourceAllocation(
00096     ExecStreamResourceQuantity &quantity)
00097 {
00098     ConduitExecStream::setResourceAllocation(quantity);
00099     hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks;
00100 }
00101 
00102 
00103 void LhxAggExecStream::open(bool restart)
00104 {
00105     ConduitExecStream::open(restart);
00106 
00107     if (restart) {
00108         hashTable.releaseResources();
00109     }
00110 
00111     uint partitionLevel = 0;
00112     hashTable.init(partitionLevel, hashInfo, &aggComputers, buildInputIndex);
00113     hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
00114 
00115     bool status = hashTable.allocateResources();
00116 
00117     assert(status);
00118 
00119     
00120     
00121     
00122     
00123     
00124 
00125     
00126 
00127 
00128 
00129 
00130     vector<SharedLhxPartition> partitionList;
00131 
00132     buildPart = SharedLhxPartition(new LhxPartition(this));
00133     
00134     
00135     buildPart->segStream.reset();
00136     buildPart->inputIndex = 0;
00137     partitionList.push_back(buildPart);
00138 
00139     rootPlan = SharedLhxPlan(new LhxPlan());
00140     rootPlan->init(
00141         WeakLhxPlan(),
00142         partitionLevel,
00143         partitionList,
00144         enableSubPartStat);
00145 
00146     
00147 
00148 
00149     partInfo.init(&hashInfo);
00150 
00151     
00152 
00153 
00154     curPlan = rootPlan.get();
00155     isTopPlan = true;
00156 
00157     buildReader.open(curPlan->getPartition(buildInputIndex), hashInfo);
00158 
00159     aggState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
00160 }
00161 
/**
 * Runs the aggregation state machine until it must yield to the scheduler.
 *
 * States handled below:
 *  - Build / ForcePartitionBuild: read tuples through buildReader into the
 *    hash table. Switch to Partition when the table rejects a tuple, or
 *    (ForcePartitionBuild only) while the current plan's level is below
 *    forcePartitionLevel.
 *  - Partition: spill through curPlan->generatePartitions(), then go to
 *    CreateChildPlan.
 *  - CreateChildPlan: create child plans, move to the first child, and
 *    rebuild the hash table for it with the partial aggregate computers.
 *  - Produce / ProducePending: emit the hash table contents tuple by tuple.
 *  - GetNextPlan: advance to the next leaf plan, or to Done if there is
 *    none (checkAbort() is called between plans).
 *  - Done: mark EOS on the output.
 *
 * @param quantum nTuplesMax bounds how many tuples are emitted per call
 *
 * @return EXECRC_BUF_UNDERFLOW when more input is needed,
 * EXECRC_BUF_OVERFLOW when the output buffer is full,
 * EXECRC_QUANTUM_EXPIRED after quantum.nTuplesMax tuples, or EXECRC_EOS
 */
ExecStreamResult LhxAggExecStream::execute(ExecStreamQuantum const &quantum)
{
    while (true) {
        // Each pass handles the current state: it either returns to the
        // caller or updates aggState and breaks out of the switch.
        switch (aggState) {
            // Same as Build, except that the hash table is bypassed (and
            // partitioning triggered) while the current plan's level is
            // below forcePartitionLevel.
        case ForcePartitionBuild:
            {
                // Build phase: feed input tuples into the hash table.
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            numTuplesProduced = 0;
                            // All input is in the hash table; start
                            // producing from it.
                            // NOTE(review): unlike Build, buildReader is
                            // not closed here -- confirm this is intended.
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                // The top plan reads the stream input:
                                // yield until more data is available.
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                // Below the top plan: leave the loop with
                                // aggState unchanged, so the next pass
                                // re-checks the reader state (presumably
                                // EOS by then -- confirm).
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    // Partition if forced by forcePartitionLevel, or if
                    // the hash table cannot accept the tuple. The top plan
                    // partitions with the raw-input aggComputers; child
                    // plans use partialAggComputers, which work on
                    // already-aggregated hash table columns.
                    if (curPlan->getPartitionLevel() < forcePartitionLevel ||
                        !hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Build:
            {
                // Build phase: feed input tuples into the hash table.
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            buildReader.close();
                            numTuplesProduced = 0;
                            // All input is in the hash table; start
                            // producing from it.
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                // The top plan reads the stream input:
                                // yield until more data is available.
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                // Below the top plan: leave the loop with
                                // aggState unchanged, so the next pass
                                // re-checks the reader state (presumably
                                // EOS by then -- confirm).
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    // The hash table is full: partition the current
                    // contents and the remaining input. The tuple that
                    // failed to fit stays pending (consumeTuple() is not
                    // called for it).
                    if (!hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Partition:
            {
                // The loop body runs once: either yield on underflow (and
                // resume in this state next call) or fall through once
                // partition generation has finished.
                for (;;) {
                    if (curPlan->generatePartitions(hashInfo, partInfo)
                        == PartitionUnderflow) {
                        // Need more input before partitioning can go on.
                        return EXECRC_BUF_UNDERFLOW;
                    } else {
                        // Partitions have been generated.
                        break;
                    }
                }
                partInfo.close();
                aggState = CreateChildPlan;
                break;
            }
        case CreateChildPlan:
            {
                // Create child plans from the generated partitions.
                // NOTE(review): meaning of the two boolean arguments is
                // not visible here -- see LhxPlan::createChildren.
                curPlan->createChildren(partInfo, false, false);

                FENNEL_TRACE(TRACE_FINE, curPlan->toString());

                // Descend to the first child. From here on, input comes
                // from partitions rather than the stream input.
                curPlan = curPlan->getFirstChild().get();
                isTopPlan = false;

                hashTable.releaseResources();

                // Rebuild the hash table at the child's level. Child
                // partitions hold partially aggregated rows, so the
                // partial aggregate computers are used.
                hashTable.init(
                    curPlan->getPartitionLevel(),
                    hashInfo,
                    &partialAggComputers,
                    buildInputIndex);
                hashTableReader.init(
                    &hashTable,
                    hashInfo,
                    buildInputIndex);

                bool status = hashTable.allocateResources();
                assert(status);

                buildReader.open(
                    curPlan->getPartition(buildInputIndex),
                    hashInfo);

                aggState =
                    (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                break;
            }
        case GetNextPlan:
            {
                // Done with the current plan's hash table.
                hashTable.releaseResources();

                checkAbort();

                curPlan = curPlan->getNextLeaf();

                if (curPlan) {
                    // Set up the hash table and reader for the next leaf
                    // plan, again with the partial aggregate computers.
                    hashTable.init(
                        curPlan->getPartitionLevel(),
                        hashInfo,
                        &partialAggComputers,
                        buildInputIndex);
                    hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
                    bool status = hashTable.allocateResources();
                    assert(status);

                    buildReader.open(
                        curPlan->getPartition(buildInputIndex),
                        hashInfo);

                    aggState =
                        (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                } else {
                    // No leaf plans remain.
                    aggState = Done;
                }
                break;
            }
        case Produce:
            {
                // Fetch the next hash table entry into outputTuple. When
                // the table is exhausted, move on to the next leaf plan.
                if (hashTableReader.getNext(outputTuple)) {
                    aggState = ProducePending;
                    // After the pending tuple is written, come back here
                    // for the next one.
                    nextState = Produce;
                } else {
                    aggState = GetNextPlan;
                }
                break;
            }
        case ProducePending:
            {
                if (pOutAccessor->produceTuple(outputTuple)) {
                    numTuplesProduced++;
                    aggState = nextState;
                } else {
                    // Output buffer full: stay in ProducePending so the
                    // same tuple is retried on the next call.
                    numTuplesProduced = 0;
                    return EXECRC_BUF_OVERFLOW;
                }

                // Yield once this call has produced quantum.nTuplesMax
                // tuples.
                if (numTuplesProduced >= quantum.nTuplesMax) {
                    // Reset the counter for the next call.
                    numTuplesProduced = 0;
                    return EXECRC_QUANTUM_EXPIRED;
                }
                break;
            }
        case Done:
            {
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }
        }
    }

    // Not reached: the loop above only exits through a return.
    assert(false);
}
00468 
00469 void LhxAggExecStream::closeImpl()
00470 {
00471     hashTable.releaseResources();
00472     if (rootPlan) {
00473         rootPlan->close();
00474         rootPlan.reset();
00475     }
00476     
00477     
00478     
00479     ConduitExecStream::closeImpl();
00480 }
00481 
00482 void LhxAggExecStream::setAggComputers(
00483     LhxHashInfo &hashInfo,
00484     AggInvocationList const &aggInvocations)
00485 {
00486     
00487 
00488 
00489     TupleDescriptor inputDesc = pInAccessor->getTupleDesc();
00490 
00491     
00492 
00493 
00494 
00495     TupleDescriptor &hashDesc = hashInfo.inputDesc.back();
00496 
00497     
00498 
00499 
00500     TupleProjection &aggsProj = hashInfo.aggsProj;
00501 
00506     AggFunction partialAggFunction;
00507 
00508     uint i = 0;
00509 
00510     assert (aggInvocations.size() == aggsProj.size());
00511 
00512     for (AggInvocationConstIter pInvocation(aggInvocations.begin());
00513          pInvocation != aggInvocations.end();
00514          ++pInvocation)
00515     {
00516         switch (pInvocation->aggFunction) {
00517         case AGG_FUNC_COUNT:
00518             partialAggFunction = AGG_FUNC_SUM;
00519             break;
00520         case AGG_FUNC_SUM:
00521         case AGG_FUNC_MIN:
00522         case AGG_FUNC_MAX:
00523         case AGG_FUNC_SINGLE_VALUE:
00524             partialAggFunction = pInvocation->aggFunction;
00525             break;
00526         default:
00527             permFail("unknown aggregation function: "
00528                      << pInvocation->aggFunction);
00529             break;
00530         }
00531 
00532         
00533 
00534 
00535         TupleAttributeDescriptor const *pInputAttr = NULL;
00536         if (pInvocation->iInputAttr != -1) {
00537             pInputAttr = &(inputDesc[pInvocation->iInputAttr]);
00538         }
00539         aggComputers.push_back(
00540             AggComputer::newAggComputer(
00541                 pInvocation->aggFunction, pInputAttr));
00542         aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr);
00543 
00544         
00545 
00546 
00547         TupleAttributeDescriptor const *pInputAttrPartialAgg =
00548             &(hashDesc[aggsProj[i]]);
00549         partialAggComputers.push_back(
00550             AggComputer::newAggComputer(
00551                 partialAggFunction, pInputAttrPartialAgg));
00552         partialAggComputers.back().setInputAttrIndex(aggsProj[i]);
00553         i ++;
00554     }
00555 }
00556 
00557 void LhxAggExecStream::setHashInfo(
00558     LhxAggExecStreamParams const ¶ms)
00559 {
00560     TupleDescriptor inputDesc = pInAccessor->getTupleDesc();
00561 
00562     hashInfo.streamBufAccessor.push_back(pInAccessor);
00563 
00564     hashInfo.cndKeys.push_back(params.cndGroupByKeys);
00565 
00566     hashInfo.numRows.push_back(params.numRows);
00567 
00568     hashInfo.filterNull.push_back(false);
00569 
00570     
00571     TupleProjection filterNullKeyProj;
00572     hashInfo.filterNullKeyProj.push_back(filterNullKeyProj);
00573 
00574     hashInfo.removeDuplicate.push_back(false);
00575     hashInfo.useJoinFilter.push_back(false);
00576 
00577     hashInfo.memSegmentAccessor = params.scratchAccessor;
00578     hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
00579     hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment;
00580 
00581     TupleProjection keyProj;
00582     vector<LhxHashTrim> isKeyColVarChar;
00583 
00584     for (int i = 0; i < params.groupByKeyCount; i ++) {
00585         keyProj.push_back(i);
00586         
00587 
00588 
00589 
00590         StoredTypeDescriptor::Ordinal ordinal =
00591             inputDesc[i].pTypeDescriptor->getOrdinal();
00592         if (ordinal == STANDARD_TYPE_VARCHAR) {
00593             isKeyColVarChar.push_back(HASH_TRIM_VARCHAR);
00594         } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) {
00595             isKeyColVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR);
00596         } else {
00597             isKeyColVarChar.push_back(HASH_TRIM_NONE);
00598         }
00599     }
00600     hashInfo.keyProj.push_back(keyProj);
00601     hashInfo.isKeyColVarChar.push_back(isKeyColVarChar);
00602 
00603     
00604 
00605 
00606     TupleProjection dataProj;
00607     hashInfo.dataProj.push_back(dataProj);
00608 
00609     
00610 
00611 
00612     TupleDescriptor keyDesc;
00613     keyDesc.projectFrom(inputDesc, keyProj);
00614 
00615     
00616 
00617 
00618     StandardTypeDescriptorFactory stdTypeFactory;
00619     TupleAttributeDescriptor countDesc(
00620         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00621 
00622     
00623     
00624     
00625     
00626     
00627     
00628     
00629     
00630     
00631     
00632     
00633 
00634     
00635     
00636 
00637 
00638 
00639 
00640     int i = params.groupByKeyCount;
00641     for (AggInvocationConstIter pInvocation(params.aggInvocations.begin());
00642          pInvocation != params.aggInvocations.end();
00643          ++pInvocation)
00644     {
00645         switch (pInvocation->aggFunction) {
00646         case AGG_FUNC_COUNT:
00647             keyDesc.push_back(countDesc);
00648             break;
00649         case AGG_FUNC_SUM:
00650         case AGG_FUNC_MIN:
00651         case AGG_FUNC_MAX:
00652         case AGG_FUNC_SINGLE_VALUE:
00653             
00654             keyDesc.push_back(inputDesc[pInvocation->iInputAttr]);
00655             keyDesc.back().isNullable = true;
00656             break;
00657         }
00658         hashInfo.aggsProj.push_back(i++);
00659     }
00660 
00661     hashInfo.inputDesc.push_back(keyDesc);
00662 }
00663 
00664 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");
00665 
00666